
typos and cleanups
Dieterbe committed Oct 21, 2018
1 parent f6bdc15 commit 63f6f04
Showing 8 changed files with 15 additions and 17 deletions.
10 changes: 5 additions & 5 deletions idx/bigtable/bigtable.go
@@ -217,7 +217,7 @@ func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Arc

if inMemory {
// bigtable uses partition ID in the key prefix, so an "update" that changes the partition for
-// an existing metricDef will just create a new row in the table and wont remove the old row.
+// an existing metricDef will just create a new row in the table and won't remove the old row.
// So we need to explicitly delete the old entry.
if oldPartition != partition {
go func() {
@@ -255,7 +255,7 @@ func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, par

if inMemory {
// bigtable uses partition ID in the key prefix, so an "update" that changes the partition for
-// an existing metricDef will just create a new row in the table and wont remove the old row.
+// an existing metricDef will just create a new row in the table and won't remove the old row.
// So we need to explicitly delete the old entry.
if oldPartition != partition {
go func() {
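Both hunks above fix the same comment, which records a real subtlety: the partition id is part of the row key, so moving a metricDef to a new partition writes a brand-new row and strands the old one. Below is a minimal sketch of that cleanup against the cloud.google.com/go/bigtable API, assuming a hypothetical rowKey helper; the real code runs the delete inside the goroutine shown above.

package sketch

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigtable"
)

// rowKey is a hypothetical helper mirroring how the index prefixes
// row keys with the partition id.
func rowKey(partition int32, id string) string {
	return fmt.Sprintf("%d_%s", partition, id)
}

// deleteOldPartitionRow removes the stale row left behind when a
// metricDef moves to a new partition; writing under the new partition
// creates a new row, it never overwrites the old one.
func deleteOldPartitionRow(ctx context.Context, tbl *bigtable.Table, oldPartition int32, id string) error {
	mut := bigtable.NewMutation()
	mut.DeleteRow()
	return tbl.Apply(ctx, rowKey(oldPartition, id), mut)
}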
@@ -289,7 +289,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
b.MemoryIdx.UpdateArchive(archive)
} else {
// perform a non-blocking write to the writeQueue. If the queue is full, then
-// this will fail and we wont update the LastSave timestamp. The next time
+// this will fail and we won't update the LastSave timestamp. The next time
// the metric is seen, the previous lastSave timestamp will still be in place and so
// we will try and save again. This will continue until we are successful or the
// lastSave timestamp become more then 1.5 x UpdateInterval, in which case we will
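The non-blocking write this comment describes is Go's select-with-default send: if the queue is full the send falls through, LastSave is left untouched, and the next point seen for the metric triggers another attempt. A rough sketch, with writeReq standing in for the real queue payload:

type writeReq struct{ rowKey string }

func tryEnqueue(writeQueue chan writeReq, req writeReq) bool {
	select {
	case writeQueue <- req:
		return true // queued; the caller may now update LastSave
	default:
		return false // queue full; LastSave stays put, so we try again next time the metric is seen
	}
}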
@@ -341,7 +341,7 @@ func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinit
return true
}, bigtable.RowFilter(bigtable.FamilyFilter(COLUMN_FAMILY)))
if err != nil {
log.Fatalf("bigtable-idx: failed to load defs form Bigtable. %s", err)
log.Fatalf("bigtable-idx: failed to load defs from Bigtable. %s", err)
}
if marshalErr != nil {
log.Fatalf("bigtable-idx: failed to marshal row to metricDef. %s", marshalErr)
@@ -390,7 +390,7 @@ func (b *BigtableIdx) processWriteQueue() {
errs, err := b.tbl.ApplyBulk(context.Background(), rowKeys, mutations)
if err != nil {
statQueryInsertFail.Add(len(rowKeys))
log.Errorf("bigtable-idx: Failed to write %d defs to bigtable. they wont be retried. %s", len(rowKeys), err)
log.Errorf("bigtable-idx: Failed to write %d defs to bigtable. they won't be retried. %s", len(rowKeys), err)
complete = true
} else if len(errs) > 0 {
var failedRowKeys []string
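ApplyBulk reports failures at two levels: a call-level err (nothing was written, and as the log message above says those defs are not retried) and a per-row slice of errors, from which the failed row keys are gathered for another attempt. A hedged sketch of that handling, using the same imports as the earlier bigtable sketch:

// returns the row keys that failed individually and are candidates for a retry
func applyDefs(ctx context.Context, tbl *bigtable.Table, rowKeys []string, muts []*bigtable.Mutation) ([]string, error) {
	rowErrs, err := tbl.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		return nil, err // call-level failure: these defs won't be retried
	}
	var failed []string
	for i, rowErr := range rowErrs {
		if rowErr != nil {
			failed = append(failed, rowKeys[i])
		}
	}
	return failed, nil
}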
1 change: 0 additions & 1 deletion idx/bigtable/config.go
@@ -76,7 +76,6 @@ func ConfigSetup() {
btIdx.BoolVar(&CliConfig.CreateCF, "create-cf", CliConfig.CreateCF, "enable the creation of the table and column families")

globalconf.Register("bigtable-idx", btIdx)
-return
}

func ConfigProcess() {
6 changes: 3 additions & 3 deletions idx/cassandra/cassandra.go
@@ -275,7 +275,7 @@ func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive,

if inMemory {
// Cassandra uses partition id as the partitioning key, so an "update" that changes the partition for
-// an existing metricDef will just create a new row in the table and wont remove the old row.
+// an existing metricDef will just create a new row in the table and won't remove the old row.
// So we need to explicitly delete the old entry.
if oldPartition != partition {
c.deleteDefAsync(point.MKey, oldPartition)
@@ -309,7 +309,7 @@ func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partitio

if inMemory {
// Cassandra uses partition id as the partitioning key, so an "update" that changes the partition for
-// an existing metricDef will just create a new row in the table and wont remove the old row.
+// an existing metricDef will just create a new row in the table and won't remove the old row.
// So we need to explicitly delete the old entry.
if oldPartition != partition {
c.deleteDefAsync(mkey, oldPartition)
@@ -338,7 +338,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
c.MemoryIdx.UpdateArchive(archive)
} else {
// perform a non-blocking write to the writeQueue. If the queue is full, then
-// this will fail and we wont update the LastSave timestamp. The next time
+// this will fail and we won't update the LastSave timestamp. The next time
// the metric is seen, the previous lastSave timestamp will still be in place and so
// we will try and save again. This will continue until we are successful or the
// lastSave timestamp become more then 1.5 x UpdateInterval, in which case we will
2 changes: 1 addition & 1 deletion idx/cassandra/cassandra_test.go
@@ -405,7 +405,7 @@ func TestFind(t *testing.T) {
So(nodes, ShouldHaveLength, 0)
})

Convey("When searching nodes that dont exist", t, func() {
Convey("When searching nodes that don't exist", t, func() {
nodes, err := ix.Find(1, "foo.demo.blah.*", 0)
So(err, ShouldBeNil)
So(nodes, ShouldHaveLength, 0)
2 changes: 1 addition & 1 deletion idx/memory/memory.go
@@ -380,7 +380,7 @@ func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
}

// as we are loading the metricDefs from a persistent store, set the lastSave
-// to the lastUpdate timestamp. This wont exactly match the true lastSave Timstamp,
+// to the lastUpdate timestamp. This won't exactly match the true lastSave Timstamp,
// but it will be close enough and it will always be true that the lastSave was at
// or after this time. For metrics that are sent at or close to real time (the typical
// use case), then the value will be within a couple of seconds of the true lastSave.
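Put differently, the loader seeds a field the store never persisted with the best lower bound it has. A tiny illustrative fragment, with field names following the comment rather than the exact structs:

// lastSave was never persisted, so seed it with the best lower bound
// available: the true last save happened at or after lastUpdate
archive.LastSave = def.LastUpdate // types simplified; the real fields may need a cast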
2 changes: 1 addition & 1 deletion idx/memory/memory_test.go
@@ -266,7 +266,7 @@ func testFind(t *testing.T) {
So(nodes, ShouldHaveLength, 0)
})

Convey("When searching nodes that dont exist", t, func() {
Convey("When searching nodes that don't exist", t, func() {
nodes, err := ix.Find(1, "foo.demo.blah.*", 0)
So(err, ShouldBeNil)
So(nodes, ShouldHaveLength, 0)
7 changes: 3 additions & 4 deletions store/bigtable/bigtable.go
@@ -191,8 +191,8 @@ func NewStore(cfg *StoreConfig, ttls []uint32, schemaMaxChunkSpan uint32) (*Stor
readLimiter: util.NewLimiter(cfg.ReadConcurrency),
cfg: cfg,
}
+s.wg.Add(cfg.WriteConcurrency)
for i := 0; i < cfg.WriteConcurrency; i++ {
-s.wg.Add(1)
// Each processWriteQueue thread uses a channel and a buffer for queuing unwritten chunks.
// In total, each processWriteQueue thread should not have more then "write-queue-size" chunks
// that are queued. To ensure this, set the channel size to "write-queue-size" - "write-max-flush-size"
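The code change in this hunk is the idiomatic sync.WaitGroup cleanup: one wg.Add covering all workers instead of wg.Add(1) per iteration. Combined with the sizing rule in the comment, startup might be sketched like this (chunkWriteRequest is a stand-in payload; import "sync" assumed):

type chunkWriteRequest struct{} // stand-in for the real queue payload

// one wg.Add for all workers, and each worker's channel sized so that
// queued plus one in-flight flush stays within write-queue-size
func startWriters(wg *sync.WaitGroup, concurrency, queueSize, maxFlushSize int, run func(<-chan chunkWriteRequest)) {
	wg.Add(concurrency) // replaces the wg.Add(1) formerly inside the loop
	for i := 0; i < concurrency; i++ {
		queue := make(chan chunkWriteRequest, queueSize-maxFlushSize)
		go func() {
			defer wg.Done()
			run(queue)
		}()
	}
}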
@@ -398,7 +398,7 @@ func (s *Store) Search(ctx context.Context, key schema.AMKey, ttl, start, end ui
agg = key.Archive.String()
}
// filter the results to just the agg method (Eg raw, min_60, max_1800, etc..) and the timerange we want.
-// we fetch all columnFamilies (which are the different TTLS). Typically there will be only one columnFamily
+// we fetch all columnFamilies (which are the different TTLs). Typically there will be only one columnFamily
// that has data, unless the TTL of the agg has changed. In which case we want all columnFamilies anyway.
filter := bigtable.ChainFilters(
bigtable.ColumnFilter(agg),
@@ -431,8 +431,7 @@ func (s *Store) Search(ctx context.Context, key schema.AMKey, ttl, start, end ui
}
chunks++

-// This function is called serially so we dont need synchronization around adding to
-// itgens.
+// This function is called serially so we don't need synchronization here
itgens = append(itgens, *itgen)
}
}
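Taken together, the two Search hunks describe the read path: a chained filter narrows the scan to the requested agg and time window, and ReadRows invokes its callback one row at a time, which is why appending needs no lock. A hedged sketch against the cloud.google.com/go/bigtable API (rowPrefix and the row decoding are simplified; imports of context, time, and bigtable assumed):

func searchSketch(ctx context.Context, tbl *bigtable.Table, rowPrefix, agg string, from, until time.Time) ([]bigtable.Row, error) {
	filter := bigtable.ChainFilters(
		bigtable.ColumnFilter(agg),                 // e.g. raw, min_60, max_1800
		bigtable.TimestampRangeFilter(from, until), // the time range we want
	)
	var rows []bigtable.Row
	err := tbl.ReadRows(ctx, bigtable.PrefixRange(rowPrefix),
		func(row bigtable.Row) bool {
			// the callback runs serially, so no synchronization is
			// needed around the append, as the comment above notes
			rows = append(rows, row)
			return true
		},
		bigtable.RowFilter(filter))
	return rows, err
}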
2 changes: 1 addition & 1 deletion store/bigtable/config.go
@@ -26,7 +26,7 @@ type StoreConfig struct {
}

func (cfg *StoreConfig) Validate(schemaMaxChunkSpan uint32) error {
-// If we dont have any write threads, then WriteMaxFlushSize and WriteQueueSize
+// If we don't have any write threads, then WriteMaxFlushSize and WriteQueueSize
// are not used. If we do have write threads, then we need to make sure that
// the the writeMaxFlushSize is not larger then the bigtable hardcoded limit of 100k
// and that the writeQueue size is larger then the maxFlush.
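Spelled out, the checks this comment describes might look like the following sketch; validateWriteSettings is a hypothetical name, and 100000 encodes the Bigtable bulk-mutation limit the comment calls hardcoded (import "errors" assumed):

func (cfg *StoreConfig) validateWriteSettings() error {
	if cfg.WriteConcurrency <= 0 {
		return nil // no write threads: flush and queue sizes are unused
	}
	if cfg.WriteMaxFlushSize > 100000 {
		return errors.New("write-max-flush-size must not exceed bigtable's limit of 100k mutations per bulk apply")
	}
	if cfg.WriteQueueSize <= cfg.WriteMaxFlushSize {
		return errors.New("write-queue-size must be larger than write-max-flush-size")
	}
	return nil
}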
