diff options
Diffstat (limited to 'pkg/server/tunnel.go')
| -rw-r--r-- | pkg/server/tunnel.go | 107 |
1 files changed, 57 insertions, 50 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index d59d9db..592f48c 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -19,6 +19,7 @@ import ( ) const maxRecentSize = 8 +const maxQueueLimit = 16384 type metric struct { tx uint64 @@ -56,7 +57,7 @@ type tunnel struct { quit chan struct{} done chan struct{} - mono bool + queue chan struct{} in, out socket.S hooks []hook.H @@ -82,6 +83,7 @@ func (t *tunnel) stopServe() { func (t *tunnel) stopStreams() { t.mu.Lock() for _, s := range t.streams { + log.Println(s, "stop") s.stop() } t.mu.Unlock() @@ -96,70 +98,72 @@ func (t *tunnel) Close() { log.Println(t, "delete") } -func (t *tunnel) isQuit() bool { +func (t *tunnel) alive() bool { select { case <-t.quit: - return true - default: return false + default: + return true } } -func (t *tunnel) serve() { - var wg sync.WaitGroup +func (t *tunnel) acquire() bool { + select { + case t.queue <- struct{}{}: + return true + case <-t.quit: + return false + } +} - for { - if in, err := t.in.Open(t.env); err != nil { - if t.isQuit() { - break - } +func (t *tunnel) release() { + <-t.queue +} - log.Println(t, err) - time.Sleep(5 * time.Second) - } else { - log.Println(t, "open", in) +func (t *tunnel) sleep(d time.Duration) { + tmr := time.NewTimer(d) + select { + case <-tmr.C: + case <-t.quit: + } + tmr.Stop() +} - wg.Add(1) +func (t *tunnel) serve() { + for t.acquire() { + var ok bool - go func() { - t.handle(in) - wg.Done() - }() + env := t.env.Fork() - if t.mono { - wg.Wait() - t.wg.Wait() + if in, err := t.in.Open(env); err == nil { + if out, err := t.out.Open(env); err == nil { + s := t.newStream(env, in, out) + log.Println(t, s, "create", in, out) + ok = true + } else { + log.Println(t, err) + in.Close() } + } else if t.alive() { + log.Println(t, err) + t.sleep(5 * time.Second) } - } - - wg.Wait() - close(t.done) -} - -func (t *tunnel) handle(in socket.Channel) { - out, err := t.out.Open(t.env) - if err != nil { - log.Println(t, err) - in.Close() - return + if !ok { + t.release() + } } - log.Println(t, "open", out) - - s := t.newStream(in, out) - - log.Println(t, s, "create", in, out) + close(t.done) } -func (t *tunnel) newStream(in, out socket.Channel) *stream { +func (t *tunnel) newStream(env env.Env, in, out socket.Channel) *stream { s := &stream{ t: t, in: in, out: out, + env: env, id: t.nextSid, - env: t.env.Copy(), since: time.Now(), } @@ -193,7 +197,7 @@ func (t *tunnel) delStream(s *stream) { func (s *stream) info() string { d := time.Since(s.since).Milliseconds() - return fmt.Sprintf("%.3fms %d/%d -> %d/%d", + return fmt.Sprintf("%.3fs %d/%d -> %d/%d", float64(d)/1000.0, s.m.in.tx, s.m.in.rx, @@ -207,9 +211,12 @@ func (s *stream) waitAndClose() { s.until = time.Now() s.t.delStream(s) - + s.t.release() s.t.wg.Done() + s.in.Close() + s.out.Close() + for _, p := range s.pipes { p.Close() } @@ -318,7 +325,7 @@ func parseHooks(args []string, env env.Env) ([]hook.H, error) { return hooks, nil } -func newTunnel(mono bool, args []string, env env.Env) (*tunnel, error) { +func newTunnel(limit int, args []string, env env.Env) (*tunnel, error) { var in, out socket.S var hooks []hook.H var err error @@ -344,11 +351,11 @@ func newTunnel(mono bool, args []string, env env.Env) (*tunnel, error) { args: strings.Join(args, " "), quit: make(chan struct{}), done: make(chan struct{}), - mono: mono, hooks: hooks, in: in, out: out, env: env, + queue: make(chan struct{}, limit), streams: make(map[int]*stream), } @@ -364,10 +371,10 @@ func isOkTunnelName(s string) bool { func tunnelAdd(r *request) { args := r.args name := "" - mono := false + limit := maxQueueLimit for len(args) > 1 { - if args[0] == "name" && len(args) > 1 { + if args[0] == "name" { name = args[1] if !isOkTunnelName(name) { r.Fatal("bad name") @@ -382,7 +389,7 @@ func tunnelAdd(r *request) { } if args[0] == "mono" { - mono = true + limit = 1 args = args[1:] continue } @@ -394,7 +401,7 @@ func tunnelAdd(r *request) { r.Fatal("not enough args") } - t, err := newTunnel(mono, args, r.c.s.env) + t, err := newTunnel(limit, args, r.c.s.env) if err != nil { r.Fatal(err) } |
