summaryrefslogtreecommitdiff
path: root/pkg/server/queue/queue.go
diff options
context:
space:
mode:
authorMikhail Osipov <mike.osipov@gmail.com>2020-03-02 23:12:12 +0300
committerMikhail Osipov <mike.osipov@gmail.com>2020-03-02 23:12:12 +0300
commitc37304d7059623c1d5cc4a01b2e6c2e9670dcbc5 (patch)
treeb1d13633d9ff7a5c25a61c8f589252a24720319e /pkg/server/queue/queue.go
parente6a63987f6963241dcfa981bf1081206a06f2990 (diff)
[hook/proxy] add timeouttimeout
Diffstat (limited to 'pkg/server/queue/queue.go')
-rw-r--r--pkg/server/queue/queue.go21
1 files changed, 20 insertions, 1 deletions
diff --git a/pkg/server/queue/queue.go b/pkg/server/queue/queue.go
index 71f914c..11b2158 100644
--- a/pkg/server/queue/queue.go
+++ b/pkg/server/queue/queue.go
@@ -1,14 +1,19 @@
package queue
import (
+ "errors"
+ "time"
"io"
)
+var errTimeout = errors.New("read timeout expired")
+
type Q chan []byte
type reader struct {
b []byte
q Q
+ d time.Duration
}
type writer struct {
@@ -23,9 +28,23 @@ func (q Q) Reader() io.Reader {
return &reader{q: q}
}
+func (q Q) TimeoutReader(d time.Duration) io.Reader {
+ return &reader{q: q, d: d}
+}
+
func (r *reader) Read(p []byte) (int, error) {
if len(r.b) == 0 {
- r.b = <-r.q
+ if r.d == 0 {
+ r.b = <-r.q
+ } else {
+ t := time.NewTimer(r.d)
+ select {
+ case r.b = <-r.q:
+ t.Stop()
+ case <-t.C:
+ return 0, errTimeout
+ }
+ }
if r.b == nil {
return 0, io.EOF
}