diff options
Diffstat (limited to 'pkg/server/tunnel.go')
| -rw-r--r-- | pkg/server/tunnel.go | 42 |
1 files changed, 39 insertions, 3 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index e2fce92..afa0f31 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -36,6 +36,8 @@ type stream struct { wg sync.WaitGroup in, out socket.Conn pipes []*hook.Pipe + mu sync.Mutex + zombie bool m struct { in metric @@ -295,6 +297,10 @@ func (s *stream) channel(c socket.Conn, m *metric, rq, wq queue.Q) { if err != nil { log.Println(s.t, s, c, err) } + + s.mu.Lock() + s.zombie = true + s.mu.Unlock() } counter := func(c *uint64, src, dst queue.Q) { @@ -499,6 +505,26 @@ func tunnelDel(r *request, id string) { } } +func streamKick(r *request, tid string, sid int) { + e, ok := r.c.s.tunnels[tid] + if !ok { + r.Fatal("no such tunnel") + } + + t := e.(*tunnel) + + t.mu.Lock() + defer t.mu.Unlock() + + s, ok := t.streams[sid] + if !ok { + r.Fatal("no such stream") + } + + log.Println(s, "kick") + s.stop() +} + func tunnelRename(r *request, old, new string) { if !isOkTunnelName(new) { r.Fatal("bad name") @@ -551,7 +577,15 @@ func showActive(r *request) { defer t.mu.Unlock() foreachStream(t.streams, func(s *stream) { - r.Println(t.id, s.id, s.in, s.out, s.info()) + var opts string + + s.mu.Lock() + if s.zombie { + opts = "zombie" + } + s.mu.Unlock() + + r.Println(t.id, s.id, s.in, s.out, s.info(), opts) }) }) } @@ -577,9 +611,11 @@ func showRecent(r *request) { func init() { newCmd("add", tunnelAdd, "[name id] [limit N] [single] socket [hook ...] socket") - newCmd("del", tunnelDel, "id") + newCmd("del", tunnelDel, "tunnel-id") + + newCmd("kick", streamKick, "tunnel-id stream-id") - newCmd("rename", tunnelRename, "old-id new-id") + newCmd("rename", tunnelRename, "tunnel-old-id tunnel-new-id") newCmd("show", showTunnels, "") |
