diff options
| author | Mikhail Osipov <mike.osipov@gmail.com> | 2020-03-05 02:11:12 +0300 |
|---|---|---|
| committer | Mikhail Osipov <mike.osipov@gmail.com> | 2020-03-05 02:11:12 +0300 |
| commit | 878fb83abd4003b50896b3dd3f9fcf242440991b (patch) | |
| tree | 9479bd46b1d165fd0f687f6f658b0cb9b1773e95 /pkg | |
| parent | ff51af3c48cee0d15c9802b2adb44f59a77e4c75 (diff) | |
add stat for streams
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/pack/pack.go | 57 | ||||
| -rw-r--r-- | pkg/server/socket/socket.go | 26 | ||||
| -rw-r--r-- | pkg/server/tunnel.go | 52 |
3 files changed, 120 insertions, 15 deletions
diff --git a/pkg/pack/pack.go b/pkg/pack/pack.go new file mode 100644 index 0000000..e186773 --- /dev/null +++ b/pkg/pack/pack.go @@ -0,0 +1,57 @@ +package pack + +import ( + "encoding/binary" + "fmt" + "io" +) + +type Reader interface { + io.Reader + io.ByteReader +} + +type Encoder struct { + w io.Writer +} + +type Decoder struct { + r Reader +} + +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: w} +} + +func NewDecoder(r Reader) *Decoder { + return &Decoder{r: r} +} + +func (e *Encoder) Lps(b []byte) error { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutVarint(buf, int64(len(b))) + + if _, err := e.w.Write(buf[:n]); err != nil { + return err + } + + _, err := e.w.Write(b) + return err +} + +func (d *Decoder) Lps() ([]byte, error) { + if n, err := binary.ReadVarint(d.r); err != nil { + return nil, fmt.Errorf("lps: %w", err) + } else { + buf := make([]byte, n) + + if _, err := io.ReadFull(d.r, buf); err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return nil, fmt.Errorf("lps: %w", err) + } + + return buf, nil + } +} diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go index a9fa319..0060ce6 100644 --- a/pkg/server/socket/socket.go +++ b/pkg/server/socket/socket.go @@ -15,6 +15,7 @@ import ( var errAlreadyClosed = errors.New("already closed") type Channel interface { + Origin() string Send(wq queue.Q) error Recv(rq queue.Q) error Close() error @@ -35,8 +36,9 @@ type dialSocket struct { } type connChannel struct { - conn net.Conn - once sync.Once + origin string + conn net.Conn + once sync.Once } type loopSocket struct{} @@ -45,8 +47,8 @@ type loopChannel struct { q queue.Q } -func newConnChannel(conn net.Conn) Channel { - return &connChannel{conn: conn} +func newConnChannel(origin string, conn net.Conn) Channel { + return &connChannel{origin: origin, conn: conn} } func (c *connChannel) final(f func() error, err error) error { @@ -61,6 +63,10 @@ func (c *connChannel) final(f func() error, err error) error { return err } +func (c *connChannel) Origin() string { + return c.origin +} + func (c *connChannel) Send(wq queue.Q) error { err := queue.IoCopy(c.conn, wq.Writer()) return c.final(c.Close, err) @@ -113,7 +119,11 @@ func (s *listenSocket) Open(env env.Env) (Channel, error) { if err != nil { return nil, err } - return newConnChannel(conn), nil + + addr := conn.RemoteAddr() + origin := fmt.Sprintf("%s/%s", addr.Network(), addr) + + return newConnChannel(origin, conn), nil } func (s *listenSocket) String() string { @@ -144,12 +154,16 @@ func (s *dialSocket) Open(env env.Env) (Channel, error) { if err != nil { return nil, err } - return newConnChannel(conn), nil + return newConnChannel("-", conn), nil } func (s *dialSocket) Close() { } +func (c *loopChannel) Origin() string { + return "loop" +} + func (c *loopChannel) Send(wq queue.Q) error { return queue.Copy(c.q, wq) } diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 8465fac..09d275b 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "tunnel/pkg/config" "tunnel/pkg/server/env" @@ -17,6 +18,11 @@ import ( "tunnel/pkg/server/socket" ) +type metric struct { + tx uint64 + rx uint64 +} + type stream struct { id int t *tunnel @@ -25,6 +31,11 @@ type stream struct { wg sync.WaitGroup in, out socket.Channel pipes []*hook.Pipe + + m struct { + in metric + out metric + } } type tunnel struct { @@ -156,6 +167,17 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream { return s } +func (s *stream) info() string { + d := time.Since(s.since).Milliseconds() + + return fmt.Sprintf("%.3fms %d/%d -> %d/%d", + float64(d)/1000.0, + s.m.in.tx, + s.m.in.rx, + s.m.out.rx, + s.m.out.tx) +} + func (s *stream) waitAndClose() { s.wg.Wait() @@ -169,10 +191,10 @@ func (s *stream) waitAndClose() { p.Close() } - log.Println(s.t, s, "close") + log.Println(s.t, s, "done", s.info()) } -func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { +func (s *stream) channel(c socket.Channel, m *metric, rq, wq queue.Q) { watch := func(q queue.Q, f func(q queue.Q) error) { defer s.wg.Done() @@ -181,15 +203,27 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { } } + counter := func(c *uint64, src, dst queue.Q) { + for b := range src { + dst <- b + atomic.AddUint64(c, uint64(len(b))) + } + close(dst) + } + s.wg.Add(2) go func() { - watch(wq, c.Send) - close(wq) + q := queue.New() + go counter(&m.tx, q, wq) + watch(q, c.Send) + close(q) }() go func() { - watch(rq, c.Recv) + q := queue.New() + go counter(&m.rx, rq, q) + watch(q, c.Recv) rq.Dry() }() } @@ -214,7 +248,7 @@ func (s *stream) run() { rq, wq := queue.New(), queue.New() - s.channel(s.in, rq, wq) + s.channel(s.in, &s.m.in, rq, wq) for _, h := range s.t.hooks { p, err := h.Open(s.env) @@ -239,7 +273,7 @@ func (s *stream) run() { s.pipes = append(s.pipes, p) } - s.channel(s.out, wq, rq) + s.channel(s.out, &s.m.out, wq, rq) } func (s *stream) stop() { @@ -412,8 +446,8 @@ func showStreams(r *request) { r.Println(t.id, t.args) foreachStream(t.streams, func(s *stream) { - tm := s.since.Format(config.TimeFormat) - r.Println("\t", s.id, tm, s.in, s.out) + when := s.since.Format(config.TimeFormat) + r.Println("\t", s.id, when, s.in.Origin(), s.out.Origin(), s.info()) }) } }) |
