chore: Refactor storage interface for rf1 #13415

Merged · 2 commits · Jul 5, 2024
50 changes: 10 additions & 40 deletions pkg/ingester-rf1/flush.go
@@ -1,6 +1,7 @@
package ingesterrf1

import (
"crypto/rand"
"fmt"
"net/http"
"time"
@@ -9,14 +10,14 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
@@ -140,47 +141,16 @@ func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
if err := i.store.PutWal(ctx, ch); err != nil {
reader := ch.Reader()
defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment")

newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)

if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil {
i.metrics.chunksFlushFailures.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
i.metrics.flushedChunksStats.Inc(1)
// TODO: report some flush metrics
return nil
}

// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}

i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
i.metrics.chunkEntries.Observe(float64(numEntries))
i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())

i.metrics.flushedChunksBytesStats.Record(compressedSize)
i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
i.metrics.flushedChunksUtilizationStats.Record(utilization)
i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds())
}
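Editor's note: the flush.go change above replaces the chunk-oriented flush (and its per-chunk statistics reporting) with a single object-store write — the WAL segment reader is uploaded under a ULID-keyed object name. Below is a minimal sketch of that pattern for readers who want it in isolation. The Storage interface mirrors the PutObject method introduced later in this PR; the package name, helper name, and error message are illustrative, not the production code.

package flushsketch

import (
	"context"
	"crypto/rand"
	"fmt"
	"io"
	"time"

	"github.com/oklog/ulid"
)

// Storage mirrors the PutObject method of the object-store interface added in this PR.
type Storage interface {
	PutObject(ctx context.Context, objectKey string, object io.Reader) error
}

// flushSegment uploads one WAL segment under a ULID-keyed object name, following the
// pattern in flush.go above. ULIDs sort lexicographically by creation time, so objects
// under the "loki-v2/wal/anon/" prefix list back in roughly chronological order.
func flushSegment(ctx context.Context, store Storage, segment io.Reader) error {
	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
	key := fmt.Sprintf("loki-v2/wal/anon/%s", id.String())
	if err := store.PutObject(ctx, key, segment); err != nil {
		return fmt.Errorf("store put chunk: %w", err)
	}
	return nil
}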
142 changes: 37 additions & 105 deletions pkg/ingester-rf1/ingester.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"os"
@@ -16,9 +17,10 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
lokilog "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/storage/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@@ -35,20 +37,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/health/grpc_health_v1"

server_util "github.com/grafana/loki/v3/pkg/util/server"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
indexstore "github.com/grafana/loki/v3/pkg/storage/stores/index"
"github.com/grafana/loki/v3/pkg/util"
)

@@ -121,7 +116,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester-rf1.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
//f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
// f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester-rf1.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester-rf1.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
f.StringVar(&cfg.ChunkEncoding, "ingester-rf1.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding()))
@@ -159,13 +154,10 @@ type Wrapper interface {
Wrap(wrapped Interface) Interface
}

// Store is the store interface we need on the ingester.
type Store interface {
stores.ChunkWriter
stores.ChunkFetcher
storage.SelectStore
storage.SchemaConfigProvider
indexstore.StatsReader
// Storage is the store interface we need on the ingester.
type Storage interface {
PutObject(ctx context.Context, objectKey string, object io.Reader) error
Stop()
}

// Interface is an interface for the Ingester
@@ -174,8 +166,6 @@ type Interface interface {
http.Handler

logproto.PusherServer
//logproto.QuerierServer
//logproto.StreamDataServer

CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
@@ -218,7 +208,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

store Store
store Storage
periodicConfigs []config.PeriodConfig

loopDone sync.WaitGroup
@@ -240,14 +230,10 @@ type Ingester struct {
terminateOnShutdown bool

// Only used by WAL & flusher to coordinate backpressure during replay.
//replayController *replayController
// replayController *replayController

metrics *ingesterMetrics

chunkFilter chunk.RequestChunkFilterer
extractorWrapper lokilog.SampleExtractorWrapper
pipelineWrapper lokilog.PipelineWrapper

streamRateCalculator *StreamRateCalculator

writeLogManager *writefailures.Manager
@@ -256,11 +242,25 @@ type Ingester struct {

// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
//recalculateOwnedStreams *recalculateOwnedStreams
// recalculateOwnedStreams *recalculateOwnedStreams
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config,
periodConfigs []config.PeriodConfig,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
limits Limits, configs *runtime.TenantConfigs,
registerer prometheus.Registerer,
writeFailuresCfg writefailures.Cfg,
metricsNamespace string,
logger log.Logger,
customStreamsTracker push.UsageTracker, readRing ring.ReadRing,
) (*Ingester, error) {
storage, err := objstore.New(periodConfigs, storageConfig, clientMetrics)
if err != nil {
return nil, err
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@@ -279,13 +279,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
store: storage,
periodicConfigs: periodConfigs,
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
//flushOnShutdownSwitch: &OnceSwitch{},
// flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
@@ -298,7 +298,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
segmentWriter: segmentWriter,
},
}
//i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

// TODO: change flush on shutdown
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
@@ -334,18 +333,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
return i, nil
}

func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}

func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
i.extractorWrapper = wrapper
}

func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
i.pipelineWrapper = wrapper
}

// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@@ -490,7 +477,7 @@ func (i *Ingester) running(ctx context.Context) error {
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs util.MultiError
//errs.Add(i.wal.Stop())
// errs.Add(i.wal.Stop())

//if i.flushOnShutdownSwitch.Get() {
// i.lifecycler.SetFlushOnShutdown(true)
@@ -502,7 +489,7 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

//i.streamRateCalculator.Stop()
// i.streamRateCalculator.Stop()

// In case the flag to terminate on shutdown is set or this instance is marked to release its resources,
// we need to mark the ingester service as "failed", so Loki will shut down entirely.
@@ -511,6 +498,7 @@ func (i *Ingester) stopping(_ error) error {
i.removeShutdownMarkerFile()
return modules.ErrStopProcess
}
i.store.Stop()
return errs.Err()
}

@@ -581,7 +569,7 @@ func (i *Ingester) loop() {
func (i *Ingester) doFlushTick() {
i.flushCtx.lock.Lock()

//i.logger.Log("msg", "starting periodic flush")
// i.logger.Log("msg", "starting periodic flush")
// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
currentFlushCtx := i.flushCtx

@@ -708,7 +696,7 @@ func createShutdownMarker(p string) error {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@@ -725,7 +713,7 @@ func removeShutdownMarker(p string) error {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@@ -811,7 +799,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
// Fetch a flush context and try to acquire the RLock
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
//The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
for !currentFlushCtx.lock.TryRLock() {
select {
@@ -863,7 +851,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
if err != nil {
return nil, err
}
@@ -894,62 +882,6 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
gcr, err := i.getChunkIDs(ctx, req)
err = server_util.ClientGrpcStatusAndError(err)
return gcr, err
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID))
pprof.SetGoroutineLabels(ctx)

asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 {
return &logproto.GetChunkIDsResponse{}, nil
}

reqStart := req.Start
reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now())

// parse the request
start, end := util.RoundToMilliseconds(reqStart, req.End)
matchers, err := syntax.ParseMatchers(req.Matchers, true)
if err != nil {
return nil, err
}

// get chunk references
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil)
if err != nil {
return nil, err
}

// todo (Callum) ingester should maybe store the whole schema config?
s := config.SchemaConfig{
Configs: i.periodicConfigs,
}

// build the response
resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
for _, chunks := range chunksGroups {
for _, chk := range chunks {
resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef))
}
}

return &resp, nil
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
return nil
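Editor's note: the ingester.go change narrows the ingester's storage dependency from the broad Store interface (chunk writer/fetcher, SelectStore, schema provider, stats reader) to a two-method Storage interface (PutObject and Stop), and the ingester now builds its own object store via objstore.New from the period configs, storage config, and client metrics passed to New, stopping it in stopping(). Query-path methods such as GetChunkIDs and the chunk-filter/pipeline wrappers are removed along with it. One practical consequence is that flush logic can be exercised against a trivial in-memory double. The sketch below is illustrative only; memStorage and its fields are hypothetical names, not part of the PR.

package flushsketch

import (
	"context"
	"io"
	"sync"
)

// memStorage is a hypothetical in-memory implementation of the Storage interface
// introduced in this PR, useful for unit-testing flush behaviour without a real
// object store.
type memStorage struct {
	mu      sync.Mutex
	objects map[string][]byte
}

// PutObject buffers the uploaded object in memory, keyed by its object name.
func (m *memStorage) PutObject(_ context.Context, objectKey string, object io.Reader) error {
	data, err := io.ReadAll(object)
	if err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.objects == nil {
		m.objects = map[string][]byte{}
	}
	m.objects[objectKey] = data
	return nil
}

// Stop satisfies the second method of the Storage interface; nothing to release here.
func (m *memStorage) Stop() {}

A note on the design: by owning the object-store client's lifecycle (construction in New, shutdown in stopping()) and exposing only PutObject to the flush path, the rf1 ingester no longer carries the read-side surface of the legacy Store, which is what the removed GetChunkIDs, SetChunkFilterer, SetExtractorWrapper, and SetPipelineWrapper methods depended on.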