package server

import (
	"fmt"
	"log"
	"sort"
	"strings"
	"sync"

	"tunnel/pkg/server/module"
	"tunnel/pkg/server/queue"
	"tunnel/pkg/server/socket"
)

// stream ties an inbound socket to an outbound socket through an ordered
// list of modules.
type stream struct {
	// id is the name the stream was registered under.
	id string

	// args is the original argument list joined with spaces; streamShow
	// prints it.
	args string

	// in is the socket run keeps opening channels on; out is opened once
	// per channel obtained from in.
	in, out socket.S

	// m holds the modules in the order they were given to newStream.
	m []module.M

	// closeOnce guards the close of done so Close may be called repeatedly.
	closeOnce sync.Once

	// done is closed by Close so that run can tell a shutdown apart from
	// any other Open failure.
	done chan struct{}
}

// streams indexes streams by id.
type streams map[string]*stream

// String returns the stream's label as used in log messages.
func (s *stream) String() string {
	return fmt.Sprintf("stream(%s)", s.id)
}

// Close marks the stream as shut down and closes both sockets.
func (s *stream) Close() {
	// Signal before closing the sockets, so a run goroutine whose Open
	// fails as a consequence already observes the shutdown.
	s.closeOnce.Do(func() {
		if s.done != nil {
			close(s.done)
		}
	})
	s.in.Close()
	s.out.Close()
}

// closed reports whether Close has been called. A stream whose done channel
// was never initialised is reported as open.
func (s *stream) closed() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// run opens channels on the inbound socket in a loop and starts run2 in a
// new goroutine for each one. Open errors are logged and the loop retries,
// except after Close, where the loop returns instead of retrying forever.
func (s *stream) run() {
	for {
		in, err := s.in.Open()
		if err != nil {
			if s.closed() {
				return
			}
			log.Println(s, err)
			continue
		}

		log.Printf("%s accept %s", s, in)
		go s.run2(in)
	}
}

// watchChannel starts two goroutines for channel c: one calls c.Send(wq)
// and closes wq once it returns, the other calls c.Recv(rq). Errors from
// either call are logged.
//
// NOTE(review): the data direction of Send/Recv is defined by
// socket.Channel and is not visible in this file.
func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) {
	watch := func(q queue.Q, f func(q queue.Q) error) {
		if err := f(q); err != nil {
			log.Println(s, err)
		}
	}

	go func() {
		watch(wq, c.Send)
		// Closing wq tells the next stage that nothing more will arrive.
		close(wq)
	}()

	go watch(rq, c.Recv)
}

// watchPipe runs f(rq, wq) in a new goroutine, logs its error if any, and
// closes wq once f has returned.
func (s *stream) watchPipe(rq, wq queue.Q, f func(rq, wq queue.Q) error) {
	go func() {
		if err := f(rq, wq); err != nil {
			log.Println(s, err)
		}
		close(wq)
	}()
}

// run2 wires one inbound channel to a freshly opened outbound channel.
// If the outbound socket cannot be opened the error is logged and the
// inbound channel is closed.
func (s *stream) run2(in socket.Channel) {
	out, err := s.out.Open()
	if err != nil {
		log.Println(s, err)
		in.Close()
		return
	}

	rq, wq := queue.New(), queue.New()

	s.watchChannel(rq, wq, in)

	// Each module may contribute a send pipe, a recv pipe, or both. Every
	// pipe gets a new queue that replaces the current end of its chain.
	for _, m := range s.m {
		send, recv := m.Open()

		if send != nil {
			q := queue.New()
			s.watchPipe(wq, q, send)
			wq = q
		}

		if recv != nil {
			q := queue.New()
			s.watchPipe(q, rq, recv)
			rq = q
		}
	}

	// The outbound channel gets the two chain ends with roles swapped.
	s.watchChannel(wq, rq, out)
}

// newStream builds a stream named id and starts its run loop.
//
// args[0] and the last element of args are passed to socket.New for the
// inbound and outbound sockets; every element in between is either a module
// spec passed to module.New, "-" (wrap the next module with module.Reverse)
// or "+" (cancel a pending "-"). An error is returned when fewer than two
// arguments are given, when a socket or module cannot be created, or when a
// "-" is not followed by a module. Sockets created before a failure are
// closed.
func newStream(id string, args []string) (*stream, error) {
	var in, out socket.S
	var err error

	// Both endpoints are required; without this check the indexing and
	// slicing below would panic.
	if len(args) < 2 {
		return nil, fmt.Errorf("not enough args")
	}
	last := len(args) - 1

	if in, err = socket.New(args[0]); err != nil {
		return nil, err
	}

	if out, err = socket.New(args[last]); err != nil {
		in.Close()
		return nil, err
	}

	s := &stream{
		id:   id,
		args: strings.Join(args, " "),
		in:   in,
		out:  out,
		done: make(chan struct{}),
	}

	reverse := false

	for _, arg := range args[1:last] {
		var m module.M

		switch arg {
		case "-":
			reverse = true
			continue
		case "+":
			reverse = false
			continue
		}

		if m, err = module.New(arg); err != nil {
			s.Close()
			return nil, err
		}

		if reverse {
			m = module.Reverse(m)
			reverse = false
		}

		s.m = append(s.m, m)
	}

	// A "-" that was never consumed by a module is a usage error.
	if reverse {
		s.Close()
		return nil, fmt.Errorf("bad '-' usage")
	}

	go s.run()

	return s, nil
}

// streamAdd handles the "add" command: r.args[0] is the stream id and the
// remaining arguments are handed to newStream. The new stream is stored in
// r.c.s.streams under its id.
//
// NOTE(review): this relies on r.Fatal not returning to the caller —
// confirm against its definition.
func streamAdd(r *request) {
	if r.argc < 3 {
		r.Fatal("not enough args")
	}

	id := r.args[0]

	if _, ok := r.c.s.streams[id]; ok {
		r.Fatal("duplicate id")
	}

	s, err := newStream(id, r.args[1:])
	if err != nil {
		r.Fatal(err)
	}

	r.c.s.streams[id] = s
}

// streamShow handles the "show" command: it prints the id and arguments of
// every stream, ordered by id.
func streamShow(r *request) {
	keys := make([]string, 0, len(r.c.s.streams))

	for k := range r.c.s.streams {
		keys = append(keys, k)
	}

	// Map iteration order is random; sort for stable output.
	sort.Strings(keys)

	for _, k := range keys {
		s := r.c.s.streams[k]
		r.Println(s.id, s.args)
	}
}

// init registers the stream commands.
func init() {
	newCmd(streamAdd, "add")
	newCmd(streamShow, "show")
}