summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/server/hook/proxy.go3
-rw-r--r--pkg/server/queue/queue.go21
2 files changed, 22 insertions, 2 deletions
diff --git a/pkg/server/hook/proxy.go b/pkg/server/hook/proxy.go
index 64db784..6c41742 100644
--- a/pkg/server/hook/proxy.go
+++ b/pkg/server/hook/proxy.go
@@ -9,6 +9,7 @@ import (
"io"
"regexp"
"strconv"
+ "time"
"tunnel/pkg/server/env"
"tunnel/pkg/server/opts"
"tunnel/pkg/server/queue"
@@ -87,7 +88,7 @@ func (p *proxy) Recv(rq, wq queue.Q) (err error) {
}
}()
- s := bufio.NewScanner(rq.Reader())
+ s := bufio.NewScanner(rq.TimeoutReader(5 * time.Second))
var resp bool
diff --git a/pkg/server/queue/queue.go b/pkg/server/queue/queue.go
index 71f914c..11b2158 100644
--- a/pkg/server/queue/queue.go
+++ b/pkg/server/queue/queue.go
@@ -1,14 +1,19 @@
package queue
import (
+ "errors"
+ "time"
"io"
)
+var errTimeout = errors.New("read timeout expired")
+
type Q chan []byte
type reader struct {
b []byte
q Q
+ d time.Duration
}
type writer struct {
@@ -23,9 +28,23 @@ func (q Q) Reader() io.Reader {
return &reader{q: q}
}
+func (q Q) TimeoutReader(d time.Duration) io.Reader {
+ return &reader{q: q, d: d}
+}
+
func (r *reader) Read(p []byte) (int, error) {
if len(r.b) == 0 {
- r.b = <-r.q
+ if r.d == 0 {
+ r.b = <-r.q
+ } else {
+ t := time.NewTimer(r.d)
+ select {
+ case r.b = <-r.q:
+ t.Stop()
+ case <-t.C:
+ return 0, errTimeout
+ }
+ }
if r.b == nil {
return 0, io.EOF
}