Commit 40e798c

Sidecar: fix startup sequence
Previously we deferred starting the gRPC server by blocking the whole startup until we could ping Prometheus. This breaks use cases that rely on the config reloader to start Prometheus. We fix it by using a channel to defer starting the gRPC server and by loading external labels concurrently in an actor.

Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 863d914 commit 40e798c
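The mechanism is a small concurrency pattern on top of the oklog/run actor group the sidecar already uses: one actor keeps retrying Prometheus and closes a channel once external labels are loaded, while the gRPC server actor waits on that channel (or on its own shutdown context) before it starts listening. Below is a minimal, self-contained sketch of this pattern; it is not Thanos code, and the helpers loadExternalLabels and serveGRPC are hypothetical stand-ins for the sidecar's retry loop and its grpcserver.

```go
// Sketch of the startup-gating pattern from this commit, assuming github.com/oklog/run.
// Helper names and timings are illustrative stand-ins, not the actual sidecar code.
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/oklog/run"
)

func main() {
	var g run.Group

	// Closed once external labels have been loaded from Prometheus.
	readyToStartGRPC := make(chan struct{})

	// Actor 1: Prometheus heartbeat. Loading labels may take a while (e.g. the
	// config reloader still has to start Prometheus), so it must not block startup.
	{
		ctx, cancel := context.WithCancel(context.Background())
		g.Add(func() error {
			if err := loadExternalLabels(ctx); err != nil { // stand-in for the retry loop around m.UpdateLabels
				return err
			}
			close(readyToStartGRPC) // signal that the gRPC server may start
			<-ctx.Done()            // keep running (heartbeats) until shutdown
			return ctx.Err()
		}, func(error) {
			cancel()
		})
	}

	// Actor 2: the gRPC server. It starts only after the signal, but a shutdown
	// still unblocks it if Prometheus never becomes reachable.
	{
		ctx, cancel := context.WithCancel(context.Background())
		g.Add(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-readyToStartGRPC:
			}
			return serveGRPC(ctx) // stand-in for s.ListenAndServe
		}, func(error) {
			cancel()
		})
	}

	if err := g.Run(); err != nil && !errors.Is(err, context.Canceled) {
		log.Fatal(err)
	}
}

// loadExternalLabels simulates fetching external labels from Prometheus.
func loadExternalLabels(ctx context.Context) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// serveGRPC simulates a short-lived server so the example terminates on its own.
func serveGRPC(ctx context.Context) error {
	log.Println("gRPC server started")
	time.Sleep(200 * time.Millisecond)
	return nil
}
```

Because the gRPC actor also selects on its own ctx.Done(), a failure or shutdown elsewhere in the group still unblocks it even if Prometheus never becomes reachable; this is why the diff below adds cancel() to the gRPC server's interrupt function.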

2 files changed (+93, -72 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 - [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.
 - [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity.
+- [#7403](https://github.com/thanos-io/thanos/pull/7403) Sidecar: fix startup sequence
 
 ### Added

cmd/thanos/sidecar.go

Lines changed: 92 additions & 72 deletions
@@ -149,111 +149,119 @@ func runSidecar(
         prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
     )
 
-    srv := httpserver.New(logger, reg, comp, httpProbe,
-        httpserver.WithListen(conf.http.bindAddress),
-        httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
-        httpserver.WithTLSConfig(conf.http.tlsConfig),
-    )
+    // Setup the HTTP server.
+    {
+        srv := httpserver.New(logger, reg, comp, httpProbe,
+            httpserver.WithListen(conf.http.bindAddress),
+            httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
+            httpserver.WithTLSConfig(conf.http.tlsConfig),
+        )
 
-    g.Add(func() error {
-        statusProber.Healthy()
+        g.Add(func() error {
+            statusProber.Healthy()
+            return srv.ListenAndServe()
+        }, func(err error) {
 
-        return srv.ListenAndServe()
-    }, func(err error) {
-        statusProber.NotReady(err)
-        defer statusProber.NotHealthy(err)
+            statusProber.NotReady(err)
+            defer statusProber.NotHealthy(err)
 
-        srv.Shutdown(err)
-    })
+            srv.Shutdown(err)
+        })
+    }
 
-    // Setup all the concurrent groups.
+    // Once we have loaded external labels from prometheus we can use this to signal the servers
+    // that they can start now.
+    readyToStartGRPC := make(chan struct{})
+
+    // Setup Prometheus Heartbeats.
     {
         promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
             Name: "thanos_sidecar_prometheus_up",
             Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
         })
 
-        ctx := context.Background()
-        // Only check Prometheus's flags when upload is enabled.
-        if uploads {
-            // Check prometheus's flags to ensure same sidecar flags.
-            // We retry infinitely until we validated prometheus flags
+        ctx, cancel := context.WithCancel(context.Background())
+        g.Add(func() error {
+            // Only check Prometheus's flags when upload is enabled.
+            if uploads {
+                // Check prometheus's flags to ensure same sidecar flags.
+                // We retry infinitely until we validated prometheus flags
+                err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
+                    iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
+                    defer iterCancel()
+
+                    if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
+                        level.Warn(logger).Log(
+                            "msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
+                            "err", err,
+                        )
+                        return err
+                    }
+
+                    level.Info(logger).Log(
+                        "msg", "successfully validated prometheus flags",
+                    )
+                    return nil
+                })
+                if err != nil {
+                    return errors.Wrap(err, "failed to validate prometheus flags")
+                }
+            }
+
+            // We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
             err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
                 iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
                 defer iterCancel()
 
-                if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
+                if err := m.BuildVersion(iterCtx); err != nil {
                     level.Warn(logger).Log(
-                        "msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
+                        "msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
                         "err", err,
                     )
                     return err
                 }
 
                 level.Info(logger).Log(
-                    "msg", "successfully validated prometheus flags",
+                    "msg", "successfully loaded prometheus version",
                 )
                 return nil
             })
             if err != nil {
-                return errors.Wrap(err, "failed to validate prometheus flags")
+                return errors.Wrap(err, "failed to get prometheus version")
             }
-        }
-
-        // We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
-        err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
-            iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
-            defer iterCancel()
 
-            if err := m.BuildVersion(iterCtx); err != nil {
-                level.Warn(logger).Log(
-                    "msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
-                    "err", err,
-                )
-                return err
-            }
-
-            level.Info(logger).Log(
-                "msg", "successfully loaded prometheus version",
-            )
-            return nil
-        })
-        if err != nil {
-            return errors.Wrap(err, "failed to get prometheus version")
-        }
+            // Blocking query of external labels before joining as a Source Peer into gossip.
+            // We retry infinitely until we reach and fetch labels from our Prometheus.
+            err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
+                iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
+                defer iterCancel()
 
-        // Blocking query of external labels before joining as a Source Peer into gossip.
-        // We retry infinitely until we reach and fetch labels from our Prometheus.
-        err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
-            iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
-            defer iterCancel()
+                if err := m.UpdateLabels(iterCtx); err != nil {
+                    level.Warn(logger).Log(
+                        "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
+                        "err", err,
+                    )
+                    return err
+                }
 
-            if err := m.UpdateLabels(iterCtx); err != nil {
-                level.Warn(logger).Log(
-                    "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
-                    "err", err,
+                level.Info(logger).Log(
+                    "msg", "successfully loaded prometheus external labels",
+                    "external_labels", m.Labels().String(),
                 )
-                return err
+                return nil
+            })
+            if err != nil {
+                return errors.Wrap(err, "initial external labels query")
             }
 
-            level.Info(logger).Log(
-                "msg", "successfully loaded prometheus external labels",
-                "external_labels", m.Labels().String(),
-            )
-            return nil
-        })
-        if err != nil {
-            return errors.Wrap(err, "initial external labels query")
-        }
+            if len(m.Labels()) == 0 {
+                return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
+            }
+            promUp.Set(1)
+            statusProber.Ready()
 
-        if len(m.Labels()) == 0 {
-            return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
-        }
-        promUp.Set(1)
-        statusProber.Ready()
+            close(readyToStartGRPC)
 
-        ctx, cancel := context.WithCancel(context.Background())
-        g.Add(func() error {
             // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
             // the external labels we apply.
             return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
@@ -275,6 +283,8 @@ func runSidecar(
             cancel()
         })
     }
+
+    // Setup the Reloader.
     {
         ctx, cancel := context.WithCancel(context.Background())
         g.Add(func() error {
@@ -283,6 +293,8 @@ func runSidecar(
             cancel()
         })
     }
+
+    // Setup the gRPC server.
     {
         c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent)
 
@@ -336,15 +348,23 @@ func runSidecar(
             grpcserver.WithMaxConnAge(conf.grpc.maxConnectionAge),
             grpcserver.WithTLSConfig(tlsCfg),
         )
+
+        ctx, cancel := context.WithCancel(context.Background())
         g.Add(func() error {
+            select {
+            case <-ctx.Done():
+                return ctx.Err()
+            case <-readyToStartGRPC:
+            }
+
             statusProber.Ready()
             return s.ListenAndServe()
         }, func(err error) {
+            cancel()
             statusProber.NotReady(err)
             s.Shutdown(err)
         })
     }
-
     if uploads {
         // The background shipper continuously scans the data directory and uploads
         // new blocks to Google Cloud Storage or an S3-compatible storage service.
