summaryrefslogtreecommitdiff
path: root/pkg/server
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server')
-rw-r--r--pkg/server/socket/socket.go26
-rw-r--r--pkg/server/tunnel.go52
2 files changed, 63 insertions, 15 deletions
diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go
index a9fa319..0060ce6 100644
--- a/pkg/server/socket/socket.go
+++ b/pkg/server/socket/socket.go
@@ -15,6 +15,7 @@ import (
var errAlreadyClosed = errors.New("already closed")
type Channel interface {
+ Origin() string
Send(wq queue.Q) error
Recv(rq queue.Q) error
Close() error
@@ -35,8 +36,9 @@ type dialSocket struct {
}
type connChannel struct {
- conn net.Conn
- once sync.Once
+ origin string
+ conn net.Conn
+ once sync.Once
}
type loopSocket struct{}
@@ -45,8 +47,8 @@ type loopChannel struct {
q queue.Q
}
-func newConnChannel(conn net.Conn) Channel {
- return &connChannel{conn: conn}
+func newConnChannel(origin string, conn net.Conn) Channel {
+ return &connChannel{origin: origin, conn: conn}
}
func (c *connChannel) final(f func() error, err error) error {
@@ -61,6 +63,10 @@ func (c *connChannel) final(f func() error, err error) error {
return err
}
+func (c *connChannel) Origin() string {
+ return c.origin
+}
+
func (c *connChannel) Send(wq queue.Q) error {
err := queue.IoCopy(c.conn, wq.Writer())
return c.final(c.Close, err)
@@ -113,7 +119,11 @@ func (s *listenSocket) Open(env env.Env) (Channel, error) {
if err != nil {
return nil, err
}
- return newConnChannel(conn), nil
+
+ addr := conn.RemoteAddr()
+ origin := fmt.Sprintf("%s/%s", addr.Network(), addr)
+
+ return newConnChannel(origin, conn), nil
}
func (s *listenSocket) String() string {
@@ -144,12 +154,16 @@ func (s *dialSocket) Open(env env.Env) (Channel, error) {
if err != nil {
return nil, err
}
- return newConnChannel(conn), nil
+ return newConnChannel("-", conn), nil
}
func (s *dialSocket) Close() {
}
+func (c *loopChannel) Origin() string {
+ return "loop"
+}
+
func (c *loopChannel) Send(wq queue.Q) error {
return queue.Copy(c.q, wq)
}
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go
index 8465fac..09d275b 100644
--- a/pkg/server/tunnel.go
+++ b/pkg/server/tunnel.go
@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"tunnel/pkg/config"
"tunnel/pkg/server/env"
@@ -17,6 +18,11 @@ import (
"tunnel/pkg/server/socket"
)
+type metric struct {
+ tx uint64
+ rx uint64
+}
+
type stream struct {
id int
t *tunnel
@@ -25,6 +31,11 @@ type stream struct {
wg sync.WaitGroup
in, out socket.Channel
pipes []*hook.Pipe
+
+ m struct {
+ in metric
+ out metric
+ }
}
type tunnel struct {
@@ -156,6 +167,17 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream {
return s
}
+func (s *stream) info() string {
+ d := time.Since(s.since).Milliseconds()
+
+ return fmt.Sprintf("%.3fms %d/%d -> %d/%d",
+ float64(d)/1000.0,
+ s.m.in.tx,
+ s.m.in.rx,
+ s.m.out.rx,
+ s.m.out.tx)
+}
+
func (s *stream) waitAndClose() {
s.wg.Wait()
@@ -169,10 +191,10 @@ func (s *stream) waitAndClose() {
p.Close()
}
- log.Println(s.t, s, "close")
+ log.Println(s.t, s, "done", s.info())
}
-func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
+func (s *stream) channel(c socket.Channel, m *metric, rq, wq queue.Q) {
watch := func(q queue.Q, f func(q queue.Q) error) {
defer s.wg.Done()
@@ -181,15 +203,27 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
}
}
+ counter := func(c *uint64, src, dst queue.Q) {
+ for b := range src {
+ dst <- b
+ atomic.AddUint64(c, uint64(len(b)))
+ }
+ close(dst)
+ }
+
s.wg.Add(2)
go func() {
- watch(wq, c.Send)
- close(wq)
+ q := queue.New()
+ go counter(&m.tx, q, wq)
+ watch(q, c.Send)
+ close(q)
}()
go func() {
- watch(rq, c.Recv)
+ q := queue.New()
+ go counter(&m.rx, rq, q)
+ watch(q, c.Recv)
rq.Dry()
}()
}
@@ -214,7 +248,7 @@ func (s *stream) run() {
rq, wq := queue.New(), queue.New()
- s.channel(s.in, rq, wq)
+ s.channel(s.in, &s.m.in, rq, wq)
for _, h := range s.t.hooks {
p, err := h.Open(s.env)
@@ -239,7 +273,7 @@ func (s *stream) run() {
s.pipes = append(s.pipes, p)
}
- s.channel(s.out, wq, rq)
+ s.channel(s.out, &s.m.out, wq, rq)
}
func (s *stream) stop() {
@@ -412,8 +446,8 @@ func showStreams(r *request) {
r.Println(t.id, t.args)
foreachStream(t.streams, func(s *stream) {
- tm := s.since.Format(config.TimeFormat)
- r.Println("\t", s.id, tm, s.in, s.out)
+ when := s.since.Format(config.TimeFormat)
+ r.Println("\t", s.id, when, s.in.Origin(), s.out.Origin(), s.info())
})
}
})