summaryrefslogtreecommitdiff
path: root/pkg/server/tunnel.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/tunnel.go')
-rw-r--r--pkg/server/tunnel.go52
1 files changed, 43 insertions, 9 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go
index 8465fac..09d275b 100644
--- a/pkg/server/tunnel.go
+++ b/pkg/server/tunnel.go
@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"tunnel/pkg/config"
"tunnel/pkg/server/env"
@@ -17,6 +18,11 @@ import (
"tunnel/pkg/server/socket"
)
+type metric struct {
+ tx uint64
+ rx uint64
+}
+
type stream struct {
id int
t *tunnel
@@ -25,6 +31,11 @@ type stream struct {
wg sync.WaitGroup
in, out socket.Channel
pipes []*hook.Pipe
+
+ m struct {
+ in metric
+ out metric
+ }
}
type tunnel struct {
@@ -156,6 +167,17 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream {
return s
}
+func (s *stream) info() string {
+ d := time.Since(s.since).Milliseconds()
+
+ return fmt.Sprintf("%.3fms %d/%d -> %d/%d",
+ float64(d)/1000.0,
+ s.m.in.tx,
+ s.m.in.rx,
+ s.m.out.rx,
+ s.m.out.tx)
+}
+
func (s *stream) waitAndClose() {
s.wg.Wait()
@@ -169,10 +191,10 @@ func (s *stream) waitAndClose() {
p.Close()
}
- log.Println(s.t, s, "close")
+ log.Println(s.t, s, "done", s.info())
}
-func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
+func (s *stream) channel(c socket.Channel, m *metric, rq, wq queue.Q) {
watch := func(q queue.Q, f func(q queue.Q) error) {
defer s.wg.Done()
@@ -181,15 +203,27 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
}
}
+ counter := func(c *uint64, src, dst queue.Q) {
+ for b := range src {
+ dst <- b
+ atomic.AddUint64(c, uint64(len(b)))
+ }
+ close(dst)
+ }
+
s.wg.Add(2)
go func() {
- watch(wq, c.Send)
- close(wq)
+ q := queue.New()
+ go counter(&m.tx, q, wq)
+ watch(q, c.Send)
+ close(q)
}()
go func() {
- watch(rq, c.Recv)
+ q := queue.New()
+ go counter(&m.rx, rq, q)
+ watch(q, c.Recv)
rq.Dry()
}()
}
@@ -214,7 +248,7 @@ func (s *stream) run() {
rq, wq := queue.New(), queue.New()
- s.channel(s.in, rq, wq)
+ s.channel(s.in, &s.m.in, rq, wq)
for _, h := range s.t.hooks {
p, err := h.Open(s.env)
@@ -239,7 +273,7 @@ func (s *stream) run() {
s.pipes = append(s.pipes, p)
}
- s.channel(s.out, wq, rq)
+ s.channel(s.out, &s.m.out, wq, rq)
}
func (s *stream) stop() {
@@ -412,8 +446,8 @@ func showStreams(r *request) {
r.Println(t.id, t.args)
foreachStream(t.streams, func(s *stream) {
- tm := s.since.Format(config.TimeFormat)
- r.Println("\t", s.id, tm, s.in, s.out)
+ when := s.since.Format(config.TimeFormat)
+ r.Println("\t", s.id, when, s.in.Origin(), s.out.Origin(), s.info())
})
}
})