diff options
Diffstat (limited to 'pkg/server/socket/socket.go')
| -rw-r--r-- | pkg/server/socket/socket.go | 145 |
1 files changed, 21 insertions, 124 deletions
diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go index 5ccf1bd..a945ce0 100644 --- a/pkg/server/socket/socket.go +++ b/pkg/server/socket/socket.go @@ -14,7 +14,7 @@ import ( var errAlreadyClosed = errors.New("already closed") -type exportChannel struct { +type exported struct { info string Channel } @@ -35,30 +35,22 @@ type listenSocket struct { listen net.Listener } -type dialSocket struct { - proto, addr string -} - -type connChannel struct { - conn net.Conn +type conn struct { + net.Conn once sync.Once } -type loopSocket struct{} - -type loopChannel struct { - q queue.Q -} - -func (c exportChannel) String() string { +func (c exported) String() string { return c.info } -func newConnChannel(conn net.Conn) Channel { - return &connChannel{conn: conn} +func newConn(cn net.Conn) Channel { + c := &conn{Conn: cn} + log.Println("open", c) + return c } -func (c *connChannel) final(f func() error, err error) error { +func (c *conn) final(f func() error, err error) error { if e := f(); e != nil { if e == errAlreadyClosed { return nil @@ -70,131 +62,32 @@ func (c *connChannel) final(f func() error, err error) error { return err } -func (c *connChannel) Send(wq queue.Q) error { - err := queue.IoCopy(c.conn, wq.Writer()) +func (c *conn) Send(wq queue.Q) error { + err := queue.IoCopy(c, wq.Writer()) return c.final(c.Close, err) } -func (c *connChannel) Recv(rq queue.Q) error { - err := queue.IoCopy(rq.Reader(), c.conn) +func (c *conn) Recv(rq queue.Q) error { + err := queue.IoCopy(rq.Reader(), c) return c.final(c.Close, err) } -func (c *connChannel) String() string { - local, remote := c.conn.LocalAddr(), c.conn.RemoteAddr() +func (c *conn) String() string { + local, remote := c.LocalAddr(), c.RemoteAddr() return fmt.Sprintf("%s/%s->%s", local.Network(), local, remote) } -func (c *connChannel) Close() error { +func (c *conn) Close() error { err := errAlreadyClosed c.once.Do(func() { log.Println("close", c) - err = c.conn.Close() + err = c.Conn.Close() }) return err } -func newListenSocket(proto, addr string) (S, error) { - if proto == "tcp" { - if !strings.Contains(addr, ":") { - addr = ":" + addr - } - } - - listen, err := net.Listen(proto, addr) - if err != nil { - return nil, err - } - - s := &listenSocket{ - proto: proto, - addr: addr, - listen: listen, - } - - return s, nil -} - -func (s *listenSocket) Open(env env.Env) (Channel, error) { - conn, err := s.listen.Accept() - if err != nil { - return nil, err - } - - addr := conn.RemoteAddr() - info := fmt.Sprintf("%s/%s", addr.Network(), addr) - - return exportChannel{info, newConnChannel(conn)}, nil -} - -func (s *listenSocket) String() string { - return fmt.Sprintf("%s/%s,listen", s.proto, s.addr) -} - -func (s *listenSocket) Close() { - s.listen.Close() -} - -func newDialSocket(proto, addr string) (S, error) { - switch proto { - case "tcp", "udp": - if !strings.Contains(addr, ":") { - addr = "localhost:" + addr - } - } - - return &dialSocket{proto: proto, addr: addr}, nil -} - -func (s *dialSocket) String() string { - return fmt.Sprintf("%s/%s", s.proto, s.addr) -} - -func (s *dialSocket) Open(env env.Env) (Channel, error) { - conn, err := net.Dial(s.proto, s.addr) - if err != nil { - return nil, err - } - return exportChannel{"-", newConnChannel(conn)}, nil -} - -func (s *dialSocket) Close() { -} - -func (c *loopChannel) Send(wq queue.Q) error { - return queue.Copy(c.q, wq) -} - -func (c *loopChannel) Recv(rq queue.Q) error { - defer close(c.q) - return queue.Copy(rq, c.q) -} - -func (c *loopChannel) Close() error { - return nil -} - -func (c *loopChannel) String() string { - return "loop" -} - -func (s *loopSocket) Open(env.Env) (Channel, error) { - return &loopChannel{queue.New()}, nil -} - -func (s *loopSocket) String() string { - return "loop" -} - -func (s *loopSocket) Close() { -} - -func newLoopSocket() (S, error) { - return &loopSocket{}, nil -} - func New(desc string, env env.Env) (S, error) { base, opts := opts.Parse(desc) args := strings.SplitN(base, "/", 2) @@ -224,5 +117,9 @@ func New(desc string, env env.Env) (S, error) { return newListenSocket(proto, addr) } + if _, ok := opts["auto"]; ok { + return newAutoSocket(proto, addr) + } + return newDialSocket(proto, addr) } |
