diff options
Diffstat (limited to 'pkg/server/socket')
| -rw-r--r-- | pkg/server/socket/auto.go | 80 | ||||
| -rw-r--r-- | pkg/server/socket/dial.go | 42 | ||||
| -rw-r--r-- | pkg/server/socket/listen.go | 49 | ||||
| -rw-r--r-- | pkg/server/socket/loop.go | 46 | ||||
| -rw-r--r-- | pkg/server/socket/socket.go | 145 |
5 files changed, 238 insertions, 124 deletions
diff --git a/pkg/server/socket/auto.go b/pkg/server/socket/auto.go new file mode 100644 index 0000000..97bc625 --- /dev/null +++ b/pkg/server/socket/auto.go @@ -0,0 +1,80 @@ +package socket + +import ( + "tunnel/pkg/server/env" + "tunnel/pkg/server/queue" +) + +type autoSocket struct { + S +} + +type autoChannel struct { + s *autoSocket + c chan Channel + e env.Env +} + +func newAutoSocket(proto, addr string) (S, error) { + s, err := newDialSocket(proto, addr) + if err != nil { + return s, err + } + + return &autoSocket{s}, nil +} + +func (s *autoSocket) Open(env env.Env) (Channel, error) { + c := &autoChannel{ + s: s, + c: make(chan Channel), + e: env, + } + + return c, nil +} + +func (c *autoChannel) String() string { + return "auto" +} + +func (c *autoChannel) Send(wq queue.Q) error { + if x := <-c.c; x == nil { + return nil + } else { + return x.Send(wq) + } +} + +func (c *autoChannel) Recv(rq queue.Q) error { + b := <-rq + if b == nil { + close(c.c) + return nil + } + + x, err := c.s.S.Open(c.e) + if err != nil { + close(c.c) + return err + } + + c.c <- x + + q := queue.New() + + go func() { + q <- b + queue.Copy(rq, q) + close(q) + }() + + defer q.Dry() + + return x.Recv(q) +} + +/* TODO */ +func (c *autoChannel) Close() error { + return nil +} diff --git a/pkg/server/socket/dial.go b/pkg/server/socket/dial.go new file mode 100644 index 0000000..818fbc6 --- /dev/null +++ b/pkg/server/socket/dial.go @@ -0,0 +1,42 @@ +package socket + +import ( + "fmt" + "net" + "strings" + "tunnel/pkg/server/env" +) + +type dialSocket struct { + proto, addr string +} + +func newDialSocket(proto, addr string) (S, error) { + switch proto { + case "tcp", "udp": + if !strings.Contains(addr, ":") { + addr = "localhost:" + addr + } + } + + return &dialSocket{proto: proto, addr: addr}, nil +} + +func (s *dialSocket) String() string { + return fmt.Sprintf("%s/%s", s.proto, s.addr) +} + +func (s *dialSocket) Open(env.Env) (Channel, error) { + conn, err := net.Dial(s.proto, s.addr) + if err != nil { + return nil, err + } + + addr := conn.RemoteAddr() + info := fmt.Sprintf(">%s/%s", addr.Network(), addr) + + return exported{info, newConn(conn)}, nil +} + +func (s *dialSocket) Close() { +} diff --git a/pkg/server/socket/listen.go b/pkg/server/socket/listen.go new file mode 100644 index 0000000..c328945 --- /dev/null +++ b/pkg/server/socket/listen.go @@ -0,0 +1,49 @@ +package socket + +import ( + "fmt" + "net" + "strings" + "tunnel/pkg/server/env" +) + +func newListenSocket(proto, addr string) (S, error) { + if proto == "tcp" { + if !strings.Contains(addr, ":") { + addr = ":" + addr + } + } + + listen, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + s := &listenSocket{ + proto: proto, + addr: addr, + listen: listen, + } + + return s, nil +} + +func (s *listenSocket) Open(env.Env) (Channel, error) { + conn, err := s.listen.Accept() + if err != nil { + return nil, err + } + + addr := conn.RemoteAddr() + info := fmt.Sprintf("<%s/%s", addr.Network(), addr) + + return exported{info, newConn(conn)}, nil +} + +func (s *listenSocket) String() string { + return fmt.Sprintf("%s/%s,listen", s.proto, s.addr) +} + +func (s *listenSocket) Close() { + s.listen.Close() +} diff --git a/pkg/server/socket/loop.go b/pkg/server/socket/loop.go new file mode 100644 index 0000000..88e9491 --- /dev/null +++ b/pkg/server/socket/loop.go @@ -0,0 +1,46 @@ +package socket + +import ( + "tunnel/pkg/server/env" + "tunnel/pkg/server/queue" +) + +type loopSocket struct{} + +type loopChannel struct { + c chan queue.Q + q chan error +} + +func (c *loopChannel) Send(wq queue.Q) error { + c.c <- wq + return <-c.q +} + +func (c *loopChannel) Recv(rq queue.Q) error { + defer close(c.q) + return queue.Copy(rq, <-c.c) +} + +func (c *loopChannel) String() string { + return "loop" +} + +func (c *loopChannel) Close() error { + return nil +} + +func (s *loopSocket) Open(env.Env) (Channel, error) { + return &loopChannel{make(chan queue.Q), make(chan error)}, nil +} + +func (s *loopSocket) String() string { + return "loop" +} + +func (s *loopSocket) Close() { +} + +func newLoopSocket() (S, error) { + return &loopSocket{}, nil +} diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go index 5ccf1bd..a945ce0 100644 --- a/pkg/server/socket/socket.go +++ b/pkg/server/socket/socket.go @@ -14,7 +14,7 @@ import ( var errAlreadyClosed = errors.New("already closed") -type exportChannel struct { +type exported struct { info string Channel } @@ -35,30 +35,22 @@ type listenSocket struct { listen net.Listener } -type dialSocket struct { - proto, addr string -} - -type connChannel struct { - conn net.Conn +type conn struct { + net.Conn once sync.Once } -type loopSocket struct{} - -type loopChannel struct { - q queue.Q -} - -func (c exportChannel) String() string { +func (c exported) String() string { return c.info } -func newConnChannel(conn net.Conn) Channel { - return &connChannel{conn: conn} +func newConn(cn net.Conn) Channel { + c := &conn{Conn: cn} + log.Println("open", c) + return c } -func (c *connChannel) final(f func() error, err error) error { +func (c *conn) final(f func() error, err error) error { if e := f(); e != nil { if e == errAlreadyClosed { return nil @@ -70,131 +62,32 @@ func (c *connChannel) final(f func() error, err error) error { return err } -func (c *connChannel) Send(wq queue.Q) error { - err := queue.IoCopy(c.conn, wq.Writer()) +func (c *conn) Send(wq queue.Q) error { + err := queue.IoCopy(c, wq.Writer()) return c.final(c.Close, err) } -func (c *connChannel) Recv(rq queue.Q) error { - err := queue.IoCopy(rq.Reader(), c.conn) +func (c *conn) Recv(rq queue.Q) error { + err := queue.IoCopy(rq.Reader(), c) return c.final(c.Close, err) } -func (c *connChannel) String() string { - local, remote := c.conn.LocalAddr(), c.conn.RemoteAddr() +func (c *conn) String() string { + local, remote := c.LocalAddr(), c.RemoteAddr() return fmt.Sprintf("%s/%s->%s", local.Network(), local, remote) } -func (c *connChannel) Close() error { +func (c *conn) Close() error { err := errAlreadyClosed c.once.Do(func() { log.Println("close", c) - err = c.conn.Close() + err = c.Conn.Close() }) return err } -func newListenSocket(proto, addr string) (S, error) { - if proto == "tcp" { - if !strings.Contains(addr, ":") { - addr = ":" + addr - } - } - - listen, err := net.Listen(proto, addr) - if err != nil { - return nil, err - } - - s := &listenSocket{ - proto: proto, - addr: addr, - listen: listen, - } - - return s, nil -} - -func (s *listenSocket) Open(env env.Env) (Channel, error) { - conn, err := s.listen.Accept() - if err != nil { - return nil, err - } - - addr := conn.RemoteAddr() - info := fmt.Sprintf("%s/%s", addr.Network(), addr) - - return exportChannel{info, newConnChannel(conn)}, nil -} - -func (s *listenSocket) String() string { - return fmt.Sprintf("%s/%s,listen", s.proto, s.addr) -} - -func (s *listenSocket) Close() { - s.listen.Close() -} - -func newDialSocket(proto, addr string) (S, error) { - switch proto { - case "tcp", "udp": - if !strings.Contains(addr, ":") { - addr = "localhost:" + addr - } - } - - return &dialSocket{proto: proto, addr: addr}, nil -} - -func (s *dialSocket) String() string { - return fmt.Sprintf("%s/%s", s.proto, s.addr) -} - -func (s *dialSocket) Open(env env.Env) (Channel, error) { - conn, err := net.Dial(s.proto, s.addr) - if err != nil { - return nil, err - } - return exportChannel{"-", newConnChannel(conn)}, nil -} - -func (s *dialSocket) Close() { -} - -func (c *loopChannel) Send(wq queue.Q) error { - return queue.Copy(c.q, wq) -} - -func (c *loopChannel) Recv(rq queue.Q) error { - defer close(c.q) - return queue.Copy(rq, c.q) -} - -func (c *loopChannel) Close() error { - return nil -} - -func (c *loopChannel) String() string { - return "loop" -} - -func (s *loopSocket) Open(env.Env) (Channel, error) { - return &loopChannel{queue.New()}, nil -} - -func (s *loopSocket) String() string { - return "loop" -} - -func (s *loopSocket) Close() { -} - -func newLoopSocket() (S, error) { - return &loopSocket{}, nil -} - func New(desc string, env env.Env) (S, error) { base, opts := opts.Parse(desc) args := strings.SplitN(base, "/", 2) @@ -224,5 +117,9 @@ func New(desc string, env env.Env) (S, error) { return newListenSocket(proto, addr) } + if _, ok := opts["auto"]; ok { + return newAutoSocket(proto, addr) + } + return newDialSocket(proto, addr) } |
