Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
instead of specifying tables, specify the old and new TTL
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Dec 11, 2018
1 parent 9b2d8e9 commit cb02c1b
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions cmd/mt-update-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ func main() {
flag.BoolVar(&verbose, "verbose", false, "show every record being processed")

flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-update-ttl [flags] ttl table-in [table-out]")
fmt.Fprintln(os.Stderr, "mt-update-ttl [flags] ttl-old ttl-new")
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Adjusts the data in Cassandra to use a new TTL value. The TTL is applied counting from the timestamp of the data")
fmt.Fprintln(os.Stderr, "If table-out not specified or same as table-in, will update in place. Otherwise will not touch input table and store results in table-out")
fmt.Fprintln(os.Stderr, "Unless you disable create-keyspace, table-out is created when necessary")
fmt.Fprintln(os.Stderr, "Not supported yet: for the per-ttl tables as of 0.7, automatically putting data in the right table")
fmt.Fprintln(os.Stderr, "Automatically resolves the corresponding tables based on ttl value. If the table stays the same, will update in place. Otherwise will copy to the new table, not touching the input data")
fmt.Fprintln(os.Stderr, "Unless you disable create-keyspace, tables are created as needed")
fmt.Println("Flags:")
flag.PrintDefaults()
os.Exit(-1)
Expand All @@ -82,27 +81,26 @@ func main() {

stats.NewDevnull() // make sure metrics don't pile up without getting discarded

if flag.NArg() < 2 || flag.NArg() > 3 {
if flag.NArg() != 2 {
flag.Usage()
os.Exit(2)
}

ttl := dur.MustParseNDuration("ttl", flag.Arg(0))
tableIn, tableOut := flag.Arg(1), flag.Arg(1)
if flag.NArg() == 3 {
tableOut = flag.Arg(2)
}
ttlIn := dur.MustParseNDuration("ttl", flag.Arg(0))
ttlOut := dur.MustParseNDuration("ttl", flag.Arg(1))

// note: cassandraStore will not be aware via its TTLTables attribute of the other, pre-existing tables,
// only of the table we're copying to. but that's ok because we don't exercise any functionality that
// needs that
store, err := cassandra.NewCassandraStore(cassandra.CliConfig, []uint32{ttl})
store, err := cassandra.NewCassandraStore(cassandra.CliConfig, []uint32{ttlIn, ttlOut})

if err != nil {
log.Fatalf("Failed to instantiate cassandra: %s", err)
}

update(store, int(ttl), tableIn, tableOut)
tableIn, tableOut := store.TTLTables[ttlIn].Name, store.TTLTables[ttlOut].Name

update(store, int(ttlOut), tableIn, tableOut)
}

func getTTL(now, ts, ttl int) int {
Expand All @@ -126,7 +124,7 @@ func completenessEstimate(token int64) float64 {
return ((float64(token) / float64(maxToken)) + 1) / 2
}

func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.CassandraStore, startTime, endTime, ttl int, tableIn, tableOut string) {
func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.CassandraStore, startTime, endTime, ttlOut int, tableIn, tableOut string) {
defer wg.Done()
var token int64
var ts int
Expand All @@ -138,7 +136,7 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
for key := range jobs {
iter := store.Session.Query(queryTpl, key, startTime, endTime).Iter()
for iter.Scan(&token, &ts, &data) {
newTTL := getTTL(int(time.Now().Unix()), ts, ttl)
newTTL := getTTL(int(time.Now().Unix()), ts, ttlOut)
if tableIn == tableOut {
query = fmt.Sprintf("UPDATE %s USING TTL %d SET data = ? WHERE key = ? AND ts = ?", tableIn, newTTL)
} else {
Expand Down Expand Up @@ -178,7 +176,7 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
}
}

func update(store *cassandra.CassandraStore, ttl int, tableIn, tableOut string) {
func update(store *cassandra.CassandraStore, ttlOut int, tableIn, tableOut string) {

keyItr := store.Session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()

Expand All @@ -187,7 +185,7 @@ func update(store *cassandra.CassandraStore, ttl int, tableIn, tableOut string)
var wg sync.WaitGroup
wg.Add(numThreads)
for i := 0; i < numThreads; i++ {
go worker(i, jobs, &wg, store, startTs, endTs, ttl, tableIn, tableOut)
go worker(i, jobs, &wg, store, startTs, endTs, ttlOut, tableIn, tableOut)
}

var key string
Expand Down

0 comments on commit cb02c1b

Please sign in to comment.