From bd5339bff8bf5f5e877e94dfef265a22570a69c7 Mon Sep 17 00:00:00 2001 From: Mikhail Osipov Date: Mon, 17 Feb 2020 11:56:43 +0300 Subject: first working version --- TODO | 26 +++++- pkg/server/cmds.go | 12 ++- pkg/server/echo.go | 8 +- pkg/server/env.go | 35 +++----- pkg/server/exit.go | 8 +- pkg/server/module/alpha.go | 32 ++++++++ pkg/server/module/hex.go | 27 +++++++ pkg/server/module/module.go | 48 +++++++++++ pkg/server/queue/queue.go | 60 ++++++++++++++ pkg/server/server.go | 30 ++++--- pkg/server/sleep.go | 8 +- pkg/server/socket/socket.go | 151 ++++++++++++++++++++++++++++++++++ pkg/server/status.go | 8 +- pkg/server/stream.go | 193 ++++++++++++++++++++++++++++++++++++++++++++ test/hello.sh | 4 +- tmp/automap.go | 103 +++++++++++++++++++++++ tmp/proto.go | 33 ++++++++ tmp/socket.go | 84 +++++++++++++++++++ 18 files changed, 808 insertions(+), 62 deletions(-) create mode 100644 pkg/server/module/alpha.go create mode 100644 pkg/server/module/hex.go create mode 100644 pkg/server/module/module.go create mode 100644 pkg/server/queue/queue.go create mode 100644 pkg/server/socket/socket.go create mode 100644 pkg/server/stream.go create mode 100644 tmp/automap.go create mode 100644 tmp/proto.go create mode 100644 tmp/socket.go diff --git a/TODO b/TODO index 823ada7..9eedc7c 100644 --- a/TODO +++ b/TODO @@ -3,9 +3,28 @@ 3. DONE add help command 4. DONE env set/get 5. add simple tcp proxy: - add tcp server host port - add tcp client addr port - add stream $id1 $id2 + in add tcp [host] port + out add tcp host port + + stream add in-tcp-1 out-tcp-1 + stream add in-tcp-2 mixer out-tcp-2 + stream add in-tcp-3 packer out-tcp-2 + + module add + + stream add tcp-listen/80 tcp/mikeos.ru:22 + stream add tcp-listen/80 hex tcp/mikeos.ru:22 + stream add tcp-listen/80 packer mixer tcp/mikeos.ru:22 + + stream add tcp-listen/80 >hex tcp/mikeos.ru:22 + stream add tcp-listen/80 0 { + n, err := cc.conn.Write(b) + if err != nil { + return err + } + b = b[n:] + } + } + + return nil +} + +func (cc *connChannel) String() string { + addr := cc.conn.RemoteAddr() + return fmt.Sprintf("%s/%s", addr.Network(), addr.String()) +} + +func (cc *connChannel) isCanceled() bool { + select { + case <- cc.cancel: + return true + default: + return false + } +} + +func (cc *connChannel) shutdown(err *error) { + select { + case <- cc.cancel: + *err = nil + default: + cc.once.Do(func () { + close(cc.cancel) + log.Println("close", cc) + if e := cc.conn.Close(); e != nil && *err != nil { + *err = e + } + }) + } +} + +func (cc *connChannel) Close() { + var err error + cc.shutdown(&err) +} + +func newListenSocket(proto, addr string) (S, error) { + if !strings.Contains(addr, ":") { + addr = ":" + addr + } + + listen, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + return &listenSocket{listen: listen}, nil +} + +func (s *listenSocket) Open() (Channel, error) { + conn, err := s.listen.Accept() + if err != nil { + return nil, err + } + return newConnChannel(conn), nil +} + +func (s *listenSocket) Close() { + s.listen.Close() +} + +func newDialSocket(proto, addr string) (S, error) { + return &dialSocket{proto: proto, addr: addr}, nil +} + +func (s *dialSocket) Open() (Channel, error) { + conn, err := net.Dial(s.proto, s.addr) + if err != nil { + return nil, err + } + return newConnChannel(conn), nil +} + +func (s *dialSocket) Close() { +} + +func New(desc string) (S, error) { + args := strings.Split(desc, "/") + + if len(args) != 2 { + return nil, fmt.Errorf("bad socket '%s'", desc) + } + + proto, addr := args[0], args[1] + + switch proto { + case "tcp-listen": return newListenSocket("tcp", addr) + case "tcp": return newDialSocket("tcp", addr) + } + + return nil, fmt.Errorf("bad socket '%s': unknown type", desc) +} diff --git a/pkg/server/status.go b/pkg/server/status.go index aff3844..4689274 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4,12 +4,12 @@ import ( "tunnel/pkg/config" ) -func init() { - newCmd(status, "status") -} - func status(r *request) { r.expect() r.Printf("since %s", r.c.s.since.Format(config.TimeFormat)) } + +func init() { + newCmd(status, "status") +} diff --git a/pkg/server/stream.go b/pkg/server/stream.go new file mode 100644 index 0000000..7c9cc82 --- /dev/null +++ b/pkg/server/stream.go @@ -0,0 +1,193 @@ +package server + +import ( + "tunnel/pkg/server/module" + "tunnel/pkg/server/queue" + "tunnel/pkg/server/socket" + "strings" + "sort" + "fmt" + "log" +) + +type stream struct { + id string + args string + + in, out socket.S + m []module.M +} + +type streams map[string]*stream + +func (s *stream) String() string { + return fmt.Sprintf("stream(%s)", s.id) +} + +func (s *stream) Close() { + s.in.Close() + s.out.Close() +} + +func (s *stream) run() { + for { + if in, err := s.in.Open(); err != nil { + log.Println(s, err) + } else { + log.Printf("%s accept %s", s, in) + go s.run2(in) + } + } +} + +func (s *stream) watchChannel(rq, wq queue.Q, c socket.Channel) { + watch := func (q queue.Q, f func (q queue.Q) error) { + if err := f(q); err != nil { + log.Println(s, err) + } + } + + go func () { + watch(wq, c.Send) + close(wq) + }() + + go watch(rq, c.Recv) +} + +func (s *stream) watchPipe(rq, wq queue.Q, f func (rq, wq queue.Q) error) { + go func () { + if err := f(rq, wq); err != nil { + log.Println(s, err) + } + + close(wq) + }() +} + +func (s *stream) run2(in socket.Channel) { + out, err := s.out.Open() + if err != nil { + log.Println(s, err) + in.Close() + return + } + + rq, wq := queue.New(), queue.New() + + s.watchChannel(rq, wq, in) + + for _, m := range s.m { + send, recv := m.Open() + if send != nil { + q := queue.New() + s.watchPipe(wq, q, send) + wq = q + } + if recv != nil { + q := queue.New() + s.watchPipe(q, rq, recv) + rq = q + } + } + + s.watchChannel(wq, rq, out) +} + +func newStream(id string, args []string) (*stream, error) { + var in, out socket.S + var err error + + n := len(args) - 1 + + if in, err = socket.New(args[0]); err != nil { + return nil, err + } + + if out, err = socket.New(args[n]); err != nil { + in.Close() + return nil, err + } + + s := &stream{ + id: id, + args: strings.Join(args, " "), + in: in, + out: out, + } + + reverse := false + + for _, arg := range args[1:n] { + var m module.M + + if arg == "-" { + reverse = true + continue + } + + if arg == "+" { + reverse = false + continue + } + + if m, err = module.New(arg); err != nil { + s.Close() + return nil, err + } + + if reverse { + m = module.Reverse(m) + reverse = false + } + + s.m = append(s.m, m) + } + + if reverse { + s.Close() + return nil, fmt.Errorf("bad '-' usage") + } + + go s.run() + + return s, nil +} + +func streamAdd(r *request) { + if r.argc < 3 { + r.Fatal("not enough args") + } + + id := r.args[0] + if _, ok := r.c.s.streams[id]; ok { + r.Fatal("duplicate id") + } + + s, err := newStream(id, r.args[1:]) + if err != nil { + r.Fatal(err) + } + + r.c.s.streams[id] = s +} + +func streamShow(r *request) { + var keys []string + + for k := range r.c.s.streams { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, k := range keys { + s := r.c.s.streams[k] + r.Println(s.id, s.args) + } +} + +func init() { + newCmd(streamAdd, "add") + newCmd(streamShow, "show") +} diff --git a/test/hello.sh b/test/hello.sh index 9893089..81dcec8 100755 --- a/test/hello.sh +++ b/test/hello.sh @@ -5,7 +5,7 @@ PATH=$ROOT/cmd/tunnel tunnel var clear tunnel var set cmd echo -tunnel var set args ^"%x, %y!" +tunnel var set args ^"@x, @y!" tunnel var set x Hello tunnel var set y World -tunnel %cmd %args +tunnel @cmd @args diff --git a/tmp/automap.go b/tmp/automap.go new file mode 100644 index 0000000..187960c --- /dev/null +++ b/tmp/automap.go @@ -0,0 +1,103 @@ +package server + +import ( + "strconv" + "sort" + "fmt" + "io" +) + +type autokey struct { + flag bool + n int + v string +} + +type automap map[string]interface{} + +type autokeys []autokey + +func (t autokey) String() string { + return t.v +} + +func (t autokeys) Len() int { + return len(t) +} + +func (t autokeys) Less(i, j int) bool { + a := &t[i] + b := &t[j] + + if a.flag && b.flag { + return a.n < b.n + } + + if !a.flag && !b.flag { + return a.v < b.v + } + + return b.flag +} + +func (t autokeys) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} + +func (m automap) add(v interface{}) { + for n := 1;; n++ { + k := fmt.Sprint(n) + if _, ok := m[k]; !ok { + m[k] = v + break + } + } +} + +func (m automap) del(k string) bool { + if _, ok := m[k]; !ok { + return false + } + + delete(m, k) + + return true +} + +func (m automap) rename(old string, new string) bool { + if _, ok := m[old]; !ok { + return false + } + + if _, ok := m[new]; ok { + return false + } + + m[new] = m[old] + + delete(m, old) + + return true +} + +func (m automap) show(w io.Writer) { + var keys autokeys + + for k, _ := range m { + t := autokey{v: k} + + n, err := strconv.Atoi(k) + if err == nil { + t.flag = true + t.n = n + } + + keys = append(keys, t) + } + + sort.Sort(keys) + + for _, k := range keys { + fmt.Fprintln(w, k, m[k.v]) + } +} diff --git a/tmp/proto.go b/tmp/proto.go new file mode 100644 index 0000000..104f1fa --- /dev/null +++ b/tmp/proto.go @@ -0,0 +1,33 @@ +package server + +type proto interface { + Open() (proto, error) + Close() error + String() string +} + +func protoShow(r *request) { + r.c.s.proto.show(r.out) +} + +func protoDel(r *request) { + r.expect(1) + + if !r.c.s.proto.del(r.args[0]) { + r.Fatal("no such proto") + } +} + +func protoRename(r *request) { + r.expect(2) + + if !r.c.s.proto.rename(r.args[0], r.args[1]) { + r.Fatal("rename failed") + } +} + +func init() { + newCmd(protoDel, "proto", "del") + newCmd(protoShow, "proto", "show") + newCmd(protoRename, "proto", "rename") +} diff --git a/tmp/socket.go b/tmp/socket.go new file mode 100644 index 0000000..febd650 --- /dev/null +++ b/tmp/socket.go @@ -0,0 +1,84 @@ +package server + +import ( + "strconv" + "fmt" +) + +const MAXPORT = 1 << 16 - 1 + +type tcpListen struct { + addr string +} + +type tcpOut struct { + addr string +} + +func (t *tcpListen) Open() (proto, error) { + return nil, errNotImplemented +} + +func (t *tcpListen) Close() error { + return nil +} + +func (t *tcpListen) String() string { + return fmt.Sprintf("tcp <- %s", t.addr) +} + +func (t *tcpOut) Open() (proto, error) { + return nil, errNotImplemented +} + +func (t *tcpOut) Close() error { + return nil +} + +func (t *tcpOut) String() string { + return fmt.Sprintf("tcp -> %s", t.addr) +} + +func parsePort(r *request, s string) int { + port, err := strconv.Atoi(s) + if err != nil { + r.Fatalf("bad port value: %v", err) + } + if port <= 0 || port > MAXPORT { + r.Fatal("bad port value") + } + return port +} + +func addTcpListen(r *request) { + r.expect(1, 2) + + var host string + var n int + + if r.argc > 1 { + host = r.args[n] + n++ + } + + port := parsePort(r, r.args[n]) + t := &tcpListen{addr: fmt.Sprintf("%s:%d", host, port)} + + r.c.s.proto.add(t) +} + +func addTcpOut(r *request) { + r.expect(2) + + host := r.args[0] + port := parsePort(r, r.args[1]) + + t := &tcpOut{addr: fmt.Sprintf("%s:%d", host, port)} + + r.c.s.proto.add(t) +} + +func init() { + newCmd(addTcpListen, "add", "tcp", "listen") + newCmd(addTcpOut, "add", "tcp", "out") +} -- cgit v1.2.3-70-g09d2