feat: add per-tenant time sharding for long out-of-order ingestion #14711
Conversation
```diff
@@ -330,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
 	logger := log.With(util_log.Logger, "component", "distributor")
 	t.distributor, err = distributor.New(
 		t.Cfg.Distributor,
+		t.Cfg.Ingester,
```
This is somewhat risky because, in microservices mode, the distributor and ingester can be run with different `-ingester.max-chunk-age` CLI flag values. In practice this is very, very unlikely, but it's something we should be mindful of, because I am not sure there is a way to avoid it.
yes, we cannot prevent it even if it happens, because the CLI flag can be set on the ingesters only, and the distributors will then be out of sync for this flag...
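To illustrate the drift risk in isolation, here is a minimal, self-contained Go sketch (not Loki code; only the flag name is real, and the default value is illustrative) of two processes parsing `-ingester.max-chunk-age` independently:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// parseMaxChunkAge simulates one component (distributor or ingester) parsing
// its own command line; nothing ties the two processes' values together.
func parseMaxChunkAge(args []string) time.Duration {
	fs := flag.NewFlagSet("component", flag.PanicOnError)
	maxChunkAge := fs.Duration("ingester.max-chunk-age", 2*time.Hour, "max chunk age")
	_ = fs.Parse(args)
	return *maxChunkAge
}

func main() {
	// The operator overrides the flag on the ingester's command line only.
	distributorVal := parseMaxChunkAge(nil)
	ingesterVal := parseMaxChunkAge([]string{"-ingester.max-chunk-age=4h"})
	fmt.Println(distributorVal, ingesterVal) // 2h0m0s 4h0m0s: out of sync
}
```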
Looks awesome 💎
Also, I left some comments and one proposal.
pkg/distributor/distributor.go (Outdated)
```go
}
maybeShardByTime := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
	if shardStreamsCfg.TimeShardingEnabled {
		streamsByTime := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2)
```
I would apply sharding by time only if the logs are older than `now - d.ingesterCfg.MaxChunkAge/2`. It would let normal logs be ingested without the need to create a time bucket every hour. At the same time, it would still allow old logs to be ingested without strict order requirements.
wdyt?
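To make the proposal concrete, here is a minimal runnable sketch of the guard it implies (helper and names are hypothetical, not actual Loki code): a stream would go through time sharding only if it contains at least one entry old enough to risk rejection.

```go
package main

import (
	"fmt"
	"time"
)

// hasTooOldEntries reports whether any timestamp falls before
// now - maxChunkAge/2, i.e. would risk a "too far behind" rejection.
// This is a sketch of the proposed guard, not Loki's actual code.
func hasTooOldEntries(timestamps []time.Time, now time.Time, maxChunkAge time.Duration) bool {
	cutoff := now.Add(-maxChunkAge / 2)
	for _, ts := range timestamps {
		if ts.Before(cutoff) {
			return true
		}
	}
	return false
}

func main() {
	now := time.Now()
	fresh := []time.Time{now, now.Add(-15 * time.Minute)}
	old := []time.Time{now.Add(-24 * time.Hour)}
	fmt.Println(hasTooOldEntries(fresh, now, 2*time.Hour)) // false: no sharding needed
	fmt.Println(hasTooOldEntries(old, now, 2*time.Hour))   // true: shard by time
}
```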
I considered this, but I'm somewhat worried that it might cause more issues than it solves:

- Assuming that the number of out-of-order logs is not insignificant, this will actually result in more total and more active streams than the current approach. We will have logs in both `{foo="bar"}` and `{foo="bar", __time_shard__="111_222"}` streams.
- The "now" value of the distributors can be slightly different from the "now" value of the ingesters. And even if they are the same, some time passes until the ingesters handle the push request. So if we try to calculate in the distributors which logs will be rejected by the ingesters based on the current time, we might be wrong in a small percentage of cases, resulting in TooFarBehind errors. We can add a safety margin, of course, but it would make everything more complicated.

So, because this is a per-tenant config, I think the tradeoffs are slightly better if we always inject the `__time_shard__` label when it's enabled for a specific tenant, even for current data 🤔 Or maybe it should be configurable and we should support both ways? 🤔 What do you think, am I missing something here?
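As context for the `__time_shard__="111_222"` shape mentioned above, here is a hedged, runnable sketch of how such a label value could be derived: bucket each timestamp into a `MaxChunkAge/2`-wide window and encode the bucket bounds as `start_end` unix seconds. The actual `shardStreamByTime` in this PR may encode it differently.

```go
package main

import (
	"fmt"
	"time"
)

// timeShardValue sketches a possible __time_shard__ derivation: align the
// timestamp down to a window of width maxChunkAge/2 and return the window
// bounds as "start_end" (unix seconds). Illustrative only.
func timeShardValue(ts time.Time, maxChunkAge time.Duration) string {
	width := int64(maxChunkAge/2) / int64(time.Second)
	start := ts.Unix() / width * width
	return fmt.Sprintf("%d_%d", start, start+width)
}

func main() {
	maxChunkAge := 2 * time.Hour // bucket width is 1h
	now := time.Unix(1730000000, 0)
	for _, ts := range []time.Time{now, now.Add(-15 * time.Minute), now.Add(-24 * time.Hour)} {
		fmt.Printf("{foo=\"bar\", __time_shard__=%q}\n", timeShardValue(ts, maxChunkAge))
	}
}
```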
> this will actually result in more total and more active streams than the current approach.

I do not think so. Let's say we ingest 4 log lines with ts: `now`, `now-15m`, `now-1h`, `now-1d`.

With your version, we would split these logs into 4 separate streams with `__time_shard__` values `A`, `B`, `C`, `D` (if `now` and `now-15m` are from different time buckets).

With the changes that I propose, we would not create new buckets for `now` and `now-15m`. So, as a result, we would have 3 streams: `original`, `C`, `D`.

In general, I would try to guess whether the data will be rejected by the ingester or not (maybe with some safety margin; maybe 15m is enough), because then we would not create new streams every hour for data that is fresh enough...

We already see that it almost doubles the stream count in cases where a lot of fresh data is ingested. It also affects the chunks, because we get more underutilized chunks that are flushed with reason `idle`.

Also, this change does not give us any drawbacks in terms of ingesting out-of-order old logs...
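The stream counts in this example can be checked with a small runnable sketch (timestamps and bucket width are illustrative; the width is picked so that `now` and `now-15m` land in different buckets, per the parenthetical above):

```go
package main

import (
	"fmt"
	"time"
)

// bucket maps a timestamp to a time-shard bucket index of the given width.
func bucket(ts time.Time, width time.Duration) int64 {
	return ts.Unix() / int64(width/time.Second)
}

func main() {
	now := time.Unix(1729998600, 0) // chosen so now and now-15m straddle a bucket edge
	entries := []time.Time{now, now.Add(-15 * time.Minute), now.Add(-time.Hour), now.Add(-24 * time.Hour)}
	width := 30 * time.Minute // MaxChunkAge/2 with a 1h MaxChunkAge
	cutoff := now.Add(-width) // the proposed "old enough to shard" boundary

	shardAll := map[string]bool{} // current approach: every entry gets a bucket
	shardOld := map[string]bool{} // proposal: only entries at or before the cutoff
	for _, ts := range entries {
		key := fmt.Sprint(bucket(ts, width))
		shardAll[key] = true
		if !ts.After(cutoff) {
			shardOld[key] = true
		} else {
			shardOld["original"] = true
		}
	}
	fmt.Println(len(shardAll), len(shardOld)) // 4 3
}
```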
> With the changes that I propose, we would not create new buckets for `now` and `now-15m`. So, as a result, we would have 3 streams: `original`, `C`, `D`.

The question is how the streams are going to be distributed over time. In your example, if you keep getting more and more logs with `now`, `now-15m`, `now-1h`, `now-1d` timestamps as the value of `now` changes, eventually you'd still cover the whole time range with 2 sets of streams: ones that have `__time_shard__` and the original ones that don't. That would probably be worse than the current approach.

But if these out-of-order logs arrive only occasionally, we'd be much better off with what you suggest, for sure.

So I think I'll add another per-tenant config option called something like `time_sharding_ignore_recent`, with a value of 30 minutes or something like that. Any logs with timestamps greater than `now - time_sharding_ignore_recent` won't be split into a `__time_shard__` stream, but if we set the option to `0`, everything will be sharded (i.e. the current behavior from this PR will be applied). It should give us the flexibility to configure this to achieve optimal results for different tenants. WDYT?
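A minimal sketch of what that knob could look like, assuming a hypothetical `shardStreamsConfig` struct; the option name is taken from the comment above, and the final field name and wiring may differ:

```go
package main

import (
	"fmt"
	"time"
)

// Proposed per-tenant knob, sketched: 0 keeps the PR's current behavior of
// sharding everything; a positive value exempts recent logs.
type shardStreamsConfig struct {
	TimeShardingEnabled      bool
	TimeShardingIgnoreRecent time.Duration
}

// shouldTimeShard applies the proposed rule: shard only entries whose
// timestamp is not after now - TimeShardingIgnoreRecent.
func shouldTimeShard(cfg shardStreamsConfig, now, ts time.Time) bool {
	if !cfg.TimeShardingEnabled {
		return false
	}
	return !ts.After(now.Add(-cfg.TimeShardingIgnoreRecent))
}

func main() {
	now := time.Now()
	cfg := shardStreamsConfig{TimeShardingEnabled: true, TimeShardingIgnoreRecent: 30 * time.Minute}
	fmt.Println(shouldTimeShard(cfg, now, now.Add(-5*time.Minute))) // false: recent, keep the original stream
	fmt.Println(shouldTimeShard(cfg, now, now.Add(-24*time.Hour)))  // true: old, inject __time_shard__

	cfg.TimeShardingIgnoreRecent = 0
	fmt.Println(shouldTimeShard(cfg, now, now)) // true: with 0, everything is sharded
}
```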
yep, it would be ideal.
This should now be resolved by the most recent commit, PTAL: 7787735
What this PR does / why we need it:

This adds support for automatically splitting incoming log streams in the distributor by injecting a `__time_shard__` label. The value of that label is bounded by the `ingester.max_chunk_age/2` value, which should allow the ingesters to accept all logs without rejecting them as too far behind here: `loki/pkg/ingester/stream.go`, lines 424 to 428 at c0856bf.
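For readers without the source open, the effect of the referenced check can be paraphrased in a runnable sketch (an approximation of its behavior, not the exact code at those lines):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTooFarBehind = errors.New("entry too far behind")

// validate paraphrases the referenced ingester check: an entry is rejected
// when it is more than maxChunkAge/2 behind the highest timestamp seen so
// far. Bounding each __time_shard__ bucket by maxChunkAge/2 therefore keeps
// every entry within the window its ingester stream will accept.
func validate(ts, highest time.Time, maxChunkAge time.Duration) error {
	if highest.Sub(ts) > maxChunkAge/2 {
		return errTooFarBehind
	}
	return nil
}

func main() {
	highest := time.Now()
	fmt.Println(validate(highest.Add(-30*time.Minute), highest, 2*time.Hour)) // <nil>
	fmt.Println(validate(highest.Add(-3*time.Hour), highest, 2*time.Hour))    // entry too far behind
}
```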
Special notes for your reviewer:

Checklist
- CONTRIBUTING.md guide (required)
- `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.