summaryrefslogtreecommitdiff
path: root/pkg/server/tunnel.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/tunnel.go')
-rw-r--r--pkg/server/tunnel.go107
1 files changed, 57 insertions, 50 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go
index d59d9db..592f48c 100644
--- a/pkg/server/tunnel.go
+++ b/pkg/server/tunnel.go
@@ -19,6 +19,7 @@ import (
)
const maxRecentSize = 8
+const maxQueueLimit = 16384
type metric struct {
tx uint64
@@ -56,7 +57,7 @@ type tunnel struct {
quit chan struct{}
done chan struct{}
- mono bool
+ queue chan struct{}
in, out socket.S
hooks []hook.H
@@ -82,6 +83,7 @@ func (t *tunnel) stopServe() {
func (t *tunnel) stopStreams() {
t.mu.Lock()
for _, s := range t.streams {
+ log.Println(s, "stop")
s.stop()
}
t.mu.Unlock()
@@ -96,70 +98,72 @@ func (t *tunnel) Close() {
log.Println(t, "delete")
}
-func (t *tunnel) isQuit() bool {
+func (t *tunnel) alive() bool {
select {
case <-t.quit:
- return true
- default:
return false
+ default:
+ return true
}
}
-func (t *tunnel) serve() {
- var wg sync.WaitGroup
+func (t *tunnel) acquire() bool {
+ select {
+ case t.queue <- struct{}{}:
+ return true
+ case <-t.quit:
+ return false
+ }
+}
- for {
- if in, err := t.in.Open(t.env); err != nil {
- if t.isQuit() {
- break
- }
+func (t *tunnel) release() {
+ <-t.queue
+}
- log.Println(t, err)
- time.Sleep(5 * time.Second)
- } else {
- log.Println(t, "open", in)
+func (t *tunnel) sleep(d time.Duration) {
+ tmr := time.NewTimer(d)
+ select {
+ case <-tmr.C:
+ case <-t.quit:
+ }
+ tmr.Stop()
+}
- wg.Add(1)
+func (t *tunnel) serve() {
+ for t.acquire() {
+ var ok bool
- go func() {
- t.handle(in)
- wg.Done()
- }()
+ env := t.env.Fork()
- if t.mono {
- wg.Wait()
- t.wg.Wait()
+ if in, err := t.in.Open(env); err == nil {
+ if out, err := t.out.Open(env); err == nil {
+ s := t.newStream(env, in, out)
+ log.Println(t, s, "create", in, out)
+ ok = true
+ } else {
+ log.Println(t, err)
+ in.Close()
}
+ } else if t.alive() {
+ log.Println(t, err)
+ t.sleep(5 * time.Second)
}
- }
-
- wg.Wait()
- close(t.done)
-}
-
-func (t *tunnel) handle(in socket.Channel) {
- out, err := t.out.Open(t.env)
- if err != nil {
- log.Println(t, err)
- in.Close()
- return
+ if !ok {
+ t.release()
+ }
}
- log.Println(t, "open", out)
-
- s := t.newStream(in, out)
-
- log.Println(t, s, "create", in, out)
+ close(t.done)
}
-func (t *tunnel) newStream(in, out socket.Channel) *stream {
+func (t *tunnel) newStream(env env.Env, in, out socket.Channel) *stream {
s := &stream{
t: t,
in: in,
out: out,
+ env: env,
id: t.nextSid,
- env: t.env.Copy(),
since: time.Now(),
}
@@ -193,7 +197,7 @@ func (t *tunnel) delStream(s *stream) {
func (s *stream) info() string {
d := time.Since(s.since).Milliseconds()
- return fmt.Sprintf("%.3fms %d/%d -> %d/%d",
+ return fmt.Sprintf("%.3fs %d/%d -> %d/%d",
float64(d)/1000.0,
s.m.in.tx,
s.m.in.rx,
@@ -207,9 +211,12 @@ func (s *stream) waitAndClose() {
s.until = time.Now()
s.t.delStream(s)
-
+ s.t.release()
s.t.wg.Done()
+ s.in.Close()
+ s.out.Close()
+
for _, p := range s.pipes {
p.Close()
}
@@ -318,7 +325,7 @@ func parseHooks(args []string, env env.Env) ([]hook.H, error) {
return hooks, nil
}
-func newTunnel(mono bool, args []string, env env.Env) (*tunnel, error) {
+func newTunnel(limit int, args []string, env env.Env) (*tunnel, error) {
var in, out socket.S
var hooks []hook.H
var err error
@@ -344,11 +351,11 @@ func newTunnel(mono bool, args []string, env env.Env) (*tunnel, error) {
args: strings.Join(args, " "),
quit: make(chan struct{}),
done: make(chan struct{}),
- mono: mono,
hooks: hooks,
in: in,
out: out,
env: env,
+ queue: make(chan struct{}, limit),
streams: make(map[int]*stream),
}
@@ -364,10 +371,10 @@ func isOkTunnelName(s string) bool {
func tunnelAdd(r *request) {
args := r.args
name := ""
- mono := false
+ limit := maxQueueLimit
for len(args) > 1 {
- if args[0] == "name" && len(args) > 1 {
+ if args[0] == "name" {
name = args[1]
if !isOkTunnelName(name) {
r.Fatal("bad name")
@@ -382,7 +389,7 @@ func tunnelAdd(r *request) {
}
if args[0] == "mono" {
- mono = true
+ limit = 1
args = args[1:]
continue
}
@@ -394,7 +401,7 @@ func tunnelAdd(r *request) {
r.Fatal("not enough args")
}
- t, err := newTunnel(mono, args, r.c.s.env)
+ t, err := newTunnel(limit, args, r.c.s.env)
if err != nil {
r.Fatal(err)
}