Skip to content

Commit 454cf2f

Browse files
nsqd: configurable HTTP client timeouts
Adds configuration options HTTPClientConnectTimeout and HTTPClientRequestTimeout to control the connection and request timeout repectively of the HTTP client. Also added to the following app binaries: - nsqadmin - nsq_stat - nsq_to_file - nsq_to_http Closes nsqio#715 Closes nsqio#680
1 parent 22c8c64 commit 454cf2f

File tree

18 files changed

+165
-92
lines changed

18 files changed

+165
-92
lines changed

apps/nsq_stat/nsq_stat.go

+24-12
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ import (
2222
)
2323

2424
var (
25-
showVersion = flag.Bool("version", false, "print version")
26-
topic = flag.String("topic", "", "NSQ topic")
27-
channel = flag.String("channel", "", "NSQ channel")
28-
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
29-
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
30-
countNum = numValue{}
31-
nsqdHTTPAddrs = app.StringArray{}
32-
lookupdHTTPAddrs = app.StringArray{}
25+
showVersion = flag.Bool("version", false, "print version")
26+
topic = flag.String("topic", "", "NSQ topic")
27+
channel = flag.String("channel", "", "NSQ channel")
28+
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
29+
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
30+
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
31+
httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
32+
countNum = numValue{}
33+
nsqdHTTPAddrs = app.StringArray{}
34+
lookupdHTTPAddrs = app.StringArray{}
3335
)
3436

3537
type numValue struct {
@@ -55,9 +57,9 @@ func init() {
5557
flag.Var(&countNum, "count", "number of reports")
5658
}
5759

58-
func statLoop(interval time.Duration, topic string, channel string,
59-
nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
60-
ci := clusterinfo.New(nil, http_api.NewClient(nil))
60+
func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration,
61+
topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
62+
ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout))
6163
var o *clusterinfo.ChannelStats
6264
for i := 0; !countNum.isSet || countNum.value >= i; i++ {
6365
var producers clusterinfo.Producers
@@ -149,6 +151,16 @@ func main() {
149151
log.Fatal("--interval should be positive")
150152
}
151153

154+
connectTimeout := *httpConnectTimeout
155+
if int64(connectTimeout) <= 0 {
156+
log.Fatal("--http-client-connect-timeout should be positive")
157+
}
158+
159+
requestTimeout := *httpRequestTimeout
160+
if int64(requestTimeout) <= 0 {
161+
log.Fatal("--http-client-request-timeout should be positive")
162+
}
163+
152164
if countNum.isSet && countNum.value <= 0 {
153165
log.Fatal("--count should be positive")
154166
}
@@ -171,7 +183,7 @@ func main() {
171183
termChan := make(chan os.Signal, 1)
172184
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
173185

174-
go statLoop(intvl, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
186+
go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
175187

176188
<-termChan
177189
}

apps/nsq_to_file/nsq_to_file.go

+21-6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ var (
4545
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
4646
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")
4747

48+
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
49+
httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
50+
4851
nsqdTCPAddrs = app.StringArray{}
4952
lookupdHTTPAddrs = app.StringArray{}
5053
topics = app.StringArray{}
@@ -404,8 +407,9 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
404407
return match
405408
}
406409

407-
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
408-
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(addrs)
410+
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string,
411+
connectTimeout time.Duration, requestTimeout time.Duration) {
412+
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs)
409413
if err != nil {
410414
log.Printf("ERROR: could not retrieve topic list: %s", err)
411415
}
@@ -438,13 +442,14 @@ func (t *TopicDiscoverer) hup() {
438442
}
439443
}
440444

