summaryrefslogtreecommitdiff
path: root/pkg/server/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/stream.go')
-rw-r--r--pkg/server/stream.go193
1 files changed, 0 insertions, 193 deletions
diff --git a/pkg/server/stream.go b/pkg/server/stream.go
deleted file mode 100644
index 7c9cc82..0000000
--- a/pkg/server/stream.go
+++ /dev/null
@@ -1,193 +0,0 @@
-package server
-
-import (
- "tunnel/pkg/server/module"
- "tunnel/pkg/server/queue"
- "tunnel/pkg/server/socket"
- "strings"
- "sort"
- "fmt"
- "log"
-)
-
-type stream struct {
- id string
- args string
-
- in, out socket.S
- m []module.M
-}
-
-type streams map[string]*stream
-
-func (s *stream) String() string {
- return fmt.Sprintf("stream(%s)", s.id)
-}
-
-func (s *stream) Close() {
- s.in.Close()
- s.out.Close()
-}
-
-func (s *stream) run() {
- for {
- if in, err := s.in.Open(); err != nil {
- log.Println(s, err)
- } else {
- log.Printf("%s accept %s", s, in)
- go s.run2(in)
- }
- }
-}
-
-func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) {
- watch := func (q queue.Q, f func (q queue.Q) error) {
- if err := f(q); err != nil {
- log.Println(s, err)
- }
- }
-
- go func () {
- watch(wq, c.Send)
- close(wq)
- }()
-
- go watch(rq, c.Recv)
-}
-
-func (s *stream) watchPipe(rq, wq queue.Q, f func (rq, wq queue.Q) error) {
- go func () {
- if err := f(rq, wq); err != nil {
- log.Println(s, err)
- }
-
- close(wq)
- }()
-}
-
-func (s *stream) run2(in socket.Channel) {
- out, err := s.out.Open()
- if err != nil {
- log.Println(s, err)
- in.Close()
- return
- }
-
- rq, wq := queue.New(), queue.New()
-
- s.watchChannel(rq, wq, in)
-
- for _, m := range s.m {
- send, recv := m.Open()
- if send != nil {
- q := queue.New()
- s.watchPipe(wq, q, send)
- wq = q
- }
- if recv != nil {
- q := queue.New()
- s.watchPipe(q, rq, recv)
- rq = q
- }
- }
-
- s.watchChannel(wq, rq, out)
-}
-
-func newStream(id string, args []string) (*stream, error) {
- var in, out socket.S
- var err error
-
- n := len(args) - 1
-
- if in, err = socket.New(args[0]); err != nil {
- return nil, err
- }
-
- if out, err = socket.New(args[n]); err != nil {
- in.Close()
- return nil, err
- }
-
- s := &stream{
- id: id,
- args: strings.Join(args, " "),
- in: in,
- out: out,
- }
-
- reverse := false
-
- for _, arg := range args[1:n] {
- var m module.M
-
- if arg == "-" {
- reverse = true
- continue
- }
-
- if arg == "+" {
- reverse = false
- continue
- }
-
- if m, err = module.New(arg); err != nil {
- s.Close()
- return nil, err
- }
-
- if reverse {
- m = module.Reverse(m)
- reverse = false
- }
-
- s.m = append(s.m, m)
- }
-
- if reverse {
- s.Close()
- return nil, fmt.Errorf("bad '-' usage")
- }
-
- go s.run()
-
- return s, nil
-}
-
-func streamAdd(r *request) {
- if r.argc < 3 {
- r.Fatal("not enough args")
- }
-
- id := r.args[0]
- if _, ok := r.c.s.streams[id]; ok {
- r.Fatal("duplicate id")
- }
-
- s, err := newStream(id, r.args[1:])
- if err != nil {
- r.Fatal(err)
- }
-
- r.c.s.streams[id] = s
-}
-
-func streamShow(r *request) {
- var keys []string
-
- for k := range r.c.s.streams {
- keys = append(keys, k)
- }
-
- sort.Strings(keys)
-
- for _, k := range keys {
- s := r.c.s.streams[k]
- r.Println(s.id, s.args)
- }
-}
-
-func init() {
- newCmd(streamAdd, "add")
- newCmd(streamShow, "show")
-}