Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Make partition concurrency configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
shanson7 committed Apr 4, 2019
1 parent 477a4a5 commit 43c49ef
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
9 changes: 6 additions & 3 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,20 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
func (c *CasIdx) rebuildIndex() {
log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
pre := time.Now()
gate := make(chan struct{}, c.cfg.initLoadConcurrency)
var wg sync.WaitGroup
var num int
for _, partition := range cluster.Manager.GetPartitions() {
wg.Add(1)
go func(p int32) {
log.Infof("Loading partition %s", p)
gate <- struct{}{}
defer func() {
wg.Done()
<-gate
}()
var defs []schema.MetricDefinition
defs = c.LoadPartitions([]int32{p}, defs, pre)
num += c.MemoryIndex.LoadPartition(p, defs)
wg.Done()
log.Infof("Done loading partition %s", p)
}(partition)
}
wg.Wait()
Expand Down
3 changes: 3 additions & 0 deletions idx/cassandra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type IdxConfig struct {
numConns int
protoVer int
disableInitialHostLookup bool
initLoadConcurrency int
}

// NewIdxConfig returns IdxConfig with default values set.
Expand All @@ -64,6 +65,7 @@ func NewIdxConfig() *IdxConfig {
auth: false,
username: "cassandra",
password: "cassandra",
initLoadConcurrency: 1,
}
}

Expand Down Expand Up @@ -92,6 +94,7 @@ func ConfigSetup() *flag.FlagSet {
casIdx.BoolVar(&CliConfig.updateCassIdx, "update-cassandra-index", CliConfig.updateCassIdx, "synchronize index changes to cassandra. not all your nodes need to do this.")
casIdx.DurationVar(&CliConfig.updateInterval, "update-interval", CliConfig.updateInterval, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
casIdx.DurationVar(&CliConfig.pruneInterval, "prune-interval", CliConfig.pruneInterval, "Interval at which the index should be checked for stale series.")
casIdx.IntVar(&CliConfig.initLoadConcurrency, "init-load-concurrency", CliConfig.initLoadConcurrency, "Number of partitions to load concurrently on startup.")
casIdx.IntVar(&CliConfig.protoVer, "protocol-version", CliConfig.protoVer, "cql protocol version to use")
casIdx.BoolVar(&CliConfig.createKeyspace, "create-keyspace", CliConfig.createKeyspace, "enable the creation of the index keyspace and tables, only one node needs this")
casIdx.StringVar(&CliConfig.schemaFile, "schema-file", CliConfig.schemaFile, "File containing the needed schemas in case database needs initializing")
Expand Down

0 comments on commit 43c49ef

Please sign in to comment.