summaryrefslogtreecommitdiff
path: root/pkg/server/socket
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/socket')
-rw-r--r--pkg/server/socket/auto.go80
-rw-r--r--pkg/server/socket/dial.go42
-rw-r--r--pkg/server/socket/listen.go49
-rw-r--r--pkg/server/socket/loop.go46
-rw-r--r--pkg/server/socket/socket.go145
5 files changed, 238 insertions, 124 deletions
diff --git a/pkg/server/socket/auto.go b/pkg/server/socket/auto.go
new file mode 100644
index 0000000..97bc625
--- /dev/null
+++ b/pkg/server/socket/auto.go
@@ -0,0 +1,80 @@
+package socket
+
+import (
+ "tunnel/pkg/server/env"
+ "tunnel/pkg/server/queue"
+)
+
+type autoSocket struct {
+ S
+}
+
+type autoChannel struct {
+ s *autoSocket
+ c chan Channel
+ e env.Env
+}
+
+func newAutoSocket(proto, addr string) (S, error) {
+ s, err := newDialSocket(proto, addr)
+ if err != nil {
+ return s, err
+ }
+
+ return &autoSocket{s}, nil
+}
+
+func (s *autoSocket) Open(env env.Env) (Channel, error) {
+ c := &autoChannel{
+ s: s,
+ c: make(chan Channel),
+ e: env,
+ }
+
+ return c, nil
+}
+
+func (c *autoChannel) String() string {
+ return "auto"
+}
+
+func (c *autoChannel) Send(wq queue.Q) error {
+ if x := <-c.c; x == nil {
+ return nil
+ } else {
+ return x.Send(wq)
+ }
+}
+
+func (c *autoChannel) Recv(rq queue.Q) error {
+ b := <-rq
+ if b == nil {
+ close(c.c)
+ return nil
+ }
+
+ x, err := c.s.S.Open(c.e)
+ if err != nil {
+ close(c.c)
+ return err
+ }
+
+ c.c <- x
+
+ q := queue.New()
+
+ go func() {
+ q <- b
+ queue.Copy(rq, q)
+ close(q)
+ }()
+
+ defer q.Dry()
+
+ return x.Recv(q)
+}
+
+/* TODO */
+func (c *autoChannel) Close() error {
+ return nil
+}
diff --git a/pkg/server/socket/dial.go b/pkg/server/socket/dial.go
new file mode 100644
index 0000000..818fbc6
--- /dev/null
+++ b/pkg/server/socket/dial.go
@@ -0,0 +1,42 @@
+package socket
+
+import (
+ "fmt"
+ "net"
+ "strings"
+ "tunnel/pkg/server/env"
+)
+
+type dialSocket struct {
+ proto, addr string
+}
+
+func newDialSocket(proto, addr string) (S, error) {
+ switch proto {
+ case "tcp", "udp":
+ if !strings.Contains(addr, ":") {
+ addr = "localhost:" + addr
+ }
+ }
+
+ return &dialSocket{proto: proto, addr: addr}, nil
+}
+
+func (s *dialSocket) String() string {
+ return fmt.Sprintf("%s/%s", s.proto, s.addr)
+}
+
+func (s *dialSocket) Open(env.Env) (Channel, error) {
+ conn, err := net.Dial(s.proto, s.addr)
+ if err != nil {
+ return nil, err
+ }
+
+ addr := conn.RemoteAddr()
+ info := fmt.Sprintf(">%s/%s", addr.Network(), addr)
+
+ return exported{info, newConn(conn)}, nil
+}
+
+func (s *dialSocket) Close() {
+}
diff --git a/pkg/server/socket/listen.go b/pkg/server/socket/listen.go
new file mode 100644
index 0000000..c328945
--- /dev/null
+++ b/pkg/server/socket/listen.go
@@ -0,0 +1,49 @@
+package socket
+
+import (
+ "fmt"
+ "net"
+ "strings"
+ "tunnel/pkg/server/env"
+)
+
+func newListenSocket(proto, addr string) (S, error) {
+ if proto == "tcp" {
+ if !strings.Contains(addr, ":") {
+ addr = ":" + addr
+ }
+ }
+
+ listen, err := net.Listen(proto, addr)
+ if err != nil {
+ return nil, err
+ }
+
+ s := &listenSocket{
+ proto: proto,
+ addr: addr,
+ listen: listen,
+ }
+
+ return s, nil
+}
+
+func (s *listenSocket) Open(env.Env) (Channel, error) {
+ conn, err := s.listen.Accept()
+ if err != nil {
+ return nil, err
+ }
+
+ addr := conn.RemoteAddr()
+ info := fmt.Sprintf("<%s/%s", addr.Network(), addr)
+
+ return exported{info, newConn(conn)}, nil
+}
+
+func (s *listenSocket) String() string {
+ return fmt.Sprintf("%s/%s,listen", s.proto, s.addr)
+}
+
+func (s *listenSocket) Close() {
+ s.listen.Close()
+}
diff --git a/pkg/server/socket/loop.go b/pkg/server/socket/loop.go
new file mode 100644
index 0000000..88e9491
--- /dev/null
+++ b/pkg/server/socket/loop.go
@@ -0,0 +1,46 @@
+package socket
+
+import (
+ "tunnel/pkg/server/env"
+ "tunnel/pkg/server/queue"
+)
+
+type loopSocket struct{}
+
+type loopChannel struct {
+ c chan queue.Q
+ q chan error
+}
+
+func (c *loopChannel) Send(wq queue.Q) error {
+ c.c <- wq
+ return <-c.q
+}
+
+func (c *loopChannel) Recv(rq queue.Q) error {
+ defer close(c.q)
+ return queue.Copy(rq, <-c.c)
+}
+
+func (c *loopChannel) String() string {
+ return "loop"
+}
+
+func (c *loopChannel) Close() error {
+ return nil
+}
+
+func (s *loopSocket) Open(env.Env) (Channel, error) {
+ return &loopChannel{make(chan queue.Q), make(chan error)}, nil
+}
+
+func (s *loopSocket) String() string {
+ return "loop"
+}
+
+func (s *loopSocket) Close() {
+}
+
+func newLoopSocket() (S, error) {
+ return &loopSocket{}, nil
+}
diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go
index 5ccf1bd..a945ce0 100644
--- a/pkg/server/socket/socket.go
+++ b/pkg/server/socket/socket.go
@@ -14,7 +14,7 @@ import (
var errAlreadyClosed = errors.New("already closed")
-type exportChannel struct {
+type exported struct {
info string
Channel
}
@@ -35,30 +35,22 @@ type listenSocket struct {
listen net.Listener
}
-type dialSocket struct {
- proto, addr string
-}
-
-type connChannel struct {
- conn net.Conn
+type conn struct {
+ net.Conn
once sync.Once
}
-type loopSocket struct{}
-
-type loopChannel struct {
- q queue.Q
-}
-
-func (c exportChannel) String() string {
+func (c exported) String() string {
return c.info
}
-func newConnChannel(conn net.Conn) Channel {
- return &connChannel{conn: conn}
+func newConn(cn net.Conn) Channel {
+ c := &conn{Conn: cn}
+ log.Println("open", c)
+ return c
}
-func (c *connChannel) final(f func() error, err error) error {
+func (c *conn) final(f func() error, err error) error {
if e := f(); e != nil {
if e == errAlreadyClosed {
return nil
@@ -70,131 +62,32 @@ func (c *connChannel) final(f func() error, err error) error {
return err
}
-func (c *connChannel) Send(wq queue.Q) error {
- err := queue.IoCopy(c.conn, wq.Writer())
+func (c *conn) Send(wq queue.Q) error {
+ err := queue.IoCopy(c, wq.Writer())
return c.final(c.Close, err)
}
-func (c *connChannel) Recv(rq queue.Q) error {
- err := queue.IoCopy(rq.Reader(), c.conn)
+func (c *conn) Recv(rq queue.Q) error {
+ err := queue.IoCopy(rq.Reader(), c)
return c.final(c.Close, err)
}
-func (c *connChannel) String() string {
- local, remote := c.conn.LocalAddr(), c.conn.RemoteAddr()
+func (c *conn) String() string {
+ local, remote := c.LocalAddr(), c.RemoteAddr()
return fmt.Sprintf("%s/%s->%s", local.Network(), local, remote)
}
-func (c *connChannel) Close() error {
+func (c *conn) Close() error {
err := errAlreadyClosed
c.once.Do(func() {
log.Println("close", c)
- err = c.conn.Close()
+ err = c.Conn.Close()
})
return err
}
-func newListenSocket(proto, addr string) (S, error) {
- if proto == "tcp" {
- if !strings.Contains(addr, ":") {
- addr = ":" + addr
- }
- }
-
- listen, err := net.Listen(proto, addr)
- if err != nil {
- return nil, err
- }
-
- s := &listenSocket{
- proto: proto,
- addr: addr,
- listen: listen,
- }
-
- return s, nil
-}
-
-func (s *listenSocket) Open(env env.Env) (Channel, error) {
- conn, err := s.listen.Accept()
- if err != nil {
- return nil, err
- }
-
- addr := conn.RemoteAddr()
- info := fmt.Sprintf("%s/%s", addr.Network(), addr)
-
- return exportChannel{info, newConnChannel(conn)}, nil
-}
-
-func (s *listenSocket) String() string {
- return fmt.Sprintf("%s/%s,listen", s.proto, s.addr)
-}
-
-func (s *listenSocket) Close() {
- s.listen.Close()
-}
-
-func newDialSocket(proto, addr string) (S, error) {
- switch proto {
- case "tcp", "udp":
- if !strings.Contains(addr, ":") {
- addr = "localhost:" + addr
- }
- }
-
- return &dialSocket{proto: proto, addr: addr}, nil
-}
-
-func (s *dialSocket) String() string {
- return fmt.Sprintf("%s/%s", s.proto, s.addr)
-}
-
-func (s *dialSocket) Open(env env.Env) (Channel, error) {
- conn, err := net.Dial(s.proto, s.addr)
- if err != nil {
- return nil, err
- }
- return exportChannel{"-", newConnChannel(conn)}, nil
-}
-
-func (s *dialSocket) Close() {
-}
-
-func (c *loopChannel) Send(wq queue.Q) error {
- return queue.Copy(c.q, wq)
-}
-
-func (c *loopChannel) Recv(rq queue.Q) error {
- defer close(c.q)
- return queue.Copy(rq, c.q)
-}
-
-func (c *loopChannel) Close() error {
- return nil
-}
-
-func (c *loopChannel) String() string {
- return "loop"
-}
-
-func (s *loopSocket) Open(env.Env) (Channel, error) {
- return &loopChannel{queue.New()}, nil
-}
-
-func (s *loopSocket) String() string {
- return "loop"
-}
-
-func (s *loopSocket) Close() {
-}
-
-func newLoopSocket() (S, error) {
- return &loopSocket{}, nil
-}
-
func New(desc string, env env.Env) (S, error) {
base, opts := opts.Parse(desc)
args := strings.SplitN(base, "/", 2)
@@ -224,5 +117,9 @@ func New(desc string, env env.Env) (S, error) {
return newListenSocket(proto, addr)
}
+ if _, ok := opts["auto"]; ok {
+ return newAutoSocket(proto, addr)
+ }
+
return newDialSocket(proto, addr)
}