441-
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
445+
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string,
446+
connectTimeout time.Duration, requestTimeout time.Duration) {
442447
ticker := time.Tick(*topicPollRate)
443448
for {
444449
select {
445450
case <-ticker:
446451
if sync {
447-
t.syncTopics(addrs, pattern)
452+
t.syncTopics(addrs, pattern, connectTimeout, requestTimeout)
448453
}
449454
case <-t.termChan:
450455
t.stop()
@@ -474,6 +479,16 @@ func main() {
474479
log.Fatal("--channel is required")
475480
}
476481

482+
connectTimeout := *httpConnectTimeout
483+
if int64(connectTimeout) <= 0 {
484+
log.Fatal("--http-client-connect-timeout should be positive")
485+
}
486+
487+
requestTimeout := *httpRequestTimeout
488+
if int64(requestTimeout) <= 0 {
489+
log.Fatal("--http-client-request-timeout should be positive")
490+
}
491+
477492
var topicsFromNSQLookupd bool
478493

479494
if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
@@ -516,7 +531,7 @@ func main() {
516531
}
517532
topicsFromNSQLookupd = true
518533
var err error
519-
topics, err = clusterinfo.New(nil, http_api.NewClient(nil)).GetLookupdTopics(lookupdHTTPAddrs)
534+
topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs)
520535
if err != nil {
521536
log.Fatalf("ERROR: could not retrieve topic list: %s", err)
522537
}
@@ -536,5 +551,5 @@ func main() {
536551
go discoverer.startTopicRouter(logger)
537552
}
538553

539-
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
554+
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout)
540555
}

apps/nsq_to_http/http.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var httpclient *http.Client
1313
var userAgent string
1414

1515
func init() {
16-
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpTimeout)}
16+
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpConnectTimeout, *httpRequestTimeout), Timeout: *httpRequestTimeout}
1717
userAgent = fmt.Sprintf("nsq_to_http v%s", version.Binary)
1818
}
1919

