summaryrefslogtreecommitdiff
path: root/pkg/server/socket
diff options
context:
space:
mode:
authorMikhail Osipov <mike.osipov@gmail.com>2020-03-05 02:11:12 +0300
committerMikhail Osipov <mike.osipov@gmail.com>2020-03-05 02:11:12 +0300
commit878fb83abd4003b50896b3dd3f9fcf242440991b (patch)
tree9479bd46b1d165fd0f687f6f658b0cb9b1773e95 /pkg/server/socket
parentff51af3c48cee0d15c9802b2adb44f59a77e4c75 (diff)
add stat for streams
Diffstat (limited to 'pkg/server/socket')
-rw-r--r--pkg/server/socket/socket.go26
1 files changed, 20 insertions, 6 deletions
diff --git a/pkg/server/socket/socket.go b/pkg/server/socket/socket.go
index a9fa319..0060ce6 100644
--- a/pkg/server/socket/socket.go
+++ b/pkg/server/socket/socket.go
@@ -15,6 +15,7 @@ import (
var errAlreadyClosed = errors.New("already closed")
type Channel interface {
+ Origin() string
Send(wq queue.Q) error
Recv(rq queue.Q) error
Close() error
@@ -35,8 +36,9 @@ type dialSocket struct {
}
type connChannel struct {
- conn net.Conn
- once sync.Once
+ origin string
+ conn net.Conn
+ once sync.Once
}
type loopSocket struct{}
@@ -45,8 +47,8 @@ type loopChannel struct {
q queue.Q
}
-func newConnChannel(conn net.Conn) Channel {
- return &connChannel{conn: conn}
+func newConnChannel(origin string, conn net.Conn) Channel {
+ return &connChannel{origin: origin, conn: conn}
}
func (c *connChannel) final(f func() error, err error) error {
@@ -61,6 +63,10 @@ func (c *connChannel) final(f func() error, err error) error {
return err
}
+func (c *connChannel) Origin() string {
+ return c.origin
+}
+
func (c *connChannel) Send(wq queue.Q) error {
err := queue.IoCopy(c.conn, wq.Writer())
return c.final(c.Close, err)
@@ -113,7 +119,11 @@ func (s *listenSocket) Open(env env.Env) (Channel, error) {
if err != nil {
return nil, err
}
- return newConnChannel(conn), nil
+
+ addr := conn.RemoteAddr()
+ origin := fmt.Sprintf("%s/%s", addr.Network(), addr)
+
+ return newConnChannel(origin, conn), nil
}
func (s *listenSocket) String() string {
@@ -144,12 +154,16 @@ func (s *dialSocket) Open(env env.Env) (Channel, error) {
if err != nil {
return nil, err
}
- return newConnChannel(conn), nil
+ return newConnChannel("-", conn), nil
}
func (s *dialSocket) Close() {
}
+func (c *loopChannel) Origin() string {
+ return "loop"
+}
+
func (c *loopChannel) Send(wq queue.Q) error {
return queue.Copy(c.q, wq)
}