chore: Refactor storage interface for rf1 (#13415)
cyriltovena authored Jul 5, 2024
1 parent 7fc926e commit 0076bbd
Showing 37 changed files with 279 additions and 347 deletions.
50 changes: 10 additions & 40 deletions pkg/ingester-rf1/flush.go
@@ -1,6 +1,7 @@
package ingesterrf1

import (
"crypto/rand"
"fmt"
"net/http"
"time"
@@ -9,14 +10,14 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
@@ -140,47 +141,16 @@ func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
if err := i.store.PutWal(ctx, ch); err != nil {
reader := ch.Reader()
defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment")

newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)

if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil {
i.metrics.chunksFlushFailures.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
i.metrics.flushedChunksStats.Inc(1)
// TODO: report some flush metrics
return nil
}

// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}

i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
i.metrics.chunkEntries.Observe(float64(numEntries))
i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())

i.metrics.flushedChunksBytesStats.Record(compressedSize)
i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
i.metrics.flushedChunksUtilizationStats.Record(utilization)
i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds())
}
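
For reference, the new flush path drops the chunk-oriented PutWal call in favour of a plain object write: each WAL segment is exposed as an io.Reader and uploaded under a freshly generated ULID key. Below is a minimal sketch of that pattern, assuming a store that satisfies the new PutObject-style interface; the standalone helper, package name, and ObjectStore type are illustrative, not part of the commit.

// Sketch: flush a WAL segment as one object keyed by a ULID, mirroring the new flushSegment path.
package flushsketch

import (
	"context"
	"crypto/rand"
	"fmt"
	"io"
	"time"

	"github.com/oklog/ulid"
)

// ObjectStore is a stand-in for the ingester's new storage dependency.
type ObjectStore interface {
	PutObject(ctx context.Context, objectKey string, object io.Reader) error
}

// flushSegment uploads the segment under loki-v2/wal/anon/<ulid>.
func flushSegment(ctx context.Context, store ObjectStore, segment io.Reader) error {
	// ULIDs are time-ordered, so object keys sort roughly by flush time.
	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
	key := "loki-v2/wal/anon/" + id.String()
	if err := store.PutObject(ctx, key, segment); err != nil {
		return fmt.Errorf("store put chunk: %w", err)
	}
	return nil
}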
142 changes: 37 additions & 105 deletions pkg/ingester-rf1/ingester.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"os"
@@ -16,9 +17,10 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
lokilog "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/storage/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@@ -35,20 +37,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/health/grpc_health_v1"

server_util "github.com/grafana/loki/v3/pkg/util/server"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
indexstore "github.com/grafana/loki/v3/pkg/storage/stores/index"
"github.com/grafana/loki/v3/pkg/util"
)

@@ -121,7 +116,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester-rf1.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
//f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
// f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester-rf1.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester-rf1.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
f.StringVar(&cfg.ChunkEncoding, "ingester-rf1.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding()))
@@ -159,13 +154,10 @@ type Wrapper interface {
Wrap(wrapped Interface) Interface
}

// Store is the store interface we need on the ingester.
type Store interface {
stores.ChunkWriter
stores.ChunkFetcher
storage.SelectStore
storage.SchemaConfigProvider
indexstore.StatsReader
// Storage is the store interface we need on the ingester.
type Storage interface {
PutObject(ctx context.Context, objectKey string, object io.Reader) error
Stop()
}

// Interface is an interface for the Ingester
@@ -174,8 +166,6 @@ type Interface interface {
http.Handler

logproto.PusherServer
//logproto.QuerierServer
//logproto.StreamDataServer

CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
@@ -218,7 +208,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

store Store
store Storage
periodicConfigs []config.PeriodConfig

loopDone sync.WaitGroup
@@ -240,14 +230,10 @@ type Ingester struct {
terminateOnShutdown bool

// Only used by WAL & flusher to coordinate backpressure during replay.
//replayController *replayController
// replayController *replayController

metrics *ingesterMetrics

chunkFilter chunk.RequestChunkFilterer
extractorWrapper lokilog.SampleExtractorWrapper
pipelineWrapper lokilog.PipelineWrapper

streamRateCalculator *StreamRateCalculator

writeLogManager *writefailures.Manager
@@ -256,11 +242,25 @@ type Ingester struct {

// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
//recalculateOwnedStreams *recalculateOwnedStreams
// recalculateOwnedStreams *recalculateOwnedStreams
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config,
periodConfigs []config.PeriodConfig,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
limits Limits, configs *runtime.TenantConfigs,
registerer prometheus.Registerer,
writeFailuresCfg writefailures.Cfg,
metricsNamespace string,
logger log.Logger,
customStreamsTracker push.UsageTracker, readRing ring.ReadRing,
) (*Ingester, error) {
storage, err := objstore.New(periodConfigs, storageConfig, clientMetrics)
if err != nil {
return nil, err
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@@ -279,13 +279,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
store: storage,
periodicConfigs: periodConfigs,
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
//flushOnShutdownSwitch: &OnceSwitch{},
// flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
@@ -298,7 +298,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
segmentWriter: segmentWriter,
},
}
//i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

// TODO: change flush on shutdown
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
@@ -334,18 +333,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
return i, nil
}

func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}

func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
i.extractorWrapper = wrapper
}

func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
i.pipelineWrapper = wrapper
}

// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@@ -490,7 +477,7 @@ func (i *Ingester) running(ctx context.Context) error {
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs util.MultiError
//errs.Add(i.wal.Stop())
// errs.Add(i.wal.Stop())

//if i.flushOnShutdownSwitch.Get() {
// i.lifecycler.SetFlushOnShutdown(true)
@@ -502,7 +489,7 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

//i.streamRateCalculator.Stop()
// i.streamRateCalculator.Stop()

// In case the flag to terminate on shutdown is set or this instance is marked to release its resources,
// we need to mark the ingester service as "failed", so Loki will shut down entirely.
@@ -511,6 +498,7 @@ func (i *Ingester) stopping(_ error) error {
i.removeShutdownMarkerFile()
return modules.ErrStopProcess
}
i.store.Stop()
return errs.Err()
}

@@ -581,7 +569,7 @@ func (i *Ingester) loop() {
func (i *Ingester) doFlushTick() {
i.flushCtx.lock.Lock()

//i.logger.Log("msg", "starting periodic flush")
// i.logger.Log("msg", "starting periodic flush")
// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
currentFlushCtx := i.flushCtx

@@ -708,7 +696,7 @@ func createShutdownMarker(p string) error {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@@ -725,7 +713,7 @@ func removeShutdownMarker(p string) error {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@@ -811,7 +799,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
// Fetch a flush context and try to acquire the RLock
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
//The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
for !currentFlushCtx.lock.TryRLock() {
select {
@@ -863,7 +851,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
if err != nil {
return nil, err
}
@@ -894,62 +882,6 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
gcr, err := i.getChunkIDs(ctx, req)
err = server_util.ClientGrpcStatusAndError(err)
return gcr, err
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID))
pprof.SetGoroutineLabels(ctx)

asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 {
return &logproto.GetChunkIDsResponse{}, nil
}

reqStart := req.Start
reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now())

// parse the request
start, end := util.RoundToMilliseconds(reqStart, req.End)
matchers, err := syntax.ParseMatchers(req.Matchers, true)
if err != nil {
return nil, err
}

// get chunk references
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil)
if err != nil {
return nil, err
}

// todo (Callum) ingester should maybe store the whole schema config?
s := config.SchemaConfig{
Configs: i.periodicConfigs,
}

// build the response
resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
for _, chunks := range chunksGroups {
for _, chk := range chunks {
resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef))
}
}

return &resp, nil
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
return nil
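
Taken together, the change narrows what the rf1 ingester asks of its store: instead of the full chunk Store (stores.ChunkWriter, stores.ChunkFetcher, storage.SelectStore, storage.SchemaConfigProvider, indexstore.StatsReader), it now depends only on an object writer plus a Stop hook, and New builds that store itself via objstore.New from the period configs, storage config, and client metrics. A minimal sketch of the narrowed interface with an in-memory stand-in follows; only PutObject and Stop come from the commit, while the memStore type and package name are illustrative assumptions.

// Sketch: the narrowed dependency the rf1 ingester now needs, with an in-memory
// implementation of the kind a unit test might use to exercise the flush path
// without a real object store.
package storesketch

import (
	"context"
	"io"
	"sync"
)

// Storage mirrors the interface introduced in pkg/ingester-rf1/ingester.go.
type Storage interface {
	PutObject(ctx context.Context, objectKey string, object io.Reader) error
	Stop()
}

// memStore keeps objects in a map keyed by object key.
type memStore struct {
	mu      sync.Mutex
	objects map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{objects: map[string][]byte{}}
}

// PutObject reads the object fully and stores the bytes under objectKey.
func (m *memStore) PutObject(_ context.Context, objectKey string, object io.Reader) error {
	buf, err := io.ReadAll(object)
	if err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.objects[objectKey] = buf
	return nil
}

// Stop is a no-op for the in-memory store.
func (m *memStore) Stop() {}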
