@@ -12,6 +12,8 @@
 import json
 import textwrap
 import logging
+import csv
+import shlex
 
 class FrameworkTest:
   ##########################################################################################
@@ -43,6 +45,14 @@ class FrameworkTest:
     echo ""
     {wrk} {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}"
     sleep 5
+
+    echo ""
+    echo "---------------------------------------------------------"
+    echo " Synchronizing time"
+    echo "---------------------------------------------------------"
+    echo ""
+    ntpdate -s pool.ntp.org
+
     for c in {interval}
     do
       echo ""
@@ -51,7 +61,10 @@ class FrameworkTest:
       echo " {wrk} {headers} -d {duration} -c $c --timeout $c -t $(($c>{max_threads}?{max_threads}:$c)) \"http://{server_host}:{port}{url}\" -s ~/pipeline.lua -- {pipeline}"
       echo "---------------------------------------------------------"
       echo ""
+      STARTTIME=$(date +"%s")
       {wrk} {headers} -d {duration} -c $c --timeout $c -t "$(($c>{max_threads}?{max_threads}:$c))" http://{server_host}:{port}{url} -s ~/pipeline.lua -- {pipeline}
+      echo "STARTTIME $STARTTIME"
+      echo "ENDTIME $(date +"%s")"
       sleep 2
     done
     """
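The `ntpdate` sync plus the STARTTIME/ENDTIME markers exist so the client-reported epoch window can later be matched against the epoch column of the server-side dstat samples. A minimal sketch of that alignment idea (all timestamps hypothetical):

```python
# Hypothetical dstat samples keyed by epoch seconds, and a run window
# parsed from the STARTTIME/ENDTIME lines echoed by the script above.
samples = {1418084001.0: "s1", 1418084030.0: "s2", 1418084090.0: "s3"}
start, end = 1418084000, 1418084060
window = dict((t, s) for t, s in samples.items() if start <= t <= end)
print window  # {1418084001.0: 's1', 1418084030.0: 's2'}
```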
@@ -75,6 +88,14 @@ class FrameworkTest:
     echo ""
     wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}2"
     sleep 5
+
+    echo ""
+    echo "---------------------------------------------------------"
+    echo " Synchronizing time"
+    echo "---------------------------------------------------------"
+    echo ""
+    ntpdate -s pool.ntp.org
+
     for c in {interval}
     do
       echo ""
@@ -83,7 +104,10 @@ class FrameworkTest:
       echo " wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} \"http://{server_host}:{port}{url}$c\""
       echo "---------------------------------------------------------"
       echo ""
+      STARTTIME=$(date +"%s")
       wrk {headers} -d {duration} -c {max_concurrency} --timeout {max_concurrency} -t {max_threads} "http://{server_host}:{port}{url}$c"
+      echo "STARTTIME $STARTTIME"
+      echo "ENDTIME $(date +"%s")"
       sleep 2
     done
     """
@@ -540,8 +564,11 @@ def benchmark(self, out, err):
         pass
       if self.json_url_passed:
         remote_script = self.__generate_concurrency_script(self.json_url, self.port, self.accept_json)
+        self.__begin_logging(self.JSON)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.JSON)
+        print results
         self.benchmarker.report_results(framework=self, test=self.JSON, results=results['results'])
         out.write("Complete\n")
         out.flush()
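The `__begin_logging`/`__end_logging` bracket added here is repeated verbatim for every test type below. One possible refactor, not what this commit does, would be a context manager so each block reduces to a single `with`:

```python
# Sketch only: `logged` is a hypothetical helper, not part of this commit.
from contextlib import contextmanager

@contextmanager
def logged(test_type, begin, end):
    begin(test_type)   # start the dstat logger for this test type
    try:
        yield
    finally:
        end()          # stop the logger even if the benchmark raises

# Usage: with logged(self.JSON, self.__begin_logging, self.__end_logging):
#            self.__run_benchmark(remote_script, output_file, err)
```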
@@ -565,7 +592,9 @@ def benchmark(self, out, err):
         pass
       if self.db_url_passed:
         remote_script = self.__generate_concurrency_script(self.db_url, self.port, self.accept_json)
+        self.__begin_logging(self.DB)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.DB)
         self.benchmarker.report_results(framework=self, test=self.DB, results=results['results'])
         out.write("Complete\n")
@@ -589,7 +618,9 @@ def benchmark(self, out, err):
         pass
       if self.query_url_passed:
         remote_script = self.__generate_query_script(self.query_url, self.port, self.accept_json)
+        self.__begin_logging(self.QUERY)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.QUERY)
         self.benchmarker.report_results(framework=self, test=self.QUERY, results=results['results'])
         out.write("Complete\n")
@@ -610,7 +641,9 @@ def benchmark(self, out, err):
         pass
       if self.fortune_url_passed:
         remote_script = self.__generate_concurrency_script(self.fortune_url, self.port, self.accept_html)
+        self.__begin_logging(self.FORTUNE)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.FORTUNE)
         self.benchmarker.report_results(framework=self, test=self.FORTUNE, results=results['results'])
         out.write("Complete\n")
@@ -631,7 +664,9 @@ def benchmark(self, out, err):
         pass
       if self.update_url_passed:
         remote_script = self.__generate_query_script(self.update_url, self.port, self.accept_json)
+        self.__begin_logging(self.UPDATE)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.UPDATE)
         self.benchmarker.report_results(framework=self, test=self.UPDATE, results=results['results'])
         out.write("Complete\n")
@@ -652,7 +687,9 @@ def benchmark(self, out, err):
         pass
       if self.plaintext_url_passed:
         remote_script = self.__generate_concurrency_script(self.plaintext_url, self.port, self.accept_plaintext, wrk_command="wrk", intervals=[256,1024,4096,16384], pipeline="16")
+        self.__begin_logging(self.PLAINTEXT)
         self.__run_benchmark(remote_script, output_file, err)
+        self.__end_logging()
         results = self.__parse_test(self.PLAINTEXT)
         self.benchmarker.report_results(framework=self, test=self.PLAINTEXT, results=results['results'])
         out.write("Complete\n")
@@ -788,6 +825,15 @@ def __parse_test(self, test_type):
           m = re.search("Non-2xx or 3xx responses: ([0-9]+)", line)
           if m != None:
             rawData['5xx'] = int(m.group(1))
+          if "STARTTIME" in line:
+            m = re.search("[0-9]+", line)
+            rawData["startTime"] = int(m.group(0))
+          if "ENDTIME" in line:
+            m = re.search("[0-9]+", line)
+            rawData["endTime"] = int(m.group(0))
+            stats = self.__parse_stats(test_type, rawData["startTime"], rawData["endTime"], 1)
+            with open(self.benchmarker.stats_file(self.name, test_type) + ".json", "w") as stats_file:
+              json.dump(stats, stats_file)
 
 
     return results
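The bare `[0-9]+` search works because the marker words contain no digits, so the first digit run on a STARTTIME/ENDTIME line is the epoch value itself. An illustrative check (line value hypothetical):

```python
import re
line = "STARTTIME 1418084000"
m = re.search("[0-9]+", line)
print int(m.group(0))  # 1418084000
```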
@@ -902,6 +948,77 @@ def requires_database(self):
             self.contains_type(self.DB) or
             self.contains_type(self.QUERY) or
             self.contains_type(self.UPDATE))
+  ############################################################
+  # __begin_logging
+  # Starts a dstat process to monitor resource usage, to be synced with the client's time
+  # TODO: MySQL and InnoDB are possible. Figure out how to implement them.
+  ############################################################
+  def __begin_logging(self, test_name):
+    output_file = "{file_name}".format(file_name=self.benchmarker.get_stats_file(self.name, test_name))
+    dstat_string = "dstat -afilmprsT --aio --fs --ipc --lock --raw --socket \
+      --tcp --udp --unix --vm --disk-util \
+      --rpc --rpcd --output {output_file}".format(output_file=output_file)
+    cmd = shlex.split(dstat_string)
+    dev_null = open(os.devnull, "w")
+    self.subprocess_handle = subprocess.Popen(cmd, stdout=dev_null)
+  ##############################################################
+  # End __begin_logging
+  ##############################################################
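`shlex.split` turns the flat command string into the argv list form that `subprocess.Popen` expects, avoiding `shell=True`. For example (path hypothetical):

```python
import shlex
print shlex.split("dstat -afilmprsT --output /tmp/stats.csv")
# ['dstat', '-afilmprsT', '--output', '/tmp/stats.csv']
```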
+
+  ##############################################################
+  # Begin __end_logging
+  # Stops the dstat logger process and blocks until shutdown is complete.
+  ##############################################################
+  def __end_logging(self):
+    self.subprocess_handle.terminate()
+    self.subprocess_handle.communicate()
+  ##############################################################
+  # End __end_logging
+  ##############################################################
+
+  ##############################################################
+  # Begin __parse_stats
+  # For each test type, process all the statistics and return a multi-layered
+  # dictionary with the following structure:
+  # (timestamp)
+  # | (main header) - group that the stat is in
+  # | | (sub header) - title of the stat
+  # | | | (stat) - the stat itself, usually a floating-point number
+  ##############################################################
+  def __parse_stats(self, test_type, start_time, end_time, interval):
+    stats_dict = dict()
+    stats_file = self.benchmarker.stats_file(self.name, test_type)
+    with open(stats_file) as stats:
+      # dstat's CSV output starts with a preamble; the two header rows
+      # follow the first blank line
+      while (stats.next() != "\n"):
+        pass
+      stats_reader = csv.reader(stats)
+      h1 = stats_reader.next()  # group names (blank cells continue the previous group)
+      h2 = stats_reader.next()  # individual stat titles
+      time_row = h2.index("epoch")
+      int_counter = 0
+      for row in stats_reader:
+        time = float(row[time_row])
+        int_counter += 1
+        if time < start_time:
+          continue
+        elif time > end_time:
+          return stats_dict
+        if int_counter % interval != 0:
+          continue
+        row_dict = dict()
+        for nextheader in h1:
+          if nextheader != "":
+            row_dict[nextheader] = dict()
+        header = ""
+        for item_num in range(len(row)):
+          if (len(h1[item_num]) != 0):
+            header = h1[item_num]
+          row_dict[header][h2[item_num]] = row[item_num]
+        stats_dict[time] = row_dict
+    return stats_dict
+  ##############################################################
+  # End __parse_stats
+  ##############################################################
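For reference, a toy dstat-style CSV (headers and values hypothetical; the real preamble is longer) showing why the parser skips to the first blank line, then reads a group row (`h1`, where blank cells continue the previous group) and a title row (`h2`):

```python
import csv, StringIO  # Python 2, matching the file's idiom

raw = ('"Dstat 0.7.2 CSV output"\n'
       '"Host:","server1"\n'
       '\n'
       '"total cpu usage","","epoch"\n'
       '"usr","sys","epoch"\n'
       '"2.0","1.0","1418084001.0"\n')

f = StringIO.StringIO(raw)
while f.next() != "\n":  # skip the preamble
    pass
reader = csv.reader(f)
h1, h2 = reader.next(), reader.next()
# Parsed as in __parse_stats, each timestamp maps group -> title -> value:
# {1418084001.0: {'total cpu usage': {'usr': '2.0', 'sys': '1.0'},
#                 'epoch': {'epoch': '1418084001.0'}}}
```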
 
   ##########################################################################################
   # Constructor