diff options
| author | Mikhail Osipov <mike.osipov@gmail.com> | 2020-02-29 21:50:58 +0300 |
|---|---|---|
| committer | Mikhail Osipov <mike.osipov@gmail.com> | 2020-02-29 23:48:53 +0300 |
| commit | 8b7283ad01a8dde92cf708f81f6c1105647bafd7 (patch) | |
| tree | 6598c17258bacb4e84f6e486e591460437e93086 /pkg/server/tunnel.go | |
| parent | 7ab641d239e502e09c6f05dfc7efd069fcf3c314 (diff) | |
close pipes at end of stream
Diffstat (limited to 'pkg/server/tunnel.go')
| -rw-r--r-- | pkg/server/tunnel.go | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index e00db6c..8465fac 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -24,6 +24,7 @@ type stream struct { since time.Time wg sync.WaitGroup in, out socket.Channel + pipes []*hook.Pipe } type tunnel struct { @@ -150,19 +151,25 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream { t.streams[s.id] = s t.mu.Unlock() - go func() { - s.wg.Wait() + go s.waitAndClose() - s.t.mu.Lock() - delete(s.t.streams, s.id) - s.t.mu.Unlock() + return s +} - s.t.wg.Done() +func (s *stream) waitAndClose() { + s.wg.Wait() - log.Println(s.t, s, "close") - }() + s.t.mu.Lock() + delete(s.t.streams, s.id) + s.t.mu.Unlock() - return s + s.t.wg.Done() + + for _, p := range s.pipes { + p.Close() + } + + log.Println(s.t, s, "close") } func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { @@ -210,24 +217,26 @@ func (s *stream) run() { s.channel(s.in, rq, wq) for _, h := range s.t.hooks { - send, recv, err := hook.Open(h, s.env) + p, err := h.Open(s.env) if err != nil { // FIXME: abort stream on error log.Println(s.t, s, h, err) continue } - if send != nil { + if p.Send != nil { q := queue.New() - s.pipe(h, send, wq, q) + s.pipe(h, p.Send, wq, q) wq = q } - if recv != nil { + if p.Recv != nil { q := queue.New() - s.pipe(h, recv, q, rq) + s.pipe(h, p.Recv, q, rq) rq = q } + + s.pipes = append(s.pipes, p) } s.channel(s.out, wq, rq) |
