Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [BUGFIX] Fix configuration for TLS server validation, TLS skip verify was hardcoded to true for all TLS configurations and prevented validation of server certificates. #3030
* [BUGFIX] Fixes the Alertmanager panicking when no `-alertmanager.web.external-url` is provided. #3017
* [BUGFIX] Fixes the registration of the Alertmanager API metrics `cortex_alertmanager_alerts_received_total` and `cortex_alertmanager_alerts_invalid_total`. #3065
* [BUGFIX] Ingester: If push request contained both valid and invalid samples, valid samples were ingested but not stored to WAL of the chunks storage. This has been fixed. #3067

## 1.3.0 / 2020-08-21

Expand Down
10 changes: 5 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
}
}

if firstPartialErr != nil {
// grpcForwardableError turns the error into a string so it no longer references `req`
return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
}

if record != nil {
// Log the record only if there was no error in ingestion.
if err := i.wal.Log(record); err != nil {
Expand All @@ -445,6 +440,11 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
recordPool.Put(record)
}

if firstPartialErr != nil {
// grpcForwardableError turns the error into a string so it no longer references `req`
return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
}

return &client.WriteResponse{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (w *walWrapper) Stop() {
}

func (w *walWrapper) Log(record *WALRecord) error {
if record == nil {
if record == nil || (len(record.Series) == 0 && len(record.Samples) == 0) {
return nil
}
select {
Expand Down
37 changes: 37 additions & 0 deletions pkg/ingester/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ingester

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"
Expand All @@ -14,6 +16,8 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -70,6 +74,39 @@ func TestWAL(t *testing.T) {

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

cfg.WALConfig.WALEnabled = true
cfg.WALConfig.CheckpointEnabled = true

// Start a new ingester and recover the WAL.
_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)

userID := userIDs[0]
sampleStream := testData[userID][0]
lastSample := sampleStream.Values[len(sampleStream.Values)-1]

// In-order and out of order sample in the same request.
metric := client.FromLabelAdaptersToLabels(client.FromMetricsToLabelAdapters(sampleStream.Metric))
outOfOrderSample := client.Sample{TimestampMs: int64(lastSample.Timestamp - 10), Value: 99}
inOrderSample := client.Sample{TimestampMs: int64(lastSample.Timestamp + 10), Value: 999}

ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing.Push(ctx, client.ToWriteRequest(
[]labels.Labels{metric, metric},
[]client.Sample{outOfOrderSample, inOrderSample}, nil, client.API))
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(makeMetricValidationError(sampleOutOfOrder, metric,
fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", lastSample.Timestamp, model.Time(outOfOrderSample.TimestampMs))), userID).Error()), err)

// We should have logged the in-order sample.
testData[userID][0].Values = append(testData[userID][0].Values, model.SamplePair{
Timestamp: model.Time(inOrderSample.TimestampMs),
Value: model.SampleValue(inOrderSample.Value),
})

// Check samples after restart from WAL.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
retrieveTestSamples(t, ing, userIDs, testData)
}

func TestCheckpointRepair(t *testing.T) {
Expand Down