summaryrefslogtreecommitdiff
path: root/pkg/server/stream.go
diff options
context:
space:
mode:
authorMikhail Osipov <mike.osipov@gmail.com>2020-02-17 11:56:43 +0300
committerMikhail Osipov <mike.osipov@gmail.com>2020-02-19 23:51:31 +0300
commitbd5339bff8bf5f5e877e94dfef265a22570a69c7 (patch)
tree5902df7a9f21c00d9c414f6b0c2b79aadfd84752 /pkg/server/stream.go
parentdf935315c7201b7d42eb361b3ac3d36fe83e53e6 (diff)
first working version
Diffstat (limited to 'pkg/server/stream.go')
-rw-r--r--pkg/server/stream.go193
1 files changed, 193 insertions, 0 deletions
diff --git a/pkg/server/stream.go b/pkg/server/stream.go
new file mode 100644
index 0000000..7c9cc82
--- /dev/null
+++ b/pkg/server/stream.go
@@ -0,0 +1,193 @@
+package server
+
+import (
+ "tunnel/pkg/server/module"
+ "tunnel/pkg/server/queue"
+ "tunnel/pkg/server/socket"
+ "strings"
+ "sort"
+ "fmt"
+ "log"
+)
+
+type stream struct {
+ id string
+ args string
+
+ in, out socket.S
+ m []module.M
+}
+
+type streams map[string]*stream
+
+func (s *stream) String() string {
+ return fmt.Sprintf("stream(%s)", s.id)
+}
+
+func (s *stream) Close() {
+ s.in.Close()
+ s.out.Close()
+}
+
+func (s *stream) run() {
+ for {
+ if in, err := s.in.Open(); err != nil {
+ log.Println(s, err)
+ } else {
+ log.Printf("%s accept %s", s, in)
+ go s.run2(in)
+ }
+ }
+}
+
+func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) {
+ watch := func (q queue.Q, f func (q queue.Q) error) {
+ if err := f(q); err != nil {
+ log.Println(s, err)
+ }
+ }
+
+ go func () {
+ watch(wq, c.Send)
+ close(wq)
+ }()
+
+ go watch(rq, c.Recv)
+}
+
+func (s *stream) watchPipe(rq, wq queue.Q, f func (rq, wq queue.Q) error) {
+ go func () {
+ if err := f(rq, wq); err != nil {
+ log.Println(s, err)
+ }
+
+ close(wq)
+ }()
+}
+
+func (s *stream) run2(in socket.Channel) {
+ out, err := s.out.Open()
+ if err != nil {
+ log.Println(s, err)
+ in.Close()
+ return
+ }
+
+ rq, wq := queue.New(), queue.New()
+
+ s.watchChannel(rq, wq, in)
+
+ for _, m := range s.m {
+ send, recv := m.Open()
+ if send != nil {
+ q := queue.New()
+ s.watchPipe(wq, q, send)
+ wq = q
+ }
+ if recv != nil {
+ q := queue.New()
+ s.watchPipe(q, rq, recv)
+ rq = q
+ }
+ }
+
+ s.watchChannel(wq, rq, out)
+}
+
+func newStream(id string, args []string) (*stream, error) {
+ var in, out socket.S
+ var err error
+
+ n := len(args) - 1
+
+ if in, err = socket.New(args[0]); err != nil {
+ return nil, err
+ }
+
+ if out, err = socket.New(args[n]); err != nil {
+ in.Close()
+ return nil, err
+ }
+
+ s := &stream{
+ id: id,
+ args: strings.Join(args, " "),
+ in: in,
+ out: out,
+ }
+
+ reverse := false
+
+ for _, arg := range args[1:n] {
+ var m module.M
+
+ if arg == "-" {
+ reverse = true
+ continue
+ }
+
+ if arg == "+" {
+ reverse = false
+ continue
+ }
+
+ if m, err = module.New(arg); err != nil {
+ s.Close()
+ return nil, err
+ }
+
+ if reverse {
+ m = module.Reverse(m)
+ reverse = false
+ }
+
+ s.m = append(s.m, m)
+ }
+
+ if reverse {
+ s.Close()
+ return nil, fmt.Errorf("bad '-' usage")
+ }
+
+ go s.run()
+
+ return s, nil
+}
+
+func streamAdd(r *request) {
+ if r.argc < 3 {
+ r.Fatal("not enough args")
+ }
+
+ id := r.args[0]
+ if _, ok := r.c.s.streams[id]; ok {
+ r.Fatal("duplicate id")
+ }
+
+ s, err := newStream(id, r.args[1:])
+ if err != nil {
+ r.Fatal(err)
+ }
+
+ r.c.s.streams[id] = s
+}
+
+func streamShow(r *request) {
+ var keys []string
+
+ for k := range r.c.s.streams {
+ keys = append(keys, k)
+ }
+
+ sort.Strings(keys)
+
+ for _, k := range keys {
+ s := r.c.s.streams[k]
+ r.Println(s.id, s.args)
+ }
+}
+
+func init() {
+ newCmd(streamAdd, "add")
+ newCmd(streamShow, "show")
+}