Skip to content

Commit

Permalink
feat: add per-tenant time sharding for long out-of-order ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Nov 1, 2024
1 parent c0856bf commit ccf3d9a
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 10 deletions.
7 changes: 7 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3706,6 +3706,13 @@ shard_streams:
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = true]

# Automatically shard streams by adding a __time_shard__ label, with values
# calculated from the log timestamps divided by MaxChunkAge/2. This allows the
# out-of-order ingestion of very old logs. If both flags are enabled,
# time-based sharding will happen before rate-based sharding.
# CLI flag: -shard-streams.time-sharding-enabled
[time_sharding_enabled: <boolean> | default = false]

# Whether to log sharding streams behavior or not. Not recommended for
# production environments.
# CLI flag: -shard-streams.logging-enabled
Expand Down
85 changes: 76 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"net/http"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -64,6 +65,8 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

timeShardLabel = "__time_shard__"
)

var (
Expand Down Expand Up @@ -132,6 +135,7 @@ type Distributor struct {
services.Service

cfg Config
ingesterCfg ingester.Config
logger log.Logger
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
Expand Down Expand Up @@ -187,6 +191,7 @@ type Distributor struct {
// New a distributor creates.
func New(
cfg Config,
ingesterCfg ingester.Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
Expand Down Expand Up @@ -245,6 +250,7 @@ func New(

d := &Distributor{
cfg: cfg,
ingesterCfg: ingesterCfg,
logger: logger,
clientCfg: clientCfg,
tenantConfigs: configs,
Expand Down Expand Up @@ -448,6 +454,28 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
var validationErrors util.GroupedErrors
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
} else {
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}
}
maybeShardByTime := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
if shardStreamsCfg.TimeShardingEnabled {
streamsByTime := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2)
for _, ts := range streamsByTime {
maybeShardByRate(ts.Stream, ts.linesTotalLen)
}
} else {
maybeShardByRate(stream, pushSize)
}
}

func() {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
Expand All @@ -456,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
sp.LogKV("event", "finished to validate request")
}()
}

for _, stream := range req.Streams {
// Return early if stream does not contain any entries
if len(stream.Entries) == 0 {
Expand Down Expand Up @@ -534,15 +563,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
} else {
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}
maybeShardByTime(stream, lbs, pushSize)
}
}()

Expand Down Expand Up @@ -721,6 +742,52 @@ func hasAnyLevelLabels(l labels.Labels) (string, bool) {
return "", false
}

// streamWithTimeShard pairs one time-sharded sub-stream with the total byte
// length of its log lines, so the caller can feed an accurate push size into
// the subsequent rate-based sharding step.
type streamWithTimeShard struct {
	logproto.Stream
	// linesTotalLen is the sum of len(Entry.Line) over all entries in Stream.
	linesTotalLen int
}

// shardStreamByTime shards the given stream into multiple sub-streams based
// on the log timestamps, with no new allocations for the log entries. It
// sorts the entries in-place in the given stream object (so it may modify
// it!) and the returned sub-streams reference sub-slices of the same
// stream.Entries slice.
//
// Each shard covers a half-open window [k*timeShardLen, (k+1)*timeShardLen)
// and carries a timeShardLabel value of "<startUnix>_<endUnix>" added on top
// of lbls. Entries within a shard keep their relative order (stable sort).
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration) []streamWithTimeShard {
	entries := stream.Entries
	entriesLen := len(entries)
	if entriesLen == 0 {
		return nil
	}

	// Sort by timestamp so every shard is a contiguous sub-slice of entries.
	slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })

	// Pre-size for the spanned time range; +1 because even a zero span (all
	// entries in one window) still produces one shard.
	result := make([]streamWithTimeShard, 0, entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen+1)
	labelBuilder := labels.NewBuilder(lbls)

	for startIdx := 0; startIdx < entriesLen; /* the index is advanced below */ {
		// Truncate aligns the window start to a multiple of timeShardLen
		// since the zero time, so all streams agree on shard boundaries.
		timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
		timeShardEnd := timeShardStart.Add(timeShardLen)

		endIdx := startIdx + 1
		linesTotalLen := len(entries[startIdx].Line)
		for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardEnd); endIdx++ {
			linesTotalLen += len(entries[endIdx].Line)
		}

		shardLbls := labelBuilder.Set(timeShardLabel, fmt.Sprintf("%d_%d", timeShardStart.Unix(), timeShardEnd.Unix())).Labels()
		result = append(result, streamWithTimeShard{
			Stream: logproto.Stream{
				Labels:  shardLbls.String(),
				Hash:    shardLbls.Hash(),
				Entries: stream.Entries[startIdx:endIdx],
			},
			linesTotalLen: linesTotalLen,
		})

		startIdx = endIdx
	}
	return result
}

// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
Expand Down
176 changes: 175 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,178 @@ func TestStreamShardAcrossCalls(t *testing.T) {
})
}

