summaryrefslogtreecommitdiff
path: root/pkg/server/tunnel.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/tunnel.go')
-rw-r--r--pkg/server/tunnel.go37
1 files changed, 23 insertions, 14 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go
index e00db6c..8465fac 100644
--- a/pkg/server/tunnel.go
+++ b/pkg/server/tunnel.go
@@ -24,6 +24,7 @@ type stream struct {
since time.Time
wg sync.WaitGroup
in, out socket.Channel
+ pipes []*hook.Pipe
}
type tunnel struct {
@@ -150,19 +151,25 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream {
t.streams[s.id] = s
t.mu.Unlock()
- go func() {
- s.wg.Wait()
+ go s.waitAndClose()
- s.t.mu.Lock()
- delete(s.t.streams, s.id)
- s.t.mu.Unlock()
+ return s
+}
- s.t.wg.Done()
+func (s *stream) waitAndClose() {
+ s.wg.Wait()
- log.Println(s.t, s, "close")
- }()
+ s.t.mu.Lock()
+ delete(s.t.streams, s.id)
+ s.t.mu.Unlock()
- return s
+ s.t.wg.Done()
+
+ for _, p := range s.pipes {
+ p.Close()
+ }
+
+ log.Println(s.t, s, "close")
}
func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
@@ -210,24 +217,26 @@ func (s *stream) run() {
s.channel(s.in, rq, wq)
for _, h := range s.t.hooks {
- send, recv, err := hook.Open(h, s.env)
+ p, err := h.Open(s.env)
if err != nil {
// FIXME: abort stream on error
log.Println(s.t, s, h, err)
continue
}
- if send != nil {
+ if p.Send != nil {
q := queue.New()
- s.pipe(h, send, wq, q)
+ s.pipe(h, p.Send, wq, q)
wq = q
}
- if recv != nil {
+ if p.Recv != nil {
q := queue.New()
- s.pipe(h, recv, q, rq)
+ s.pipe(h, p.Recv, q, rq)
rq = q
}
+
+ s.pipes = append(s.pipes, p)
}
s.channel(s.out, wq, rq)