apps/nsq_to_http/nsq_to_http.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ var (
4242
numPublishers = flag.Int("n", 100, "number of concurrent publishers")
4343
mode = flag.String("mode", "hostpool", "the upstream request mode options: multicast, round-robin, hostpool (default), epsilon-greedy")
4444
sample = flag.Float64("sample", 1.0, "% of messages to publish (float b/w 0 -> 1)")
45-
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
46-
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
47-
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")
45+
// TODO: remove; deprecated in favor of http-client-connect-timeout, http-client-request-timeout
46+
httpTimeout = flag.Duration("http-timeout", 20*time.Second, "timeout for HTTP connect/read/write (each)")
47+
httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
48+
httpRequestTimeout = flag.Duration("http-client-request-timeout", 20*time.Second, "timeout for HTTP request")
49+
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
50+
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")
4851

4952
getAddrs = app.StringArray{}
5053
postAddrs = app.StringArray{}
@@ -249,6 +252,13 @@ func main() {
249252
*httpTimeout = time.Duration(*httpTimeoutMs) * time.Millisecond
250253
}
251254

255+
// TODO: remove, deprecated
256+
if hasArg("http-timeout") {
257+
log.Printf("WARNING: --http-timeout is deprecated in favor of --http-client-connect-timeout=X and --http-client-request-timeout=Y")
258+
*httpConnectTimeout = *httpTimeout
259+
*httpRequestTimeout = *httpTimeout
260+
}
261+
252262
termChan := make(chan os.Signal, 1)
253263
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
254264

apps/nsqadmin/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ var (
3636

3737
notificationHTTPEndpoint = flagSet.String("notification-http-endpoint", "", "HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent")
3838

39+
httpConnectTimeout = flagSet.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
40+
httpRequestTimeout = flagSet.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
41+
3942
httpClientTLSInsecureSkipVerify = flagSet.Bool("http-client-tls-insecure-skip-verify", false, "configure the HTTP client to skip verification of TLS certificates")
4043
httpClientTLSRootCAFile = flagSet.String("http-client-tls-root-ca-file", "", "path to CA file for the HTTP client")
4144
httpClientTLSCert = flagSet.String("http-client-tls-cert", "", "path to certificate file for the HTTP client")

apps/nsqd/nsqd.go

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
9090
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
9191
lookupdTCPAddrs := app.StringArray{}
9292
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
93+
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
94+
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")
9395

9496
// diskqueue options
9597
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")

contrib/nsqd.cfg.example

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ nsqlookupd_tcp_addresses = [
2121
"127.0.0.1:4160"
2222
]
2323

24+
## duration to wait before HTTP client connection timeout
25+
http_client_connect_timeout = "2s"
26+
27+
## duration to wait before HTTP client request timeout
28+
http_client_request_timeout = "5s"
2429

2530
## path to store disk-backed messages
2631
# data_path = "/var/lib/nsq"

internal/auth/authorizations.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ func (a *State) IsExpired() bool {
7676
return false
7777
}
7878

79-
func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*State, error) {
79+
func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string,
80+
connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
8081
for _, a := range authd {
81-
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret)
82+
authState, err := QueryAuthd(a, remoteIP, tlsEnabled, authSecret, connectTimeout, requestTimeout)
8283
if err != nil {
8384
log.Printf("Error: failed auth against %s %s", a, err)
8485
continue
@@ -88,7 +89,8 @@ func QueryAnyAuthd(authd []string, remoteIP, tlsEnabled, authSecret string) (*St
8889
return nil, errors.New("Unable to access auth server")
8990
}
9091

91-
func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error) {
92+
func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string,
93+
connectTimeout time.Duration, requestTimeout time.Duration) (*State, error) {
9294
v := url.Values{}
9395
v.Set("remote_ip", remoteIP)
9496
v.Set("tls", tlsEnabled)
@@ -97,7 +99,7 @@ func QueryAuthd(authd, remoteIP, tlsEnabled, authSecret string) (*State, error)
9799
endpoint := fmt.Sprintf("http://%s/auth?%s", authd, v.Encode())
98100

99101
var authState State
100-
client := http_api.NewClient(nil)
102+
client := http_api.NewClient(nil, connectTimeout, requestTimeout)
101103
if err := client.GETV1(endpoint, &authState); err != nil {
102104
return nil, err
103105
}

internal/http_api/api_request.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,24 @@ type deadlinedConn struct {
1919
}
2020

2121
func (c *deadlinedConn) Read(b []byte) (n int, err error) {
22-
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
2322
return c.Conn.Read(b)
2423
}
2524

2625
func (c *deadlinedConn) Write(b []byte) (n int, err error) {
27-
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
2826
return c.Conn.Write(b)
2927
}
3028

3129
// A custom http.Transport with support for deadline timeouts
32-
func NewDeadlineTransport(timeout time.Duration) *http.Transport {
30+
func NewDeadlineTransport(connectTimeout time.Duration, requestTimeout time.Duration) *http.Transport {
3331
transport := &http.Transport{
3432
Dial: func(netw, addr string) (net.Conn, error) {
35-
c, err := net.DialTimeout(netw, addr, timeout)
33+
c, err := net.DialTimeout(netw, addr, connectTimeout)
3634
if err != nil {
3735
return nil, err
3836
}
39-
return &deadlinedConn{timeout, c}, nil
37+
return &deadlinedConn{connectTimeout, c}, nil
4038
},
39+
ResponseHeaderTimeout: requestTimeout,
4140
}
4241
return transport
4342
}
@@ -46,12 +45,13 @@ type Client struct {
4645
c *http.Client
4746
}
4847

49-
func NewClient(tlsConfig *tls.Config) *Client {
50-
transport := NewDeadlineTransport(2 * time.Second)
48+
func NewClient(tlsConfig *tls.Config, connectTimeout time.Duration, requestTimeout time.Duration) *Client {
49+
transport := NewDeadlineTransport(connectTimeout, requestTimeout)
5150
transport.TLSClientConfig = tlsConfig
5251
return &Client{
5352
c: &http.Client{
5453
Transport: transport,
54+
Timeout: requestTimeout,
5555
},
5656
}
5757
}

nsqadmin/bindata.go

+25-25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)