// TestStreamShardByTime exercises shardStreamByTime over a table of cases:
// no entries, a single shard, out-of-order input, adjacent and gapped shards,
// and a larger (24h) shard length. Expected __time_shard__ label values are
// precomputed unix-second window boundaries relative to the fixed base time.
func TestStreamShardByTime(t *testing.T) {
	// Fixed reference instant; every expected shard boundary below derives
	// from it (unix 1730378096, inside the 1730376000_1730379600 hour window).
	baseTimestamp := time.Date(2024, 10, 31, 12, 34, 56, 0, time.UTC)
	t.Logf("Base timestamp: %s (unix %d)", baseTimestamp.Format(time.RFC3339Nano), baseTimestamp.Unix())

	for _, tc := range []struct {
		test         string
		labels       string
		entries      []logproto.Entry
		timeShardLen time.Duration
		expResult    []streamWithTimeShard
	}{
		{
			test:         "zero shard because no entries",
			labels:       "{app='myapp'}",
			entries:      nil,
			timeShardLen: time.Hour,
			expResult:    nil,
		},
		{
			test:   "single shard with one entry",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp, Line: "foo"},
				//{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
				}}, linesTotalLen: 3},
			},
		},
		{
			test:   "single shard with two entries",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp, Line: "foo"},
				{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
					{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				}}, linesTotalLen: 6},
			},
		},
		{
			// Input arrives out of order; output must be sorted by timestamp.
			test:   "single shard with two entries reversed",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				{Timestamp: baseTimestamp, Line: "foo"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
					{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				}}, linesTotalLen: 6},
			},
		},
		{
			test:   "two shards without a gap",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp, Line: "foo"},
				{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				{Timestamp: baseTimestamp.Add(time.Hour), Line: "baz"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
					{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				}}, linesTotalLen: 6},
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730379600_1730383200", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Add(time.Hour), Line: "baz"},
				}}, linesTotalLen: 3},
			},
		},
		{
			// Empty intermediate windows must not produce empty shards.
			test:   "two shards with a gap",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp, Line: "foo"},
				{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				{Timestamp: baseTimestamp.Add(4 * time.Hour), Line: "baz"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
					{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				}}, linesTotalLen: 6},
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730390400_1730394000", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Add(4 * time.Hour), Line: "baz"},
				}}, linesTotalLen: 3},
			},
		},
		{
			// A 24h window swallows entries 6h apart into one shard.
			test:   "bigger shard len",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp, Line: "foo"},
				{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
				{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
			},
			timeShardLen: 24 * time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730332800_1730419200", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp, Line: "foo"},
					{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
					{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
				}}, linesTotalLen: 9},
			},
		},
		{
			// Unsorted entries spread over several hours; line contents encode
			// the expected shard (first digit) and order within it (second).
			// The sort must be stable: equal timestamps keep arrival order.
			test:   "longer messy gaps",
			labels: `{app="myapp"}`,
			entries: []logproto.Entry{
				{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
				{Timestamp: baseTimestamp, Line: "13"},
				{Timestamp: baseTimestamp, Line: "14"},
				{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
				{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
				{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
				{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
				{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
			},
			timeShardLen: time.Hour,
			expResult: []streamWithTimeShard{
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
					{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
					{Timestamp: baseTimestamp, Line: "13"},
					{Timestamp: baseTimestamp, Line: "14"},
				}}, linesTotalLen: 8},
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730379600_1730383200", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
				}}, linesTotalLen: 2},
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730383200_1730386800", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
					{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
				}}, linesTotalLen: 4},
				{Stream: logproto.Stream{Labels: `{__time_shard__="1730394000_1730397600", app="myapp"}`, Entries: []logproto.Entry{
					{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
				}}, linesTotalLen: 2},
			},
		},
	} {
		t.Run(tc.test, func(t *testing.T) {
			lbls, err := syntax.ParseLabels(tc.labels)
			require.NoError(t, err)
			stream := logproto.Stream{
				Labels:  tc.labels,
				Hash:    lbls.Hash(),
				Entries: tc.entries,
			}

			shardedStreams := shardStreamByTime(stream, lbls, tc.timeShardLen)
			require.Len(t, shardedStreams, len(tc.expResult))

			// Compare each shard's size, labels (including __time_shard__),
			// and exact entry slice against the expectation.
			for i, ss := range shardedStreams {
				assert.Equal(t, tc.expResult[i].linesTotalLen, ss.linesTotalLen)
				assert.Equal(t, tc.expResult[i].Labels, ss.Labels)
				assert.EqualValues(t, tc.expResult[i].Entries, ss.Entries)
			}
		})
	}
}

func generateEntries(n int) []logproto.Entry {
var entries []logproto.Entry
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -1386,7 +1558,9 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
ingesterConfig := ingester.Config{MaxChunkAge: 2 * time.Hour}

d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
Expand Down
3 changes: 3 additions & 0 deletions pkg/distributor/shardstreams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
type Config struct {
Enabled bool `yaml:"enabled" json:"enabled" doc:"description=Automatically shard streams to keep them under the per-stream rate limit. Sharding is dictated by the desired rate."`

TimeShardingEnabled bool `yaml:"time_sharding_enabled" json:"time_sharding_enabled" doc:"description=Automatically shard streams by adding a __time_shard__ label, with values calculated from the log timestamps divided by MaxChunkAge/2. This allows the out-of-order ingestion of very old logs. If both flags are enabled, time-based sharding will happen before rate-based sharding."`

LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."`

// DesiredRate is the threshold used to shard the stream into smaller pieces.
Expand All @@ -18,6 +20,7 @@ type Config struct {

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, prefix+".enabled", true, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.TimeShardingEnabled, prefix+".time-sharding-enabled", false, "Automatically shard streams by time (in MaxChunkAge/2 buckets), to allow out-of-order ingestion of very old logs.")
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
cfg.DesiredRate.Set("1536KB") //nolint:errcheck
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "threshold used to cut a new shard. Default (1536KB) means if a rate is above 1536KB/s, it will be sharded.")
Expand Down
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "distributor")
t.distributor, err = distributor.New(
t.Cfg.Distributor,
t.Cfg.Ingester,
t.Cfg.IngesterClient,
t.tenantConfigs,
t.ring,
Expand Down

0 comments on commit ccf3d9a

Please sign in to comment.