13 changes: 6 additions & 7 deletions integration/alertmanager_test.go
@@ -25,8 +25,8 @@ func TestAlertmanager(t *testing.T) {
alertmanager := e2ecortex.NewAlertmanager(
"alertmanager",
mergeFlags(
- AlertmanagerFlags,
- AlertmanagerLocalFlags,
+ AlertmanagerFlags(),
+ AlertmanagerLocalFlags(),
),
"",
)
@@ -56,15 +56,14 @@ func TestAlertmanagerStoreAPI(t *testing.T) {
require.NoError(t, err)
defer s.Close()

- minio := e2edb.NewMinio(9000, AlertmanagerS3Flags["-alertmanager.storage.s3.buckets"])
+ flags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
+
+ minio := e2edb.NewMinio(9000, flags["-alertmanager.storage.s3.buckets"])
require.NoError(t, s.StartAndWaitReady(minio))

am := e2ecortex.NewAlertmanager(
"alertmanager",
- mergeFlags(
- AlertmanagerFlags,
- AlertmanagerS3Flags,
- ),
+ flags,
"",
)

16 changes: 9 additions & 7 deletions integration/backward_compatibility_test.go
@@ -78,7 +78,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s

// Start Cortex table-manager (running on current version since the backward compatibility
// test is about testing a rolling update of other services).
- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
@@ -87,7 +87,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
- distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
+ distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))

// Wait until the distributor has updated the ring.
@@ -104,7 +104,7 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

- ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
+ ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags(), map[string]string{
"-ingester.join-after": "10s",
}), "")
// Start ingester-2 on new version, to ensure the transfer is backward compatible.
@@ -117,7 +117,8 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
checkQueries(t, consul, distributor,
expectedVector,
previousImage,
- flagsForOldImage, ChunksStorageFlags,
+ flagsForOldImage,
+ ChunksStorageFlags(),
now,
s,
1,
@@ -135,15 +136,15 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(dynamo, consul))

- flagsForNewImage := mergeFlags(ChunksStorageFlags, map[string]string{
+ flagsForNewImage := mergeFlags(ChunksStorageFlags(), map[string]string{
"-distributor.replication-factor": "3",
})

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

// Start Cortex table-manager (running on current version since the backward compatibility
// test is about testing a rolling update of other services).
- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
@@ -174,7 +175,8 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
checkQueries(t, consul, distributor,
expectedVector,
previousImage,
- flagsForPreviousImage, flagsForNewImage,
+ flagsForPreviousImage,
+ flagsForNewImage,
now,
s,
3,
2 changes: 1 addition & 1 deletion integration/chunks_delete_series_test.go
@@ -43,7 +43,7 @@ func TestDeleteSeriesAllIndexBackends(t *testing.T) {
storeConfigs[i] = storeConfig{From: oldestStoreStartTime.Add(time.Duration(i) * perStoreDuration).UTC().Format("2006-01-02"), IndexStore: store}
}

- flags := mergeFlags(ChunksStorageFlags, map[string]string{
+ flags := mergeFlags(ChunksStorageFlags(), map[string]string{
"-cassandra.addresses": cassandra.NetworkHTTPEndpoint(),
"-cassandra.keyspace": "tests", // keyspace gets created on startup if it does not exist
"-cassandra.replication-factor": "1",
2 changes: 1 addition & 1 deletion integration/chunks_storage_backends_test.go
@@ -60,7 +60,7 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
storeConfigs[i] = storeConfig{From: oldestStoreStartTime.Add(time.Duration(i) * perStoreDuration).Format("2006-01-02"), IndexStore: store}
}

- storageFlags := mergeFlags(ChunksStorageFlags, map[string]string{
+ storageFlags := mergeFlags(ChunksStorageFlags(), map[string]string{
"-cassandra.addresses": cassandra.NetworkHTTPEndpoint(),
"-cassandra.keyspace": "tests", // keyspace gets created on startup if it does not exist
"-cassandra.replication-factor": "1",
86 changes: 49 additions & 37 deletions integration/configs.go
@@ -90,46 +90,56 @@ receivers:
var (
cortexSchemaConfigYaml = buildSchemaConfigWith([]storeConfig{{From: "2019-03-20", IndexStore: "aws-dynamo"}})

- AlertmanagerFlags = map[string]string{
- "-alertmanager.configs.poll-interval": "1s",
- "-alertmanager.web.external-url": "http://localhost/api/prom",
+ AlertmanagerFlags = func() map[string]string {
+ return map[string]string{
+ "-alertmanager.configs.poll-interval": "1s",
+ "-alertmanager.web.external-url": "http://localhost/api/prom",
+ }
}

- AlertmanagerLocalFlags = map[string]string{
- "-alertmanager.storage.type": "local",
- "-alertmanager.storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
+ AlertmanagerLocalFlags = func() map[string]string {
+ return map[string]string{
+ "-alertmanager.storage.type": "local",
+ "-alertmanager.storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
+ }
}

- AlertmanagerS3Flags = map[string]string{
- "-alertmanager.storage.type": "s3",
- "-alertmanager.storage.s3.buckets": "cortex-alerts",
- "-alertmanager.storage.s3.force-path-style": "true",
- "-alertmanager.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
+ AlertmanagerS3Flags = func() map[string]string {
+ return map[string]string{
+ "-alertmanager.storage.type": "s3",
+ "-alertmanager.storage.s3.buckets": "cortex-alerts",
+ "-alertmanager.storage.s3.force-path-style": "true",
+ "-alertmanager.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
+ }
}

- RulerConfigs = map[string]string{
- "-ruler.enable-sharding": "false",
- "-ruler.poll-interval": "2s",
- "-experimental.ruler.enable-api": "true",
- "-ruler.storage.type": "s3",
- "-ruler.storage.s3.buckets": "cortex-rules",
- "-ruler.storage.s3.force-path-style": "true",
- "-ruler.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
+ RulerFlags = func() map[string]string {
+ return map[string]string{
+ "-ruler.enable-sharding": "false",
+ "-ruler.poll-interval": "2s",
+ "-experimental.ruler.enable-api": "true",
+ "-ruler.storage.type": "s3",
+ "-ruler.storage.s3.buckets": "cortex-rules",
+ "-ruler.storage.s3.force-path-style": "true",
+ "-ruler.storage.s3.url": fmt.Sprintf("s3://%s:%s@%s-minio-9000.:9000", e2edb.MinioAccessKey, e2edb.MinioSecretKey, networkName),
+ }
}

- BlocksStorageFlags = map[string]string{
- "-store.engine": blocksStorageEngine,
- "-blocks-storage.backend": "s3",
- "-blocks-storage.tsdb.block-ranges-period": "1m",
- "-blocks-storage.bucket-store.sync-interval": "5s",
- "-blocks-storage.tsdb.retention-period": "5m",
- "-blocks-storage.tsdb.ship-interval": "1m",
- "-blocks-storage.tsdb.head-compaction-interval": "1s",
- "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
- "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
- "-blocks-storage.s3.bucket-name": bucketName,
- "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
- "-blocks-storage.s3.insecure": "true",
+ BlocksStorageFlags = func() map[string]string {
+ return map[string]string{
+ "-store.engine": blocksStorageEngine,
+ "-blocks-storage.backend": "s3",
+ "-blocks-storage.tsdb.block-ranges-period": "1m",
+ "-blocks-storage.bucket-store.sync-interval": "5s",
+ "-blocks-storage.tsdb.retention-period": "5m",
+ "-blocks-storage.tsdb.ship-interval": "1m",
+ "-blocks-storage.tsdb.head-compaction-interval": "1s",
+ "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
+ "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
+ "-blocks-storage.s3.bucket-name": bucketName,
+ "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
+ "-blocks-storage.s3.insecure": "true",
+ }
}

BlocksStorageConfig = buildConfigFromTemplate(`
@@ -163,11 +173,13 @@ blocks_storage:
MinioEndpoint: fmt.Sprintf("%s-minio-9000:9000", networkName),
})

- ChunksStorageFlags = map[string]string{
- "-dynamodb.url": fmt.Sprintf("dynamodb://u:p@%s-dynamodb.:8000", networkName),
- "-table-manager.poll-interval": "1m",
- "-schema-config-file": filepath.Join(e2e.ContainerSharedDir, cortexSchemaConfigFile),
- "-table-manager.retention-period": "168h",
+ ChunksStorageFlags = func() map[string]string {
+ return map[string]string{
+ "-dynamodb.url": fmt.Sprintf("dynamodb://u:p@%s-dynamodb.:8000", networkName),
+ "-table-manager.poll-interval": "1m",
+ "-schema-config-file": filepath.Join(e2e.ContainerSharedDir, cortexSchemaConfigFile),
+ "-table-manager.retention-period": "168h",
+ }
}

ChunksStorageConfig = buildConfigFromTemplate(`
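A note on the configs.go change above: every package-level flag map becomes a function returning a fresh map (and `RulerConfigs` is renamed `RulerFlags` to match the other factories). Go maps are reference types, so an assignment like `flags := BlocksStorageFlags` copies only the map header; mutating `flags` then silently rewrites the package-level defaults for every test that runs later in the same binary. Below is a minimal, self-contained sketch of the hazard and the fix — `SharedFlags` and `FreshFlags` are hypothetical names, and `mergeFlags` is a simplified stand-in for the integration-test helper:

```go
package main

import "fmt"

// mergeFlags is a simplified stand-in for the integration-test helper:
// it allocates a new map and lets later maps override earlier ones.
func mergeFlags(maps ...map[string]string) map[string]string {
	merged := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			merged[k] = v
		}
	}
	return merged
}

// Before: a shared package-level map. Every test aliases the same storage.
var SharedFlags = map[string]string{"-store.engine": "blocks"}

// After: a factory function. Every call returns a private copy.
var FreshFlags = func() map[string]string {
	return map[string]string{"-store.engine": "blocks"}
}

func main() {
	a := SharedFlags              // copies the map header, not the contents
	a["-store.engine"] = "chunks" // rewrites the shared default

	fmt.Println(SharedFlags["-store.engine"]) // "chunks" — leaked into later tests

	b := FreshFlags()
	b["-store.engine"] = "chunks"
	fmt.Println(FreshFlags()["-store.engine"]) // "blocks" — isolation preserved
}
```

Results produced by `mergeFlags` were already safe, since it allocates a new map; the leak only bit call sites that assigned or indexed the globals directly, which is exactly what the test files below stop doing.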
8 changes: 4 additions & 4 deletions integration/ingester_flush_test.go
@@ -32,12 +32,12 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) {
// Start Cortex components.
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
- ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
+ ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags(), map[string]string{
"-ingester.max-transfer-retries": "0",
}), "")
- querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
- distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
+ querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
+ distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(distributor, querier, ingester, tableManager))

// Wait until the first table-manager sync has completed, so that we're
4 changes: 2 additions & 2 deletions integration/ingester_hand_over_test.go
@@ -17,13 +17,13 @@ import (
)

func TestIngesterHandOverWithChunksStorage(t *testing.T) {
- runIngesterHandOverTest(t, ChunksStorageFlags, func(t *testing.T, s *e2e.Scenario) {
+ runIngesterHandOverTest(t, ChunksStorageFlags(), func(t *testing.T, s *e2e.Scenario) {
dynamo := e2edb.NewDynamoDB()
require.NoError(t, s.StartAndWaitReady(dynamo))

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
2 changes: 1 addition & 1 deletion integration/ingester_sharding_test.go
@@ -44,7 +44,7 @@ func TestIngesterSharding(t *testing.T) {
require.NoError(t, err)
defer s.Close()

- flags := BlocksStorageFlags
+ flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.sharding-strategy"] = testData.shardingStrategy
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
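This is the call-site pattern the configs.go change exists to protect: the test takes the defaults and then overwrites several keys per test case. Under the old `flags := BlocksStorageFlags`, those writes would have mutated the shared defaults; with the factory, each case gets a private copy. A hedged sketch, assuming a trimmed-down `BlocksStorageFlags`:

```go
package main

import (
	"fmt"
	"strconv"
)

// BlocksStorageFlags stands in for the factory in integration/configs.go
// after this change, trimmed down to two entries for illustration.
func BlocksStorageFlags() map[string]string {
	return map[string]string{
		"-store.engine":                    "blocks",
		"-distributor.shard-by-all-labels": "false",
	}
}

func main() {
	// Mirrors TestIngesterSharding's per-case loop: each case tweaks
	// its own copy of the defaults.
	for _, tenantShardSize := range []int{0, 3} {
		flags := BlocksStorageFlags() // fresh map on every call
		flags["-distributor.shard-by-all-labels"] = "true"
		flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(tenantShardSize)
		fmt.Println(flags["-distributor.ingestion-tenant-shard-size"])
	}

	// The defaults are untouched between cases.
	fmt.Println(BlocksStorageFlags()["-distributor.shard-by-all-labels"]) // "false"
}
```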
2 changes: 1 addition & 1 deletion integration/integration_memberlist_single_binary_test.go
@@ -77,7 +77,7 @@ func newSingleBinary(name string, join string) *e2ecortex.CortexService {

serv := e2ecortex.NewSingleBinary(
name,
- mergeFlags(ChunksStorageFlags, flags),
+ mergeFlags(ChunksStorageFlags(), flags),
"",
8000,
)
6 changes: 3 additions & 3 deletions integration/querier_remote_read_test.go
@@ -29,15 +29,15 @@ func TestQuerierRemoteRead(t *testing.T) {
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
- flags := mergeFlags(ChunksStorageFlags, map[string]string{})
+ flags := mergeFlags(ChunksStorageFlags(), map[string]string{})

// Start dependencies.
dynamo := e2edb.NewDynamoDB()

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, dynamo))

- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
@@ -63,7 +63,7 @@ func TestQuerierRemoteRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

- querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
+ querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
10 changes: 4 additions & 6 deletions integration/querier_sharding_test.go
@@ -39,19 +39,17 @@ func runQuerierShardingTest(t *testing.T, sharding bool) {
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

- minio := e2edb.NewMinio(9000, BlocksStorageFlags["-blocks-storage.s3.bucket-name"])
- require.NoError(t, s.StartAndWaitReady(minio))
-
- flags := BlocksStorageFlags
-
- flags = mergeFlags(flags, map[string]string{
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors.
})

+ minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+ require.NoError(t, s.StartAndWaitReady(minio))

if sharding {
// Use only single querier for each user.
flags["-frontend.max-queriers-per-tenant"] = "1"
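Besides switching to `BlocksStorageFlags()`, this hunk reorders the setup: Minio is now started after the flags are merged, so the bucket name passed to `e2edb.NewMinio` is read from the same merged map the queriers later receive, rather than from a separate lookup against the global defaults.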
5 changes: 2 additions & 3 deletions integration/querier_streaming_mixed_ingester_test.go
@@ -26,9 +26,8 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
- chunksFlags := mergeFlags(ChunksStorageFlags, map[string]string{})
-
- blockFlags := mergeFlags(BlocksStorageFlags, map[string]string{
+ chunksFlags := ChunksStorageFlags()
+ blockFlags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.tsdb.head-compaction-interval": "1m",
"-store-gateway.sharding-enabled": "false",
14 changes: 7 additions & 7 deletions integration/querier_test.go
@@ -71,7 +71,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
- flags := mergeFlags(BlocksStorageFlags, map[string]string{
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
@@ -281,7 +281,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
- flags := mergeFlags(BlocksStorageFlags, map[string]string{
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
@@ -423,7 +423,7 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
- flags := mergeFlags(BlocksStorageFlags, map[string]string{
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
@@ -509,15 +509,15 @@ func TestQuerierWithChunksStorage(t *testing.T) {
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
- flags := mergeFlags(ChunksStorageFlags, map[string]string{})
+ flags := ChunksStorageFlags()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, dynamo))

- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", flags, "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
@@ -626,15 +626,15 @@ func TestHashCollisionHandling(t *testing.T) {
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
- flags := mergeFlags(ChunksStorageFlags, map[string]string{})
+ flags := ChunksStorageFlags()

// Start dependencies.
dynamo := e2edb.NewDynamoDB()

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, dynamo))

- tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
+ tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags(), "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're