summaryrefslogtreecommitdiff
path: root/pkg/server/socket/defer.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/socket/defer.go')
-rw-r--r--pkg/server/socket/defer.go66
1 files changed, 35 insertions, 31 deletions
diff --git a/pkg/server/socket/defer.go b/pkg/server/socket/defer.go
index d63d51e..3eb2dfd 100644
--- a/pkg/server/socket/defer.go
+++ b/pkg/server/socket/defer.go
@@ -1,6 +1,8 @@
package socket
import (
+ "context"
+
"tunnel/pkg/server/env"
"tunnel/pkg/server/queue"
)
@@ -10,17 +12,26 @@ type deferSocket struct {
}
type deferConn struct {
- sock S
- wait chan bool
- env env.Env
- conn Conn
+ s *deferSocket
+ e env.Env
+
+ ctx context.Context
+ cancel func()
+
+ conn chan *conn
}
func (s *deferSocket) New(env env.Env) (Conn, error) {
+ ctx, cancel := context.WithCancel(context.TODO())
+
c := &deferConn{
- sock: &s.dialSocket,
- wait: make(chan bool),
- env: env,
+ s: s,
+ e: env,
+
+ conn: make(chan *conn),
+
+ ctx: ctx,
+ cancel: cancel,
}
return c, nil
@@ -31,48 +42,41 @@ func (c *deferConn) String() string {
}
func (c *deferConn) Send(wq queue.Q) error {
- if !<-c.wait {
+ conn := <-c.conn
+ if conn == nil {
return nil
- } else {
- return c.conn.Send(wq)
}
+ return conn.Send(wq)
}
func (c *deferConn) Recv(rq queue.Q) error {
- b := <-rq
- if b == nil {
- c.wait <- false
+ // TODO: and context check
+ r := rq.Reader()
+
+ if _, err := r.Read(nil); err != nil {
+ c.conn <- nil
return nil
}
- conn, err := c.sock.New(c.env)
+ conn, err := dial(c.ctx, c.e, c.s.Proto, c.s.Addr)
if err != nil {
- c.wait <- false
+ c.conn <- nil
return err
}
- c.conn = conn
- c.wait <- true
-
- q := queue.New()
-
go func() {
- q <- b
- queue.Copy(rq, q)
- close(q)
+ <-c.ctx.Done()
+ conn.Close()
}()
- defer q.Dry()
+ c.conn <- conn
- return c.conn.Recv(q)
+ return queue.IoCopy(r, conn)
}
-func (c *deferConn) Close() (err error) {
- if c.conn != nil {
- err = c.conn.Close()
- }
-
- return
+func (c *deferConn) Close() error {
+ c.cancel()
+ return nil
}
func init() {