Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

mt-update-ttl tweak default concurrency, stats fix, properly use logrus #1167

Merged
merged 3 commits into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions cmd/mt-update-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (

startTs = flag.Int("start-timestamp", 0, "timestamp at which to start, defaults to 0")
endTs = flag.Int("end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max")
numThreads = flag.Int("threads", 1, "number of workers to use to process data")
numThreads = flag.Int("threads", 10, "number of workers to use to process data")

verbose = flag.Bool("verbose", false, "show every record being processed")

Expand Down Expand Up @@ -85,7 +85,7 @@ func main() {
session, err := NewCassandraStore()

if err != nil {
panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err))
log.Fatalf("Failed to instantiate cassandra: %s", err)
}

update(session, ttl, tableIn, tableOut)
Expand Down Expand Up @@ -182,26 +182,26 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, session *gocql.Sessi
query = fmt.Sprintf("INSERT INTO %s (data, key, ts) values(?,?,?) USING TTL %d", tableOut, newTTL)
}
if *verbose {
log.Printf("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data)
log.Infof("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data)
}

err := session.Query(query, data, key, ts).Exec()
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err)
log.Errorf("id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err)
}

doneRowsSnap := atomic.AddUint64(&doneRows, 1)
if doneRowsSnap%10000 == 0 {
doneKeysSnap := atomic.LoadUint64(&doneKeys)
completeness := completenessEstimate(token)
log.Printf("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)", id, doneKeysSnap, doneRowsSnap, token, completeness*100)
log.Infof("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)", id, doneKeysSnap, doneRowsSnap, token, completeness*100)
}
}
err := iter.Close()
if err != nil {
doneKeysSnap := atomic.LoadUint64(&doneKeys)
doneRowsSnap := atomic.LoadUint64(&doneRows)
fmt.Fprintf(os.Stderr, "ERROR: id=%d failed querying %s: %q. processed %d keys, %d rows", id, tableIn, err, doneKeysSnap, doneRowsSnap)
log.Errorf("id=%d failed querying %s: %q. processed %d keys, %d rows", id, tableIn, err, doneKeysSnap, doneRowsSnap)
}
atomic.AddUint64(&doneKeys, 1)
}
Expand All @@ -227,11 +227,13 @@ func update(session *gocql.Session, ttl int, tableIn, tableOut string) {
close(jobs)
err := keyItr.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: failed querying %s: %q. processed %d keys, %d rows", tableIn, err, doneKeys, doneRows)
doneKeysSnap := atomic.LoadUint64(&doneKeys)
doneRowsSnap := atomic.LoadUint64(&doneRows)
log.Errorf("failed querying %s: %q. processed %d keys, %d rows", tableIn, err, doneKeysSnap, doneRowsSnap)
wg.Wait()
os.Exit(2)
}

wg.Wait()
log.Printf("DONE. Processed %d keys, %d rows", doneKeys, doneRows)
log.Infof("DONE. Processed %d keys, %d rows", doneKeys, doneRows)
}
2 changes: 1 addition & 1 deletion docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ Flags:
-start-timestamp int
timestamp at which to start, defaults to 0
-threads int
number of workers to use to process data (default 1)
number of workers to use to process data (default 10)
-verbose
show every record being processed
```
Expand Down