@@ -157,13 +157,25 @@ func sendCmdLogicRadix(conn Client, newCmdS []string, enableMultiExec bool, key
157157	datapointsChan  <-  datapoint {! (err  !=  nil ), duration .Microseconds (), cacheHit }
158158}
159159
160+ func  onInvalidations (messages  []rueidis.RedisMessage ) {
161+ 	if  messages  !=  nil  {
162+ 		cscInvalidationMutex .Lock ()
163+ 		for  range  messages  {
164+ 			totalCachedInvalidations ++ 
165+ 		}
166+ 		cscInvalidationMutex .Unlock ()
167+ 	}
168+ 
169+ }
170+ 
160171func  main () {
161172	host  :=  flag .String ("h" , "127.0.0.1" , "Server hostname." )
162173	port  :=  flag .Int ("p" , 12000 , "Server port." )
163174	rps  :=  flag .Int64 ("rps" , 0 , "Max rps. If 0 no limit is applied and the DB is stressed up to maximum." )
164175	rpsburst  :=  flag .Int64 ("rps-burst" , 0 , "Max rps burst. If 0 the allowed burst will be the ammount of clients." )
165176	username  :=  flag .String ("u" , "" , "Username for Redis Auth." )
166177	password  :=  flag .String ("a" , "" , "Password for Redis Auth." )
178+ 	jsonOutFile  :=  flag .String ("json-out-file" , "" , "Results file. If empty will not save." )
167179	seed  :=  flag .Int64 ("random-seed" , 12345 , "random seed to be used." )
168180	clients  :=  flag .Uint64 ("c" , 50 , "number of clients." )
169181	keyspacelen  :=  flag .Uint64 ("r" , 1000000 , "keyspace length. The benchmark will expand the string __key__ inside an argument with a number in the specified range from 0 to keyspacelen-1. The substitution changes every time a command is executed." )
@@ -248,6 +260,7 @@ func main() {
248260	samplesPerClient  :=  * numberRequests  /  * clients 
249261	client_update_tick  :=  1 
250262	latencies  =  hdrhistogram .New (1 , 90000000 , 3 )
263+ 	latenciesTick  =  hdrhistogram .New (1 , 90000000 , 3 )
251264	opts  :=  radix.Dialer {}
252265	if  * password  !=  ""  {
253266		opts .AuthPass  =  * password 
@@ -310,6 +323,10 @@ func main() {
310323		cmd  :=  make ([]string , len (args ))
311324		copy (cmd , args )
312325		if  * cscEnabled  ||  * useRuedis  {
326+ 			var  invalidationFunction  func ([]rueidis.RedisMessage ) =  nil 
327+ 			if  * cscEnabled  {
328+ 				invalidationFunction  =  onInvalidations 
329+ 			}
313330			clientOptions  :=  rueidis.ClientOption {
314331				InitAddress :         []string {connectionStr },
315332				Username :            * username ,
@@ -323,6 +340,7 @@ func main() {
323340				ReadBufferEachConn :  1024 ,
324341				WriteBufferEachConn : 1024 ,
325342				CacheSizeEachConn :   * cscSizeBytes ,
343+ 				OnInvalidations :     invalidationFunction ,
326344			}
327345			clientOptions .Dialer .KeepAlive  =  * clientKeepAlive 
328346			ruedisClient , err  =  rueidis .NewClient (clientOptions )
@@ -350,8 +368,10 @@ func main() {
350368	signal .Notify (c , os .Interrupt )
351369
352370	tick  :=  time .NewTicker (time .Duration (client_update_tick ) *  time .Second )
353- 	closed , _ , duration , totalMessages , _  :=  updateCLI (tick , c , * numberRequests , * loop , datapointsChan )
354- 	messageRate  :=  float64 (totalMessages ) /  float64 (duration .Seconds ())
371+ 	closed , startT , endT , duration , _ , _ , _ , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs  :=  updateCLI (tick , c , * numberRequests , * loop , datapointsChan , int (* clients ))
372+ 	messageRate  :=  float64 (totalCommands ) /  float64 (duration .Seconds ())
373+ 	CSCHitRate  :=  float64 (totalCached ) /  float64 (duration .Seconds ())
374+ 	CSCInvalidationRate  :=  float64 (totalCachedInvalidations ) /  float64 (duration .Seconds ())
355375	avgMs  :=  float64 (latencies .Mean ()) /  1000.0 
356376	p50IngestionMs  :=  float64 (latencies .ValueAtQuantile (50.0 )) /  1000.0 
357377	p95IngestionMs  :=  float64 (latencies .ValueAtQuantile (95.0 )) /  1000.0 
@@ -362,10 +382,22 @@ func main() {
362382	fmt .Printf ("Total Duration %.3f Seconds\n " , duration .Seconds ())
363383	fmt .Printf ("Total Errors %d\n " , totalErrors )
364384	fmt .Printf ("Throughput summary: %.0f requests per second\n " , messageRate )
385+ 	fmt .Printf ("                    %.0f CSC Hits per second\n " , CSCHitRate )
386+ 	fmt .Printf ("                    %.0f CSC Evicts per second\n " , CSCInvalidationRate )
365387	fmt .Printf ("Latency summary (msec):\n " )
366388	fmt .Printf ("    %9s %9s %9s %9s\n " , "avg" , "p50" , "p95" , "p99" )
367389	fmt .Printf ("    %9.3f %9.3f %9.3f %9.3f\n " , avgMs , p50IngestionMs , p95IngestionMs , p99IngestionMs )
368390
391+ 	testResult  :=  NewTestResult ("" , uint (* clients ), 0 )
392+ 	testResult .FillDurationInfo (startT , endT , duration )
393+ 	testResult .OverallClientLatencies  =  percentilesTs 
394+ 	testResult .OverallCommandRate  =  messageRateTs 
395+ 	testResult .OverallCSCHitRate  =  cacheRateTs 
396+ 	testResult .OverallCSCInvalidationRate  =  cacheInvalidationsTs 
397+ 	_ , overallLatencies  :=  generateLatenciesMap (latencies , duration )
398+ 	testResult .Totals  =  overallLatencies 
399+ 	saveJsonResult (testResult , * jsonOutFile )
400+ 
369401	if  closed  {
370402		return 
371403	}
@@ -378,21 +410,27 @@ func main() {
378410	os .Exit (0 )
379411}
380412
381- func  updateCLI (tick  * time.Ticker , c  chan  os.Signal , message_limit  uint64 , loop  bool , datapointsChan  chan  datapoint ) (bool , time.Time , time.Duration , uint64 , [ ]float64 ) {
413+ func  updateCLI (tick  * time.Ticker , c  chan  os.Signal , message_limit  uint64 , loop  bool , datapointsChan  chan  datapoint ,  totalClients   int ) (bool , time.Time , time.Time , time. Duration , uint64 , uint64 ,  uint64 , [] float64 , [] float64 , [] float64 , [] map [ string ]float64 ) {
382414	var  currentErr  uint64  =  0 
383415	var  currentCount  uint64  =  0 
384416	var  currentCachedCount  uint64  =  0 
385417	start  :=  time .Now ()
386418	prevTime  :=  time .Now ()
387419	prevMessageCount  :=  uint64 (0 )
420+ 	prevMessageCached  :=  uint64 (0 )
421+ 	previousCachedInvalidations  :=  uint64 (0 )
388422	messageRateTs  :=  []float64 {}
423+ 	cacheRateTs  :=  []float64 {}
424+ 	cacheInvalidationsTs  :=  []float64 {}
425+ 	percentilesTs  :=  []map [string ]float64 {}
389426	var  dp  datapoint 
390- 	fmt .Printf ("%26s %7s %25s %25s %7s %25s %25s %7s  %25s\n " , "Test time" , " " , "Total Commands" , "Total Errors" , "" , "Command Rate" , "Client Cache  Hits" , "" , "p50 lat. (msec)" )
427+ 	fmt .Printf ("%26s %7s %25s %25s %7s %25s %25s %25s  %25s\n " , "Test time" , " " , "Total Commands" , "Total Errors" , "" , "Command Rate" , "CSC  Hits/sec " , "CSC Invalidations/sec " , "p50 lat. (msec)" )
391428	for  {
392429		select  {
393430		case  dp  =  <- datapointsChan :
394431			{
395432				latencies .RecordValue (dp .duration_ms )
433+ 				latenciesTick .RecordValue (dp .duration_ms )
396434				if  ! dp .success  {
397435					currentErr ++ 
398436				}
@@ -412,41 +450,48 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop b
412450				now  :=  time .Now ()
413451				took  :=  now .Sub (prevTime )
414452				messageRate  :=  float64 (totalCommands - prevMessageCount ) /  float64 (took .Seconds ())
453+ 				InvalidationMessageRate  :=  float64 (totalCachedInvalidations - previousCachedInvalidations ) /  float64 (took .Seconds ())
454+ 				CacheHitRate  :=  float64 (totalCached - prevMessageCached ) /  float64 (took .Seconds ())
415455				completionPercentStr  :=  "[----%]" 
416456				if  ! loop  {
417457					completionPercent  :=  float64 (totalCommands ) /  float64 (message_limit ) *  100.0 
418458					completionPercentStr  =  fmt .Sprintf ("[%3.1f%%]" , completionPercent )
419459				}
420460				errorPercent  :=  float64 (totalErrors ) /  float64 (totalCommands ) *  100.0 
421- 				cachedPercent  :=  0.0 
422- 				if  totalCached  >  0  {
423- 					cachedPercent  =  float64 (totalCached ) /  float64 (totalCommands ) *  100.0 
424- 				}
425- 
426461				p50  :=  float64 (latencies .ValueAtQuantile (50.0 )) /  1000.0 
427462
428463				if  prevMessageCount  ==  0  &&  totalCommands  !=  0  {
429464					start  =  time .Now ()
430465				}
431466				if  totalCommands  !=  0  {
432467					messageRateTs  =  append (messageRateTs , messageRate )
468+ 					cacheRateTs  =  append (cacheRateTs , CacheHitRate )
469+ 					cacheInvalidationsTs  =  append (cacheInvalidationsTs , InvalidationMessageRate )
470+ 					_ , perTickLatencies  :=  generateLatenciesMap (latenciesTick , took )
471+ 					percentilesTs  =  append (percentilesTs , perTickLatencies )
472+ 					latenciesTick .Reset ()
433473				}
474+ 
434475				prevMessageCount  =  totalCommands 
476+ 				prevMessageCached  =  totalCached 
477+ 				previousCachedInvalidations  =  totalCachedInvalidations 
435478				prevTime  =  now 
436479
437- 				fmt .Printf ("%25.0fs %s %25d %25d [%3.1f%%] %25.0f %25d [%3.1f%%]  %25.3f\t " , time .Since (start ).Seconds (), completionPercentStr , totalCommands , totalErrors , errorPercent , messageRate , totalCached ,  cachedPercent , p50 )
480+ 				fmt .Printf ("%25.0fs %s %25d %25d [%3.1f%%] %25.0f %25.0f %25.0f  %25.3f\t " , time .Since (start ).Seconds (), completionPercentStr , totalCommands , totalErrors , errorPercent , messageRate , CacheHitRate ,  InvalidationMessageRate , p50 )
438481				fmt .Printf ("\r " )
439482				//w.Flush() 
440483				if  message_limit  >  0  &&  totalCommands  >=  uint64 (message_limit ) &&  ! loop  {
441- 					return  true , start , time .Since (start ), totalCommands , messageRateTs 
484+ 					end  :=  time .Now ()
485+ 					return  true , start , end , time .Since (start ), totalCommands , totalCached , totalErrors , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs 
442486				}
443487
444488				break 
445489			}
446490
447491		case  <- c :
448492			fmt .Println ("\n received Ctrl-c - shutting down" )
449- 			return  true , start , time .Since (start ), totalCommands , messageRateTs 
493+ 			end  :=  time .Now ()
494+ 			return  true , start , end , time .Since (start ), totalCommands , totalCached , totalErrors , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs 
450495		}
451496	}
452497}
0 commit comments