From bd5339bff8bf5f5e877e94dfef265a22570a69c7 Mon Sep 17 00:00:00 2001 From: Mikhail Osipov Date: Mon, 17 Feb 2020 11:56:43 +0300 Subject: first working version --- pkg/server/stream.go | 193 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 pkg/server/stream.go (limited to 'pkg/server/stream.go') diff --git a/pkg/server/stream.go b/pkg/server/stream.go new file mode 100644 index 0000000..7c9cc82 --- /dev/null +++ b/pkg/server/stream.go @@ -0,0 +1,193 @@ +package server + +import ( + "tunnel/pkg/server/module" + "tunnel/pkg/server/queue" + "tunnel/pkg/server/socket" + "strings" + "sort" + "fmt" + "log" +) + +type stream struct { + id string + args string + + in, out socket.S + m []module.M +} + +type streams map[string]*stream + +func (s *stream) String() string { + return fmt.Sprintf("stream(%s)", s.id) +} + +func (s *stream) Close() { + s.in.Close() + s.out.Close() +} + +func (s *stream) run() { + for { + if in, err := s.in.Open(); err != nil { + log.Println(s, err) + } else { + log.Printf("%s accept %s", s, in) + go s.run2(in) + } + } +} + +func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) { + watch := func (q queue.Q, f func (q queue.Q) error) { + if err := f(q); err != nil { + log.Println(s, err) + } + } + + go func () { + watch(wq, c.Send) + close(wq) + }() + + go watch(rq, c.Recv) +} + +func (s *stream) watchPipe(rq, wq queue.Q, f func (rq, wq queue.Q) error) { + go func () { + if err := f(rq, wq); err != nil { + log.Println(s, err) + } + + close(wq) + }() +} + +func (s *stream) run2(in socket.Channel) { + out, err := s.out.Open() + if err != nil { + log.Println(s, err) + in.Close() + return + } + + rq, wq := queue.New(), queue.New() + + s.watchChannel(rq, wq, in) + + for _, m := range s.m { + send, recv := m.Open() + if send != nil { + q := queue.New() + s.watchPipe(wq, q, send) + wq = q + } + if recv != nil { + q := queue.New() + s.watchPipe(q, rq, recv) + rq = q + } + } + + s.watchChannel(wq, rq, out) +} + +func newStream(id string, args []string) (*stream, error) { + var in, out socket.S + var err error + + n := len(args) - 1 + + if in, err = socket.New(args[0]); err != nil { + return nil, err + } + + if out, err = socket.New(args[n]); err != nil { + in.Close() + return nil, err + } + + s := &stream{ + id: id, + args: strings.Join(args, " "), + in: in, + out: out, + } + + reverse := false + + for _, arg := range args[1:n] { + var m module.M + + if arg == "-" { + reverse = true + continue + } + + if arg == "+" { + reverse = false + continue + } + + if m, err = module.New(arg); err != nil { + s.Close() + return nil, err + } + + if reverse { + m = module.Reverse(m) + reverse = false + } + + s.m = append(s.m, m) + } + + if reverse { + s.Close() + return nil, fmt.Errorf("bad '-' usage") + } + + go s.run() + + return s, nil +} + +func streamAdd(r *request) { + if r.argc < 3 { + r.Fatal("not enough args") + } + + id := r.args[0] + if _, ok := r.c.s.streams[id]; ok { + r.Fatal("duplicate id") + } + + s, err := newStream(id, r.args[1:]) + if err != nil { + r.Fatal(err) + } + + r.c.s.streams[id] = s +} + +func streamShow(r *request) { + var keys []string + + for k := range r.c.s.streams { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, k := range keys { + s := r.c.s.streams[k] + r.Println(s.id, s.args) + } +} + +func init() { + newCmd(streamAdd, "add") + newCmd(streamShow, "show") +} -- cgit v1.2.3-70-g09d2