diff options
Diffstat (limited to 'pkg/server/stream.go')
| -rw-r--r-- | pkg/server/stream.go | 193 |
1 files changed, 0 insertions, 193 deletions
diff --git a/pkg/server/stream.go b/pkg/server/stream.go deleted file mode 100644 index 7c9cc82..0000000 --- a/pkg/server/stream.go +++ /dev/null @@ -1,193 +0,0 @@ -package server - -import ( - "tunnel/pkg/server/module" - "tunnel/pkg/server/queue" - "tunnel/pkg/server/socket" - "strings" - "sort" - "fmt" - "log" -) - -type stream struct { - id string - args string - - in, out socket.S - m []module.M -} - -type streams map[string]*stream - -func (s *stream) String() string { - return fmt.Sprintf("stream(%s)", s.id) -} - -func (s *stream) Close() { - s.in.Close() - s.out.Close() -} - -func (s *stream) run() { - for { - if in, err := s.in.Open(); err != nil { - log.Println(s, err) - } else { - log.Printf("%s accept %s", s, in) - go s.run2(in) - } - } -} - -func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) { - watch := func (q queue.Q, f func (q queue.Q) error) { - if err := f(q); err != nil { - log.Println(s, err) - } - } - - go func () { - watch(wq, c.Send) - close(wq) - }() - - go watch(rq, c.Recv) -} - -func (s *stream) watchPipe(rq, wq queue.Q, f func (rq, wq queue.Q) error) { - go func () { - if err := f(rq, wq); err != nil { - log.Println(s, err) - } - - close(wq) - }() -} - -func (s *stream) run2(in socket.Channel) { - out, err := s.out.Open() - if err != nil { - log.Println(s, err) - in.Close() - return - } - - rq, wq := queue.New(), queue.New() - - s.watchChannel(rq, wq, in) - - for _, m := range s.m { - send, recv := m.Open() - if send != nil { - q := queue.New() - s.watchPipe(wq, q, send) - wq = q - } - if recv != nil { - q := queue.New() - s.watchPipe(q, rq, recv) - rq = q - } - } - - s.watchChannel(wq, rq, out) -} - -func newStream(id string, args []string) (*stream, error) { - var in, out socket.S - var err error - - n := len(args) - 1 - - if in, err = socket.New(args[0]); err != nil { - return nil, err - } - - if out, err = socket.New(args[n]); err != nil { - in.Close() - return nil, err - } - - s := &stream{ - id: id, - args: strings.Join(args, " "), - in: in, - out: out, - } - - reverse := false - - for _, arg := range args[1:n] { - var m module.M - - if arg == "-" { - reverse = true - continue - } - - if arg == "+" { - reverse = false - continue - } - - if m, err = module.New(arg); err != nil { - s.Close() - return nil, err - } - - if reverse { - m = module.Reverse(m) - reverse = false - } - - s.m = append(s.m, m) - } - - if reverse { - s.Close() - return nil, fmt.Errorf("bad '-' usage") - } - - go s.run() - - return s, nil -} - -func streamAdd(r *request) { - if r.argc < 3 { - r.Fatal("not enough args") - } - - id := r.args[0] - if _, ok := r.c.s.streams[id]; ok { - r.Fatal("duplicate id") - } - - s, err := newStream(id, r.args[1:]) - if err != nil { - r.Fatal(err) - } - - r.c.s.streams[id] = s -} - -func streamShow(r *request) { - var keys []string - - for k := range r.c.s.streams { - keys = append(keys, k) - } - - sort.Strings(keys) - - for _, k := range keys { - s := r.c.s.streams[k] - r.Println(s.id, s.args) - } -} - -func init() { - newCmd(streamAdd, "add") - newCmd(streamShow, "show") -} |
