diff options
Diffstat (limited to 'pkg/server/tunnel.go')
| -rw-r--r-- | pkg/server/tunnel.go | 52 |
1 files changed, 43 insertions, 9 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 8465fac..09d275b 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "tunnel/pkg/config" "tunnel/pkg/server/env" @@ -17,6 +18,11 @@ import ( "tunnel/pkg/server/socket" ) +type metric struct { + tx uint64 + rx uint64 +} + type stream struct { id int t *tunnel @@ -25,6 +31,11 @@ type stream struct { wg sync.WaitGroup in, out socket.Channel pipes []*hook.Pipe + + m struct { + in metric + out metric + } } type tunnel struct { @@ -156,6 +167,17 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream { return s } +func (s *stream) info() string { + d := time.Since(s.since).Milliseconds() + + return fmt.Sprintf("%.3fms %d/%d -> %d/%d", + float64(d)/1000.0, + s.m.in.tx, + s.m.in.rx, + s.m.out.rx, + s.m.out.tx) +} + func (s *stream) waitAndClose() { s.wg.Wait() @@ -169,10 +191,10 @@ func (s *stream) waitAndClose() { p.Close() } - log.Println(s.t, s, "close") + log.Println(s.t, s, "done", s.info()) } -func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { +func (s *stream) channel(c socket.Channel, m *metric, rq, wq queue.Q) { watch := func(q queue.Q, f func(q queue.Q) error) { defer s.wg.Done() @@ -181,15 +203,27 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { } } + counter := func(c *uint64, src, dst queue.Q) { + for b := range src { + dst <- b + atomic.AddUint64(c, uint64(len(b))) + } + close(dst) + } + s.wg.Add(2) go func() { - watch(wq, c.Send) - close(wq) + q := queue.New() + go counter(&m.tx, q, wq) + watch(q, c.Send) + close(q) }() go func() { - watch(rq, c.Recv) + q := queue.New() + go counter(&m.rx, rq, q) + watch(q, c.Recv) rq.Dry() }() } @@ -214,7 +248,7 @@ func (s *stream) run() { rq, wq := queue.New(), queue.New() - s.channel(s.in, rq, wq) + s.channel(s.in, &s.m.in, rq, wq) for _, h := range s.t.hooks { p, err := h.Open(s.env) @@ -239,7 +273,7 @@ func (s *stream) run() { s.pipes = append(s.pipes, p) } - s.channel(s.out, wq, rq) + s.channel(s.out, &s.m.out, wq, rq) } func (s *stream) stop() { @@ -412,8 +446,8 @@ func showStreams(r *request) { r.Println(t.id, t.args) foreachStream(t.streams, func(s *stream) { - tm := s.since.Format(config.TimeFormat) - r.Println("\t", s.id, tm, s.in, s.out) + when := s.since.Format(config.TimeFormat) + r.Println("\t", s.id, when, s.in.Origin(), s.out.Origin(), s.info()) }) } }) |
