diff options
Diffstat (limited to 'pkg/server/tunnel.go')
| -rw-r--r-- | pkg/server/tunnel.go | 74 |
1 files changed, 37 insertions, 37 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 32c81c3..58ae0e1 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -1,29 +1,29 @@ package server import ( - "tunnel/pkg/server/socket" - "tunnel/pkg/server/module" - "tunnel/pkg/server/queue" - "tunnel/pkg/server/env" - "tunnel/pkg/config" - "strings" - "time" - "sort" - "sync" "fmt" "log" + "sort" + "strings" + "sync" + "time" + "tunnel/pkg/config" + "tunnel/pkg/server/env" + "tunnel/pkg/server/module" + "tunnel/pkg/server/queue" + "tunnel/pkg/server/socket" ) type stream struct { - id int - t *tunnel - since time.Time - wg sync.WaitGroup + id int + t *tunnel + since time.Time + wg sync.WaitGroup in, out socket.Channel } type tunnel struct { - id string + id string args string streams map[int]*stream @@ -37,7 +37,7 @@ type tunnel struct { done chan struct{} in, out socket.S - m []module.M + m []module.M env env.Env } @@ -99,7 +99,7 @@ func (t *tunnel) serve() { wg.Add(1) - go func () { + go func() { t.handle(in) wg.Done() }() @@ -128,10 +128,10 @@ func (t *tunnel) handle(in socket.Channel) { func (t *tunnel) newStream(in, out socket.Channel) *stream { s := &stream{ - t: t, - in: in, - out: out, - id: t.nextSid, + t: t, + in: in, + out: out, + id: t.nextSid, since: time.Now(), } @@ -142,7 +142,7 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream { t.streams[s.id] = s t.mu.Unlock() - go func () { + go func() { s.wg.Wait() s.t.mu.Lock() @@ -158,7 +158,7 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream { } func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { - watch := func (q queue.Q, f func (q queue.Q) error) { + watch := func(q queue.Q, f func(q queue.Q) error) { defer s.wg.Done() if err := f(q); err != nil { @@ -168,12 +168,12 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { s.wg.Add(2) - go func () { + go func() { watch(wq, c.Send) close(wq) }() - go func () { + go func() { watch(rq, c.Recv) rq.Dry() }() @@ -182,7 +182,7 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { func (s *stream) pipe(m module.M, p module.Pipe, rq, wq queue.Q) { s.wg.Add(1) - go func () { + go func() { defer s.wg.Done() if err := p(rq, wq); err != nil { @@ -287,13 +287,13 @@ func newTunnel(args []string, env env.Env) (*tunnel, error) { } t := &tunnel{ - args: strings.Join(args, " "), - quit: make(chan struct{}), - done: make(chan struct{}), - m: mm, - in: in, - out: out, - env: env, + args: strings.Join(args, " "), + quit: make(chan struct{}), + done: make(chan struct{}), + m: mm, + in: in, + out: out, + env: env, streams: make(map[int]*stream), } @@ -372,7 +372,7 @@ func tunnelRename(r *request) { } } -func foreachTunnel(m automap, f func (t *tunnel)) { +func foreachTunnel(m automap, f func(t *tunnel)) { var keys []string for k := range m { @@ -386,7 +386,7 @@ func foreachTunnel(m automap, f func (t *tunnel)) { } } -func foreachStream(m map[int]*stream, f func (s *stream)) { +func foreachStream(m map[int]*stream, f func(s *stream)) { var keys []int for k := range m { @@ -401,20 +401,20 @@ func foreachStream(m map[int]*stream, f func (s *stream)) { } func tunnelShow(r *request) { - foreachTunnel(r.c.s.tunnels, func (t *tunnel) { + foreachTunnel(r.c.s.tunnels, func(t *tunnel) { r.Println(t.id, t.args) }) } func streamShow(r *request) { - foreachTunnel(r.c.s.tunnels, func (t *tunnel) { + foreachTunnel(r.c.s.tunnels, func(t *tunnel) { t.mu.Lock() defer t.mu.Unlock() if len(t.streams) > 0 { r.Println(t.id, t.args) - foreachStream(t.streams, func (s *stream) { + foreachStream(t.streams, func(s *stream) { tm := s.since.Format(config.TimeFormat) r.Println("\t", s.id, tm, s.in, s.out) }) |
