package socket

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"log"
	"os/exec"
	"strings"
	"sync"

	"tunnel/pkg/server/env"
	"tunnel/pkg/server/queue"
)

// execSocket is a socket type whose connections are backed by an external
// process: each call to New starts the configured command.
type execSocket struct {
	// Cmd is the command line to run. It is split on whitespace with
	// strings.Fields, so quoting and shell syntax are not interpreted.
	Cmd string `opts:"required"`
}

// execConn is a single running instance of the command together with the
// pipes attached to its standard input and standard output.
type execConn struct {
	s      *execSocket
	cmd    *exec.Cmd
	stdin  io.WriteCloser
	stdout io.ReadCloser

	// once guards Close so the process is killed and the WaitGroup slot is
	// released exactly one time.
	once sync.Once

	// wg counts everything that must finish before the process is reaped:
	// the stderr logger, the slot released by Close, and any in-flight
	// Send/Recv calls.
	wg sync.WaitGroup
}

// String returns a human-readable description of the socket, "exec(<cmd>)".
func (s *execSocket) String() string {
	return fmt.Sprintf("exec(%s)", s.Cmd)
}

// Close is a no-op: the socket itself holds no resources.
func (s *execSocket) Close() {
}

// New starts the configured command and returns a connection wired to its
// stdin and stdout. Every stderr line of the process is logged, tagged with
// the "tunnel" and "stream" values taken from env.
//
// It returns an error if the command line is empty, if any of the standard
// pipes cannot be created, or if the process fails to start.
func (s *execSocket) New(env env.Env) (Conn, error) {
	tunnel, stream := env.Get("tunnel"), env.Get("stream")

	args := strings.Fields(s.Cmd)
	if len(args) == 0 {
		return nil, errors.New("bad command")
	}

	cmd := exec.Command(args[0], args[1:]...)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, fmt.Errorf("exec %q: stdin pipe: %w", args[0], err)
	}

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		// Release the parent-side pipe end we already hold.
		stdin.Close()
		return nil, fmt.Errorf("exec %q: stdout pipe: %w", args[0], err)
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		stdin.Close()
		stdout.Close()
		return nil, fmt.Errorf("exec %q: stderr pipe: %w", args[0], err)
	}

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	c := &execConn{
		s:      s,
		cmd:    cmd,
		stdin:  stdin,
		stdout: stdout,
	}

	// Two slots: one for the stderr logger below, one released by Close.
	c.wg.Add(2)

	go func(name string, r io.Reader) {
		defer c.wg.Done()

		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			log.Printf("tunnel:%s stream:%s %s > %s", tunnel, stream, name, scanner.Text())
		}
		// Scan also stops on read errors and over-long lines; report that
		// instead of silently dropping the rest of stderr.
		if err := scanner.Err(); err != nil {
			log.Printf("tunnel:%s stream:%s %s stderr: %v", tunnel, stream, name, err)
		}
	}(args[0], stderr)

	go func() {
		// os/exec documents that Wait closes the pipes, so every reader
		// (stderr logger, Send, Recv) and Close must be done first.
		c.wg.Wait()
		// The exit status is not used here; Wait is called so the process
		// is reaped.
		c.cmd.Wait()
	}()

	return c, nil
}

// String returns the description of the socket this connection belongs to.
func (c *execConn) String() string {
	return c.s.String()
}

// Send copies the process's stdout into wq's writer using queue.IoCopy and
// returns its result. The call is tracked in the WaitGroup so the process is
// not reaped while the copy is running.
func (c *execConn) Send(wq queue.Q) error {
	c.wg.Add(1)
	defer c.wg.Done()

	return queue.IoCopy(c.stdout, wq.Writer())
}

// Recv copies rq's reader into the process's stdin using queue.IoCopy and
// returns its result. The call is tracked in the WaitGroup so the process is
// not reaped while the copy is running.
func (c *execConn) Recv(rq queue.Q) error {
	c.wg.Add(1)
	defer c.wg.Done()

	return queue.IoCopy(rq.Reader(), c.stdin)
}

// Close kills the process and releases the WaitGroup slot reserved in New.
// The first call returns nil; every later call returns ErrAlreadyClosed.
func (c *execConn) Close() error {
	err := ErrAlreadyClosed

	c.once.Do(func() {
		log.Println("close", c.s)
		// Best effort: the Kill error is deliberately ignored, e.g. when
		// the process has already exited on its own.
		_ = c.cmd.Process.Kill()
		c.wg.Done()
		err = nil
	})

	return err
}

// init registers the exec socket type under the name "exec".
func init() {
	register("exec", "in/out throw external process", execSocket{})
}