From c55afd2de177f128fae6e1c52d0c56af17096258 Mon Sep 17 00:00:00 2001 From: Mikhail Osipov Date: Sat, 29 Feb 2020 00:58:01 +0300 Subject: rename module to hook --- pkg/server/hook/aes.go | 87 +++++++++++++++++++++++++ pkg/server/hook/alpha.go | 32 ++++++++++ pkg/server/hook/auth.go | 150 ++++++++++++++++++++++++++++++++++++++++++++ pkg/server/hook/hex.go | 37 +++++++++++ pkg/server/hook/hook.go | 126 +++++++++++++++++++++++++++++++++++++ pkg/server/hook/split.go | 57 +++++++++++++++++ pkg/server/hook/tee.go | 120 +++++++++++++++++++++++++++++++++++ pkg/server/hook/zip.go | 55 ++++++++++++++++ pkg/server/module/aes.go | 85 ------------------------- pkg/server/module/alpha.go | 32 ---------- pkg/server/module/auth.go | 148 ------------------------------------------- pkg/server/module/hex.go | 37 ----------- pkg/server/module/module.go | 126 ------------------------------------- pkg/server/module/split.go | 57 ----------------- pkg/server/module/tee.go | 112 --------------------------------- pkg/server/module/zip.go | 55 ---------------- pkg/server/tunnel.go | 42 ++++++------- 17 files changed, 685 insertions(+), 673 deletions(-) create mode 100644 pkg/server/hook/aes.go create mode 100644 pkg/server/hook/alpha.go create mode 100644 pkg/server/hook/auth.go create mode 100644 pkg/server/hook/hex.go create mode 100644 pkg/server/hook/hook.go create mode 100644 pkg/server/hook/split.go create mode 100644 pkg/server/hook/tee.go create mode 100644 pkg/server/hook/zip.go delete mode 100644 pkg/server/module/aes.go delete mode 100644 pkg/server/module/alpha.go delete mode 100644 pkg/server/module/auth.go delete mode 100644 pkg/server/module/hex.go delete mode 100644 pkg/server/module/module.go delete mode 100644 pkg/server/module/split.go delete mode 100644 pkg/server/module/tee.go delete mode 100644 pkg/server/module/zip.go (limited to 'pkg/server') diff --git a/pkg/server/hook/aes.go b/pkg/server/hook/aes.go new file mode 100644 index 0000000..b461a34 --- /dev/null +++ b/pkg/server/hook/aes.go @@ -0,0 +1,87 @@ +package hook + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/md5" + "crypto/rand" + "io" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +type aesHook struct{} + +type aesPipe struct { + key []byte +} + +func (a *aesPipe) Send(rq, wq queue.Q) error { + block, err := aes.NewCipher(a.key) + if err != nil { + return err + } + + iv := make([]byte, aes.BlockSize) + + if _, err := rand.Read(iv); err != nil { + return err + } + + writer := &cipher.StreamWriter{ + S: cipher.NewOFB(block, iv), + W: wq.Writer(), + } + + wq <- iv + + return queue.IoCopy(rq.Reader(), writer) +} + +func (a *aesPipe) Recv(rq, wq queue.Q) error { + block, err := aes.NewCipher(a.key) + if err != nil { + return err + } + + r := rq.Reader() + + iv := make([]byte, aes.BlockSize) + + if _, err := io.ReadFull(r, iv); err != nil { + if err == io.EOF { + return nil + } + return err + } + + reader := &cipher.StreamReader{ + S: cipher.NewOFB(block, iv), + R: r, + } + + return queue.IoCopy(reader, wq.Writer()) +} + +func newAes(env env.Env) *aesPipe { + s := getAuthSecret(env) + h := md5.Sum([]byte(s)) + + a := &aesPipe{key: make([]byte, 16)} + copy(a.key, h[:]) + + return a +} + +func (h aesHook) Open(env env.Env) (interface{}, error) { + return newAes(env), nil +} + +func newAesHook(opts.Opts, env.Env) (hook, error) { + return aesHook{}, nil +} + +func init() { + register("aes", newAesHook) +} diff --git a/pkg/server/hook/alpha.go b/pkg/server/hook/alpha.go new file mode 100644 index 0000000..d1fefcc --- /dev/null +++ b/pkg/server/hook/alpha.go @@ -0,0 +1,32 @@ +package hook + +import ( + "bufio" + "io" + "tunnel/pkg/server/queue" + "unicode" +) + +func alpha(cb func(rune) rune) Func { + return func(rq, wq queue.Q) error { + r := bufio.NewReader(rq.Reader()) + + for { + c, _, err := r.ReadRune() + if err != nil { + if err == io.EOF { + break + } + return err + } + wq <- []byte(string(cb(c))) + } + + return nil + } +} + +func init() { + registerFunc("lower", alpha(unicode.ToLower)) + registerFunc("upper", alpha(unicode.ToUpper)) +} diff --git a/pkg/server/hook/auth.go b/pkg/server/hook/auth.go new file mode 100644 index 0000000..fc19c2a --- /dev/null +++ b/pkg/server/hook/auth.go @@ -0,0 +1,150 @@ +package hook + +import ( + "crypto/md5" + "crypto/rand" + "errors" + "io" + "tunnel/pkg/netstring" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +const ChallengeLen = 16 + +type auth struct { + secret string + + challenge struct { + self string + peer string + } + + hash string + + recvChallenge chan struct{} + recvHash chan struct{} + + fail chan struct{} + ok chan struct{} +} + +var errDupChallenge = errors.New("peer duplicates challenge") +var errAuthFail = errors.New("peer auth fail") + +type authHook struct{} + +func (a *auth) generateChallenge() error { + b := make([]byte, ChallengeLen) + if _, err := rand.Read(b); err != nil { + return err + } + + a.challenge.self = string(b) + + return nil +} + +func (a *auth) getHash(c string) string { + h := md5.New() + + io.WriteString(h, a.secret) + io.WriteString(h, c) + + return string(h.Sum(nil)) +} + +func (a *auth) isReady(c chan struct{}) bool { + select { + case <-a.fail: + return false + case <-c: + return true + } +} + +func (a *auth) Send(rq, wq queue.Q) error { + e := netstring.NewEncoder(wq.Writer()) + + if err := a.generateChallenge(); err != nil { + return err + } + + e.Encode(a.challenge.self) + + if !a.isReady(a.recvChallenge) { + return nil + } + + if a.challenge.self == a.challenge.peer { + return errDupChallenge + } + + e.Encode(a.getHash(a.challenge.peer)) + + if !a.isReady(a.recvHash) { + return nil + } + + if a.hash != a.getHash(a.challenge.self) { + close(a.fail) + return errAuthFail + } + + close(a.ok) + + return queue.Copy(rq, wq) +} + +func (a *auth) Recv(rq, wq queue.Q) (err error) { + r := rq.Reader() + d := netstring.NewDecoder(r) + + if a.challenge.peer, err = d.Decode(); err != nil { + close(a.fail) + return + } + + close(a.recvChallenge) + + if a.hash, err = d.Decode(); err != nil { + close(a.fail) + return err + } + + close(a.recvHash) + + if !a.isReady(a.ok) { + return nil + } + + return queue.IoCopy(r, wq.Writer()) +} + +func getAuthSecret(env env.Env) string { + if v := env.Eval("@{tunnel.@{tunnel}.secret}"); v != "" { + return v + } + + return env.Get("secret") +} + +func (h authHook) Open(env env.Env) (interface{}, error) { + a := &auth{ + secret: getAuthSecret(env), + recvChallenge: make(chan struct{}), + recvHash: make(chan struct{}), + fail: make(chan struct{}), + ok: make(chan struct{}), + } + return a, nil +} + +func newAuthHook(opts.Opts, env.Env) (hook, error) { + return authHook{}, nil +} + +func init() { + register("auth", newAuthHook) +} diff --git a/pkg/server/hook/hex.go b/pkg/server/hook/hex.go new file mode 100644 index 0000000..beaadeb --- /dev/null +++ b/pkg/server/hook/hex.go @@ -0,0 +1,37 @@ +package hook + +import ( + "encoding/hex" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +type hexHook struct{} + +func (h hexHook) Send(rq, wq queue.Q) error { + enc := hex.NewEncoder(wq.Writer()) + + for b := range rq { + enc.Write(b) + } + + return nil +} + +func (h hexHook) Recv(rq, wq queue.Q) error { + r := hex.NewDecoder(rq.Reader()) + return queue.IoCopy(r, wq.Writer()) +} + +func (h hexHook) Open(env.Env) (interface{}, error) { + return h, nil +} + +func newHexHook(opts.Opts, env.Env) (hook, error) { + return hexHook{}, nil +} + +func init() { + register("hex", newHexHook) +} diff --git a/pkg/server/hook/hook.go b/pkg/server/hook/hook.go new file mode 100644 index 0000000..b2970ac --- /dev/null +++ b/pkg/server/hook/hook.go @@ -0,0 +1,126 @@ +package hook + +import ( + "fmt" + "log" + "sort" + "strings" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +type hookInitFunc func(opts.Opts, env.Env) (hook, error) + +var hooks = map[string]hookInitFunc{} + +type hook interface { + Open(env env.Env) (interface{}, error) +} + +type H interface { + hook + String() string +} + +type Sender interface { + Send(rq, wq queue.Q) error +} + +type Recver interface { + Recv(rq, wq queue.Q) error +} + +type Func func(rq, wq queue.Q) error + +func (f Func) Send(rq, wq queue.Q) error { + return f(rq, wq) +} + +func (f Func) Open(env env.Env) (interface{}, error) { + return f, nil +} + +type wrapper struct { + hook + name string + reverse bool +} + +func (w *wrapper) String() string { + return fmt.Sprintf("hook:%s", w.name) +} + +func Open(h H, env env.Env) (Func, Func, error) { + var send, recv Func + + w := h.(*wrapper) + + it, err := h.Open(env) + if err != nil { + return nil, nil, err + } + + if sender, ok := it.(Sender); ok { + send = sender.Send + } + + if recver, ok := it.(Recver); ok { + recv = recver.Recv + } + + if w.reverse { + send, recv = recv, send + } + + return send, recv, nil +} + +func New(desc string, env env.Env) (H, error) { + name, opts := opts.Parse(desc) + reverse := false + + if strings.HasPrefix(name, "-") { + name = name[1:] + reverse = true + } + + if f, ok := hooks[name]; !ok { + return nil, fmt.Errorf("unknown hook '%s'", name) + } else if h, err := f(opts, env); err != nil { + return nil, err + } else { + w := &wrapper{ + hook: h, + name: name, + reverse: reverse, + } + return w, nil + } +} + +func register(name string, f hookInitFunc) { + if _, ok := hooks[name]; ok { + log.Panicf("duplicate hook name '%s'", name) + } + + hooks[name] = f +} + +func registerFunc(name string, p Func) { + register(name, func(opts.Opts, env.Env) (hook, error) { + return p, nil + }) +} + +func GetList() []string { + var list []string + + for k := range hooks { + list = append(list, k) + } + + sort.Strings(list) + + return list +} diff --git a/pkg/server/hook/split.go b/pkg/server/hook/split.go new file mode 100644 index 0000000..75faf48 --- /dev/null +++ b/pkg/server/hook/split.go @@ -0,0 +1,57 @@ +package hook + +import ( + "errors" + "strconv" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +const splitDefaultSize = 1024 + +var errBadSize = errors.New("bad size value") + +type splitHook struct { + size int +} + +func (h *splitHook) Send(rq, wq queue.Q) error { + for b := range rq { + var upto int + + for n := 0; n < len(b); n = upto { + upto += h.size + + if upto > len(b) { + upto = len(b) + } + + wq <- b[n:upto] + } + } + + return nil +} + +func (h *splitHook) Open(env.Env) (interface{}, error) { + return h, nil +} + +func newSplitHook(opts opts.Opts, env env.Env) (hook, error) { + size := splitDefaultSize + + if s, ok := opts["size"]; ok { + var err error + + if size, err = strconv.Atoi(s); err != nil || size <= 0 { + return nil, errBadSize + } + } + + return &splitHook{size: size}, nil +} + +func init() { + register("split", newSplitHook) +} diff --git a/pkg/server/hook/tee.go b/pkg/server/hook/tee.go new file mode 100644 index 0000000..6591e33 --- /dev/null +++ b/pkg/server/hook/tee.go @@ -0,0 +1,120 @@ +package hook + +import ( + "bytes" + "encoding/hex" + "fmt" + "os" + "path" + "sync" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +const teeDefaultFile = "/tmp/tunnel/dump" + +type tee struct { + f *os.File + mu sync.Mutex + wg sync.WaitGroup +} + +type teeHook struct { + file string +} + +func (t *tee) dump(s string, p []byte) error { + var out bytes.Buffer + + fmt.Fprintln(&out, s, len(p)) + + w := hex.Dumper(&out) + w.Write(p) + w.Close() + + if _, err := t.f.Write(out.Bytes()); err != nil { + return err + } + + return nil +} + +func (t *tee) Send(rq, wq queue.Q) error { + defer t.wg.Done() + + for b := range rq { + t.dump(">", b) + wq <- b + } + + return nil +} + +func (t *tee) Recv(rq, wq queue.Q) error { + defer t.wg.Done() + + for b := range rq { + t.dump("<", b) + wq <- b + } + + return nil +} + +func (h *teeHook) where(env env.Env) string { + if h.file != "" { + return h.file + } + + if v := env.Eval("@{tunnel.@{tunnel}.tee.file}"); v != "" { + return v + } + + if v, ok := env.Find("hook.tee.file"); ok { + return v + } + + return teeDefaultFile +} + +func (h *teeHook) Open(env env.Env) (interface{}, error) { + file := h.where(env) + dir := path.Dir(file) + + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + + tid, sid := env.Get("tunnel"), env.Get("stream") + name := fmt.Sprintf("%s.%s.%s", file, tid, sid) + + var t tee + + if f, err := os.Create(name); err != nil { + return nil, err + } else { + t.f = f + } + + t.wg.Add(2) + + go func() { + t.wg.Wait() + t.f.Close() + }() + + return &t, nil +} + +func newTeeHook(opts opts.Opts, env env.Env) (hook, error) { + h := &teeHook{} + if file, ok := opts["file"]; ok { + h.file = file + } + return h, nil +} + +func init() { + register("tee", newTeeHook) +} diff --git a/pkg/server/hook/zip.go b/pkg/server/hook/zip.go new file mode 100644 index 0000000..61264c9 --- /dev/null +++ b/pkg/server/hook/zip.go @@ -0,0 +1,55 @@ +package hook + +import ( + "compress/flate" + "io" + "tunnel/pkg/server/env" + "tunnel/pkg/server/opts" + "tunnel/pkg/server/queue" +) + +type zipHook struct{} + +func (m zipHook) Send(rq, wq queue.Q) error { + w, err := flate.NewWriter(wq.Writer(), flate.BestCompression) + if err != nil { + return err + } + + for b := range rq { + if _, err := w.Write(b); err != nil { + return err + } + if err := w.Flush(); err != nil { + return err + } + } + + return w.Close() +} + +func (m zipHook) Recv(rq, wq queue.Q) error { + r := flate.NewReader(rq.Reader()) + + // FIXME: not received ending due to ultimate conn.Close + if err := queue.IoCopy(r, wq.Writer()); err != nil { + if err == io.ErrUnexpectedEOF { + return nil + } + return err + } + + return r.Close() +} + +func (m zipHook) Open(env.Env) (interface{}, error) { + return m, nil +} + +func newZipHook(opts.Opts, env.Env) (hook, error) { + return zipHook{}, nil +} + +func init() { + register("zip", newZipHook) +} diff --git a/pkg/server/module/aes.go b/pkg/server/module/aes.go deleted file mode 100644 index 13153bc..0000000 --- a/pkg/server/module/aes.go +++ /dev/null @@ -1,85 +0,0 @@ -package module - -import ( - "crypto/aes" - "crypto/cipher" - "crypto/md5" - "crypto/rand" - "io" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -type aesModule struct{} - -type aesPipe struct { - key []byte -} - -func (a *aesPipe) Send(rq, wq queue.Q) error { - block, err := aes.NewCipher(a.key) - if err != nil { - return err - } - - iv := make([]byte, aes.BlockSize) - - if _, err := rand.Read(iv); err != nil { - return err - } - - writer := &cipher.StreamWriter{ - S: cipher.NewOFB(block, iv), - W: wq.Writer(), - } - - wq <- iv - - return queue.IoCopy(rq.Reader(), writer) -} - -func (a *aesPipe) Recv(rq, wq queue.Q) error { - block, err := aes.NewCipher(a.key) - if err != nil { - return err - } - - r := rq.Reader() - - iv := make([]byte, aes.BlockSize) - - if _, err := io.ReadFull(r, iv); err != nil { - if err == io.EOF { - return nil - } - return err - } - - reader := &cipher.StreamReader{ - S: cipher.NewOFB(block, iv), - R: r, - } - - return queue.IoCopy(reader, wq.Writer()) -} - -func newAes(env env.Env) *aesPipe { - s := getAuthSecret(env) - h := md5.Sum([]byte(s)) - - a := &aesPipe{key: make([]byte, 16)} - copy(a.key, h[:]) - - return a -} - -func (m aesModule) Open(env env.Env) (interface{}, error) { - return newAes(env), nil -} - -func init() { - register("aes", func(opts.Opts, env.Env) (module, error) { - return aesModule{}, nil - }) -} diff --git a/pkg/server/module/alpha.go b/pkg/server/module/alpha.go deleted file mode 100644 index 8174b25..0000000 --- a/pkg/server/module/alpha.go +++ /dev/null @@ -1,32 +0,0 @@ -package module - -import ( - "bufio" - "io" - "tunnel/pkg/server/queue" - "unicode" -) - -func alpha(cb func(rune) rune) Func { - return func(rq, wq queue.Q) error { - r := bufio.NewReader(rq.Reader()) - - for { - c, _, err := r.ReadRune() - if err != nil { - if err == io.EOF { - break - } - return err - } - wq <- []byte(string(cb(c))) - } - - return nil - } -} - -func init() { - registerFunc("lower", alpha(unicode.ToLower)) - registerFunc("upper", alpha(unicode.ToUpper)) -} diff --git a/pkg/server/module/auth.go b/pkg/server/module/auth.go deleted file mode 100644 index 5e5caeb..0000000 --- a/pkg/server/module/auth.go +++ /dev/null @@ -1,148 +0,0 @@ -package module - -import ( - "crypto/md5" - "crypto/rand" - "errors" - "io" - "tunnel/pkg/netstring" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -const ChallengeLen = 16 - -type auth struct { - secret string - - challenge struct { - self string - peer string - } - - hash string - - recvChallenge chan struct{} - recvHash chan struct{} - - fail chan struct{} - ok chan struct{} -} - -var errDupChallenge = errors.New("peer duplicates challenge") -var errAuthFail = errors.New("peer auth fail") - -type authModule struct{} - -func (a *auth) generateChallenge() error { - b := make([]byte, ChallengeLen) - if _, err := rand.Read(b); err != nil { - return err - } - - a.challenge.self = string(b) - - return nil -} - -func (a *auth) getHash(c string) string { - h := md5.New() - - io.WriteString(h, a.secret) - io.WriteString(h, c) - - return string(h.Sum(nil)) -} - -func (a *auth) isReady(c chan struct{}) bool { - select { - case <-a.fail: - return false - case <-c: - return true - } -} - -func (a *auth) Send(rq, wq queue.Q) error { - e := netstring.NewEncoder(wq.Writer()) - - if err := a.generateChallenge(); err != nil { - return err - } - - e.Encode(a.challenge.self) - - if !a.isReady(a.recvChallenge) { - return nil - } - - if a.challenge.self == a.challenge.peer { - return errDupChallenge - } - - e.Encode(a.getHash(a.challenge.peer)) - - if !a.isReady(a.recvHash) { - return nil - } - - if a.hash != a.getHash(a.challenge.self) { - close(a.fail) - return errAuthFail - } - - close(a.ok) - - return queue.Copy(rq, wq) -} - -func (a *auth) Recv(rq, wq queue.Q) (err error) { - r := rq.Reader() - d := netstring.NewDecoder(r) - - if a.challenge.peer, err = d.Decode(); err != nil { - close(a.fail) - return - } - - close(a.recvChallenge) - - if a.hash, err = d.Decode(); err != nil { - close(a.fail) - return err - } - - close(a.recvHash) - - if !a.isReady(a.ok) { - return nil - } - - return queue.IoCopy(r, wq.Writer()) -} - -func getAuthSecret(env env.Env) string { - if v := env.Eval("@{tunnel.@{tunnel}.secret}"); v != "" { - return v - } - - return env.Get("secret") -} - -func (m authModule) Open(env env.Env) (interface{}, error) { - a := &auth{ - secret: getAuthSecret(env), - recvChallenge: make(chan struct{}), - recvHash: make(chan struct{}), - fail: make(chan struct{}), - ok: make(chan struct{}), - } - return a, nil -} - -func init() { - register("auth", func(opts.Opts, env.Env) (module, error) { - return authModule{}, nil - }) -} diff --git a/pkg/server/module/hex.go b/pkg/server/module/hex.go deleted file mode 100644 index ef4ff37..0000000 --- a/pkg/server/module/hex.go +++ /dev/null @@ -1,37 +0,0 @@ -package module - -import ( - "encoding/hex" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -type hexModule struct{} - -func (m hexModule) Send(rq, wq queue.Q) error { - enc := hex.NewEncoder(wq.Writer()) - - for b := range rq { - enc.Write(b) - } - - return nil -} - -func (m hexModule) Recv(rq, wq queue.Q) error { - r := hex.NewDecoder(rq.Reader()) - return queue.IoCopy(r, wq.Writer()) -} - -func (m hexModule) Open(env.Env) (interface{}, error) { - return m, nil -} - -func newHexModule(opts.Opts, env.Env) (module, error) { - return hexModule{}, nil -} - -func init() { - register("hex", newHexModule) -} diff --git a/pkg/server/module/module.go b/pkg/server/module/module.go deleted file mode 100644 index c638299..0000000 --- a/pkg/server/module/module.go +++ /dev/null @@ -1,126 +0,0 @@ -package module - -import ( - "fmt" - "log" - "sort" - "strings" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -type moduleInitFunc func(opts.Opts, env.Env) (module, error) - -var modules = map[string]moduleInitFunc{} - -type module interface { - Open(env env.Env) (interface{}, error) -} - -type M interface { - module - String() string -} - -type Sender interface { - Send(rq, wq queue.Q) error -} - -type Recver interface { - Recv(rq, wq queue.Q) error -} - -type Func func(rq, wq queue.Q) error - -func (f Func) Send(rq, wq queue.Q) error { - return f(rq, wq) -} - -func (f Func) Open(env env.Env) (interface{}, error) { - return f, nil -} - -type wrapper struct { - module - name string - reverse bool -} - -func (w *wrapper) String() string { - return fmt.Sprintf("module:%s", w.name) -} - -func Open(m M, env env.Env) (Func, Func, error) { - var send, recv Func - - w := m.(*wrapper) - - it, err := m.Open(env) - if err != nil { - return nil, nil, err - } - - if sender, ok := it.(Sender); ok { - send = sender.Send - } - - if recver, ok := it.(Recver); ok { - recv = recver.Recv - } - - if w.reverse { - send, recv = recv, send - } - - return send, recv, nil -} - -func New(desc string, env env.Env) (M, error) { - name, opts := opts.Parse(desc) - reverse := false - - if strings.HasPrefix(name, "-") { - name = name[1:] - reverse = true - } - - if f, ok := modules[name]; !ok { - return nil, fmt.Errorf("unknown module '%s'", name) - } else if m, err := f(opts, env); err != nil { - return nil, err - } else { - w := &wrapper{ - module: m, - name: name, - reverse: reverse, - } - return w, nil - } -} - -func register(name string, f moduleInitFunc) { - if _, ok := modules[name]; ok { - log.Panicf("duplicate module name '%s'", name) - } - - modules[name] = f -} - -func registerFunc(name string, p Func) { - register(name, func(opts.Opts, env.Env) (module, error) { - return p, nil - }) -} - -func GetList() []string { - var list []string - - for k := range modules { - list = append(list, k) - } - - sort.Strings(list) - - return list -} diff --git a/pkg/server/module/split.go b/pkg/server/module/split.go deleted file mode 100644 index 139d062..0000000 --- a/pkg/server/module/split.go +++ /dev/null @@ -1,57 +0,0 @@ -package module - -import ( - "errors" - "strconv" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -const splitDefaultSize = 1024 - -var errBadSize = errors.New("bad size value") - -type splitModule struct { - size int -} - -func (m *splitModule) Send(rq, wq queue.Q) error { - for b := range rq { - var upto int - - for n := 0; n < len(b); n = upto { - upto += m.size - - if upto > len(b) { - upto = len(b) - } - - wq <- b[n:upto] - } - } - - return nil -} - -func (m *splitModule) Open(env.Env) (interface{}, error) { - return m, nil -} - -func newSplitModule(opts opts.Opts, env env.Env) (module, error) { - size := splitDefaultSize - - if s, ok := opts["size"]; ok { - var err error - - if size, err = strconv.Atoi(s); err != nil || size <= 0 { - return nil, errBadSize - } - } - - return &splitModule{size: size}, nil -} - -func init() { - register("split", newSplitModule) -} diff --git a/pkg/server/module/tee.go b/pkg/server/module/tee.go deleted file mode 100644 index 7953247..0000000 --- a/pkg/server/module/tee.go +++ /dev/null @@ -1,112 +0,0 @@ -package module - -import ( - "bytes" - "fmt" - "os" - "encoding/hex" - "sync" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -const teeDefaultPath = "/tmp/tunnel.dump" - -type tee struct { - f *os.File - mu sync.Mutex - wg sync.WaitGroup -} - -type teeModule struct { - path string -} - -func (t *tee) dump(s string, p []byte) error { - var out bytes.Buffer - - fmt.Fprintln(&out, s, len(p)) - - w := hex.Dumper(&out) - w.Write(p) - w.Close() - - if _, err := t.f.Write(out.Bytes()); err != nil { - return err - } - - return nil -} - -func (t *tee) Send(rq, wq queue.Q) error { - defer t.wg.Done() - - for b := range rq { - t.dump(">", b) - wq <- b - } - - return nil -} - -func (t *tee) Recv(rq, wq queue.Q) error { - defer t.wg.Done() - - for b := range rq { - t.dump("<", b) - wq <- b - } - - return nil -} - -func (m *teeModule) where(env env.Env) string { - if m.path != "" { - return m.path - } - - if v := env.Eval("@{tunnel.@{tunnel}.tee.path}"); v != "" { - return v - } - - if v, ok := env.Find("module.tee.path"); ok { - return v - } - - return teeDefaultPath -} - -func (m *teeModule) Open(env env.Env) (interface{}, error) { - tid, sid := env.Get("tunnel"), env.Get("stream") - name := fmt.Sprintf("%s.%s.%s", m.where(env), tid, sid) - - var t tee - - if f, err := os.Create(name); err != nil { - return nil, err - } else { - t.f = f - } - - t.wg.Add(2) - - go func() { - t.wg.Wait() - t.f.Close() - }() - - return &t, nil -} - -func newTeeModule(opts opts.Opts, env env.Env) (module, error) { - m := &teeModule{} - if path, ok := opts["path"]; ok { - m.path = path - } - return m, nil -} - -func init() { - register("tee", newTeeModule) -} diff --git a/pkg/server/module/zip.go b/pkg/server/module/zip.go deleted file mode 100644 index b0abb38..0000000 --- a/pkg/server/module/zip.go +++ /dev/null @@ -1,55 +0,0 @@ -package module - -import ( - "compress/flate" - "io" - "tunnel/pkg/server/env" - "tunnel/pkg/server/opts" - "tunnel/pkg/server/queue" -) - -type zipModule struct{} - -func (m zipModule) Send(rq, wq queue.Q) error { - w, err := flate.NewWriter(wq.Writer(), flate.BestCompression) - if err != nil { - return err - } - - for b := range rq { - if _, err := w.Write(b); err != nil { - return err - } - if err := w.Flush(); err != nil { - return err - } - } - - return w.Close() -} - -func (m zipModule) Recv(rq, wq queue.Q) error { - r := flate.NewReader(rq.Reader()) - - // FIXME: not received ending due to ultimate conn.Close - if err := queue.IoCopy(r, wq.Writer()); err != nil { - if err == io.ErrUnexpectedEOF { - return nil - } - return err - } - - return r.Close() -} - -func (m zipModule) Open(env.Env) (interface{}, error) { - return m, nil -} - -func newZipModule(opts.Opts, env.Env) (module, error) { - return zipModule{}, nil -} - -func init() { - register("zip", newZipModule) -} diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 91d7533..e00db6c 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -12,7 +12,7 @@ import ( "time" "tunnel/pkg/config" "tunnel/pkg/server/env" - "tunnel/pkg/server/module" + "tunnel/pkg/server/hook" "tunnel/pkg/server/queue" "tunnel/pkg/server/socket" ) @@ -41,7 +41,7 @@ type tunnel struct { done chan struct{} in, out socket.S - m []module.M + hooks []hook.H env env.Env } @@ -187,14 +187,14 @@ func (s *stream) channel(c socket.Channel, rq, wq queue.Q) { }() } -func (s *stream) pipe(m module.M, f module.Func, rq, wq queue.Q) { +func (s *stream) pipe(h hook.H, f hook.Func, rq, wq queue.Q) { s.wg.Add(1) go func() { defer s.wg.Done() if err := f(rq, wq); err != nil && !errors.Is(err, io.EOF) { - log.Println(s.t, s, m, err) + log.Println(s.t, s, h, err) } close(wq) @@ -209,23 +209,23 @@ func (s *stream) run() { s.channel(s.in, rq, wq) - for _, m := range s.t.m { - send, recv, err := module.Open(m, s.env) + for _, h := range s.t.hooks { + send, recv, err := hook.Open(h, s.env) if err != nil { // FIXME: abort stream on error - log.Println(s.t, s, m, err) + log.Println(s.t, s, h, err) continue } if send != nil { q := queue.New() - s.pipe(m, send, wq, q) + s.pipe(h, send, wq, q) wq = q } if recv != nil { q := queue.New() - s.pipe(m, recv, q, rq) + s.pipe(h, recv, q, rq) rq = q } } @@ -238,23 +238,23 @@ func (s *stream) stop() { s.out.Close() } -func parseModules(args []string, env env.Env) ([]module.M, error) { - var mm []module.M +func parseHooks(args []string, env env.Env) ([]hook.H, error) { + var hooks []hook.H for _, arg := range args { - if m, err := module.New(arg, env); err != nil { + if h, err := hook.New(arg, env); err != nil { return nil, err } else { - mm = append(mm, m) + hooks = append(hooks, h) } } - return mm, nil + return hooks, nil } func newTunnel(args []string, env env.Env) (*tunnel, error) { var in, out socket.S - var mm []module.M + var hooks []hook.H var err error n := len(args) - 1 @@ -268,7 +268,7 @@ func newTunnel(args []string, env env.Env) (*tunnel, error) { return nil, err } - if mm, err = parseModules(args[1:n], env); err != nil { + if hooks, err = parseHooks(args[1:n], env); err != nil { in.Close() out.Close() return nil, err @@ -278,7 +278,7 @@ func newTunnel(args []string, env env.Env) (*tunnel, error) { args: strings.Join(args, " "), quit: make(chan struct{}), done: make(chan struct{}), - m: mm, + hooks: hooks, in: in, out: out, env: env, @@ -410,9 +410,9 @@ func showStreams(r *request) { }) } -func showModules(r *request) { - for _, m := range module.GetList() { - r.Println(m) +func showHooks(r *request) { + for _, h := range hook.GetList() { + r.Println(h) } } @@ -422,7 +422,7 @@ func init() { newCmd(tunnelRename, "rename") - newCmd(showModules, "modules") + newCmd(showHooks, "hooks") newCmd(showStreams, "streams") newCmd(showTunnels, "show") } -- cgit v1.2.3-70-g09d2