summaryrefslogtreecommitdiff
path: root/pkg/server/tunnel.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/server/tunnel.go')
-rw-r--r--pkg/server/tunnel.go74
1 files changed, 37 insertions, 37 deletions
diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go
index 32c81c3..58ae0e1 100644
--- a/pkg/server/tunnel.go
+++ b/pkg/server/tunnel.go
@@ -1,29 +1,29 @@
package server
import (
- "tunnel/pkg/server/socket"
- "tunnel/pkg/server/module"
- "tunnel/pkg/server/queue"
- "tunnel/pkg/server/env"
- "tunnel/pkg/config"
- "strings"
- "time"
- "sort"
- "sync"
"fmt"
"log"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+ "tunnel/pkg/config"
+ "tunnel/pkg/server/env"
+ "tunnel/pkg/server/module"
+ "tunnel/pkg/server/queue"
+ "tunnel/pkg/server/socket"
)
type stream struct {
- id int
- t *tunnel
- since time.Time
- wg sync.WaitGroup
+ id int
+ t *tunnel
+ since time.Time
+ wg sync.WaitGroup
in, out socket.Channel
}
type tunnel struct {
- id string
+ id string
args string
streams map[int]*stream
@@ -37,7 +37,7 @@ type tunnel struct {
done chan struct{}
in, out socket.S
- m []module.M
+ m []module.M
env env.Env
}
@@ -99,7 +99,7 @@ func (t *tunnel) serve() {
wg.Add(1)
- go func () {
+ go func() {
t.handle(in)
wg.Done()
}()
@@ -128,10 +128,10 @@ func (t *tunnel) handle(in socket.Channel) {
func (t *tunnel) newStream(in, out socket.Channel) *stream {
s := &stream{
- t: t,
- in: in,
- out: out,
- id: t.nextSid,
+ t: t,
+ in: in,
+ out: out,
+ id: t.nextSid,
since: time.Now(),
}
@@ -142,7 +142,7 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream {
t.streams[s.id] = s
t.mu.Unlock()
- go func () {
+ go func() {
s.wg.Wait()
s.t.mu.Lock()
@@ -158,7 +158,7 @@ func (t *tunnel) newStream(in, out socket.Channel) *stream {
}
func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
- watch := func (q queue.Q, f func (q queue.Q) error) {
+ watch := func(q queue.Q, f func(q queue.Q) error) {
defer s.wg.Done()
if err := f(q); err != nil {
@@ -168,12 +168,12 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
s.wg.Add(2)
- go func () {
+ go func() {
watch(wq, c.Send)
close(wq)
}()
- go func () {
+ go func() {
watch(rq, c.Recv)
rq.Dry()
}()
@@ -182,7 +182,7 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) {
func (s *stream) pipe(m module.M, p module.Pipe, rq, wq queue.Q) {
s.wg.Add(1)
- go func () {
+ go func() {
defer s.wg.Done()
if err := p(rq, wq); err != nil {
@@ -287,13 +287,13 @@ func newTunnel(args []string, env env.Env) (*tunnel, error) {
}
t := &tunnel{
- args: strings.Join(args, " "),
- quit: make(chan struct{}),
- done: make(chan struct{}),
- m: mm,
- in: in,
- out: out,
- env: env,
+ args: strings.Join(args, " "),
+ quit: make(chan struct{}),
+ done: make(chan struct{}),
+ m: mm,
+ in: in,
+ out: out,
+ env: env,
streams: make(map[int]*stream),
}
@@ -372,7 +372,7 @@ func tunnelRename(r *request) {
}
}
-func foreachTunnel(m automap, f func (t *tunnel)) {
+func foreachTunnel(m automap, f func(t *tunnel)) {
var keys []string
for k := range m {
@@ -386,7 +386,7 @@ func foreachTunnel(m automap, f func (t *tunnel)) {
}
}
-func foreachStream(m map[int]*stream, f func (s *stream)) {
+func foreachStream(m map[int]*stream, f func(s *stream)) {
var keys []int
for k := range m {
@@ -401,20 +401,20 @@ func foreachStream(m map[int]*stream, f func (s *stream)) {
}
func tunnelShow(r *request) {
- foreachTunnel(r.c.s.tunnels, func (t *tunnel) {
+ foreachTunnel(r.c.s.tunnels, func(t *tunnel) {
r.Println(t.id, t.args)
})
}
func streamShow(r *request) {
- foreachTunnel(r.c.s.tunnels, func (t *tunnel) {
+ foreachTunnel(r.c.s.tunnels, func(t *tunnel) {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.streams) > 0 {
r.Println(t.id, t.args)
- foreachStream(t.streams, func (s *stream) {
+ foreachStream(t.streams, func(s *stream) {
tm := s.since.Format(config.TimeFormat)
r.Println("\t", s.id, tm, s.in, s.out)
})