diff --git a/core/pom.xml b/core/pom.xml index 3300cf485f..7a4cb94472 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,11 @@ LICENSE file. + + org.apache.htrace + htrace-core4 + 4.1.0-incubating + org.codehaus.jackson jackson-mapper-asl diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java index 07c46034d6..c7ff4952d4 100644 --- a/core/src/main/java/com/yahoo/ycsb/Client.java +++ b/core/src/main/java/com/yahoo/ycsb/Client.java @@ -27,12 +27,18 @@ import java.util.ArrayList; import java.util.Date; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.HTraceConfiguration; + import com.yahoo.ycsb.measurements.Measurements; import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter; import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter; @@ -346,7 +352,7 @@ public static StringBuilder format(long seconds) { * @author cooperb * */ -class ClientThread extends Thread +class ClientThread implements Runnable { /** Counts down each of the clients completing. */ private final CountDownLatch _completeLatch; @@ -595,6 +601,18 @@ public class Client /** An optional thread used to track progress and measure JVM stats. */ private static StatusThread statusthread = null; + // HTrace integration related constants. + + /** + * All keys for configuring the tracing system start with this prefix. + */ + private static final String HTRACE_KEY_PREFIX="htrace."; + private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init"; + private static final String CLIENT_INIT_SPAN = "Client#init"; + private static final String CLIENT_WORKLOAD_SPAN = "Client#workload"; + private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup"; + private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements"; + public static void usageMessage() { System.out.println("Usage: java com.yahoo.ycsb.Client [options]"); @@ -901,7 +919,15 @@ else if (args[argindex].compareTo("-p")==0) double targetperthread=((double)target)/((double)threadcount); targetperthreadperms=targetperthread/1000.0; } - + + final Map filteredProperties = new HashMap<>(); + for (String key : props.stringPropertyNames()) { + if (key.startsWith(HTRACE_KEY_PREFIX)) { + filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key)); + } + } + final HTraceConfiguration conf = HTraceConfiguration.fromMap(filteredProperties); + //show a warning message that creating the workload is taking a while //but only do so if it is taking longer than 2 seconds //(showing the message right away if the setup wasn't taking very long was confusing people) @@ -922,6 +948,7 @@ public void run() } }; + warningthread.start(); //set up measurements @@ -946,13 +973,13 @@ public void run() System.err.println(); System.err.println("Loading workload..."); - Workload workload=null; + Workload workload = null; try { Class workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY)); - workload=(Workload)workloadclass.newInstance(); + workload = (Workload)workloadclass.newInstance(); } catch (Exception e) { @@ -961,9 +988,16 @@ public void run() System.exit(0); } + final Tracer tracer = new Tracer.Builder("YCSB " + workload.getClass().getSimpleName()) + .conf(conf) + .build(); + try { - workload.init(props); + try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) { + workload.init(props); + warningthread.interrupt(); + } } catch (WorkloadException e) { @@ -972,56 +1006,65 @@ public void run() System.exit(0); } - warningthread.interrupt(); - //run the workload System.err.println("Starting test."); + final CountDownLatch completeLatch = new CountDownLatch(threadcount); + final List clients = new ArrayList(threadcount); - int opcount; - if (dotransactions) - { - opcount=Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY,"0")); - } - else - { - if (props.containsKey(INSERT_COUNT_PROPERTY)) + boolean initFailed = false; + try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) { + + int opcount; + if (dotransactions) { - opcount=Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY,"0")); + opcount=Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY,"0")); } else { - opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); + if (props.containsKey(INSERT_COUNT_PROPERTY)) + { + opcount=Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY,"0")); + } + else + { + opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); + } } - } - CountDownLatch completeLatch=new CountDownLatch(threadcount); - final List clients=new ArrayList(threadcount); - for (int threadid=0; threadid 0) { - terminator = new TerminatorThread(maxExecutionTime, clients, workload); - terminator.start(); - } + try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) { - int opsDone = 0; - - for (Thread t : clients) - { - try - { - t.join(); - opsDone += ((ClientThread)t).getOpsDone(); + final Map threads = new HashMap(threadcount); + for (ClientThread client : clients) { + threads.put(new Thread(tracer.wrap(client, "ClientThread")), client); } - catch (InterruptedException e) + + st=System.currentTimeMillis(); + + for (Thread t : threads.keySet()) { + t.start(); } - } - long en=System.currentTimeMillis(); + if (maxExecutionTime > 0) { + terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload); + terminator.start(); + } - if (terminator != null && !terminator.isInterrupted()) { - terminator.interrupt(); - } + opsDone = 0; - if (status) - { - // wake up status thread if it's asleep - statusthread.interrupt(); - // at this point we assume all the monitored threads are already gone as per above join loop. - try { - statusthread.join(); - } catch (InterruptedException e) { + for (Map.Entry entry : threads.entrySet()) + { + try + { + entry.getKey().join(); + opsDone += entry.getValue().getOpsDone(); + } + catch (InterruptedException e) + { + } } + + en=System.currentTimeMillis(); + } try { - workload.cleanup(); + try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) { + + if (terminator != null && !terminator.isInterrupted()) { + terminator.interrupt(); + } + + if (status) + { + // wake up status thread if it's asleep + statusthread.interrupt(); + // at this point we assume all the monitored threads are already gone as per above join loop. + try { + statusthread.join(); + } catch (InterruptedException e) { + } + } + + workload.cleanup(); + } } catch (WorkloadException e) { @@ -1096,7 +1154,9 @@ public void run() try { - exportMeasurements(props, opsDone, en - st); + try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) { + exportMeasurements(props, opsDone, en - st); + } } catch (IOException e) { System.err.println("Could not export measurements, error: " + e.getMessage()); diff --git a/core/src/main/java/com/yahoo/ycsb/DBFactory.java b/core/src/main/java/com/yahoo/ycsb/DBFactory.java index 18f7f5e1f1..2096c0d139 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBFactory.java +++ b/core/src/main/java/com/yahoo/ycsb/DBFactory.java @@ -18,6 +18,7 @@ package com.yahoo.ycsb; import java.util.Properties; +import org.apache.htrace.core.Tracer; /** * Creates a DB layer by dynamically classloading the specified DB class. @@ -25,7 +26,7 @@ public class DBFactory { @SuppressWarnings("unchecked") - public static DB newDB(String dbname, Properties properties) throws UnknownDBException + public static DB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException { ClassLoader classLoader = DBFactory.class.getClassLoader(); @@ -46,7 +47,7 @@ public static DB newDB(String dbname, Properties properties) throws UnknownDBExc ret.setProperties(properties); - return new DBWrapper(ret); + return new DBWrapper(ret, tracer); } } diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java index 337f4d9b5d..0109c51955 100644 --- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java +++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java @@ -24,6 +24,9 @@ import java.util.Set; import java.util.Vector; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TraceScope; + import com.yahoo.ycsb.measurements.Measurements; /** @@ -32,8 +35,9 @@ */ public class DBWrapper extends DB { - private DB _db; - private Measurements _measurements; + private final DB _db; + private final Measurements _measurements; + private final Tracer _tracer; private boolean reportLatencyForEachError = false; private HashSet latencyTrackedErrors = new HashSet(); @@ -46,10 +50,27 @@ public class DBWrapper extends DB private static final String LATENCY_TRACKED_ERRORS_PROPERTY = "latencytrackederrors"; - public DBWrapper(DB db) + private final String SCOPE_STRING_CLEANUP; + private final String SCOPE_STRING_DELETE; + private final String SCOPE_STRING_INIT; + private final String SCOPE_STRING_INSERT; + private final String SCOPE_STRING_READ; + private final String SCOPE_STRING_SCAN; + private final String SCOPE_STRING_UPDATE; + + public DBWrapper(final DB db, final Tracer tracer) { _db=db; _measurements=Measurements.getMeasurements(); + _tracer = tracer; + final String simple = db.getClass().getSimpleName(); + SCOPE_STRING_CLEANUP = simple + "#cleanup"; + SCOPE_STRING_DELETE = simple + "#delete"; + SCOPE_STRING_INIT = simple + "#init"; + SCOPE_STRING_INSERT = simple + "#insert"; + SCOPE_STRING_READ = simple + "#read"; + SCOPE_STRING_SCAN = simple + "#scan"; + SCOPE_STRING_UPDATE = simple + "#update"; } /** @@ -74,24 +95,26 @@ public Properties getProperties() */ public void init() throws DBException { - _db.init(); - - this.reportLatencyForEachError = Boolean.parseBoolean(getProperties(). - getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY, - REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT)); - - if (!reportLatencyForEachError) { - String latencyTrackedErrors = getProperties().getProperty( - LATENCY_TRACKED_ERRORS_PROPERTY, null); - if (latencyTrackedErrors != null) { - this.latencyTrackedErrors = new HashSet(Arrays.asList( - latencyTrackedErrors.split(","))); + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_INIT)) { + _db.init(); + + this.reportLatencyForEachError = Boolean.parseBoolean(getProperties(). + getProperty(REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY, + REPORT_LATENCY_FOR_EACH_ERROR_PROPERTY_DEFAULT)); + + if (!reportLatencyForEachError) { + String latencyTrackedErrors = getProperties().getProperty( + LATENCY_TRACKED_ERRORS_PROPERTY, null); + if (latencyTrackedErrors != null) { + this.latencyTrackedErrors = new HashSet(Arrays.asList( + latencyTrackedErrors.split(","))); + } } - } - System.err.println("DBWrapper: report latency for each error is " + - this.reportLatencyForEachError + " and specific error codes to track" + - " for latency are: " + this.latencyTrackedErrors.toString()); + System.err.println("DBWrapper: report latency for each error is " + + this.reportLatencyForEachError + " and specific error codes to track" + + " for latency are: " + this.latencyTrackedErrors.toString()); + } } /** @@ -100,11 +123,13 @@ public void init() throws DBException */ public void cleanup() throws DBException { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - _db.cleanup(); - long en=System.nanoTime(); - measure("CLEANUP", Status.OK, ist, st, en); + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_CLEANUP)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + _db.cleanup(); + long en=System.nanoTime(); + measure("CLEANUP", Status.OK, ist, st, en); + } } /** @@ -120,13 +145,15 @@ public void cleanup() throws DBException public Status read(String table, String key, Set fields, HashMap result) { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - Status res=_db.read(table,key,fields,result); - long en=System.nanoTime(); - measure("READ", res, ist, st, en); - _measurements.reportStatus("READ", res); - return res; + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_READ)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + Status res=_db.read(table,key,fields,result); + long en=System.nanoTime(); + measure("READ", res, ist, st, en); + _measurements.reportStatus("READ", res); + return res; + } } /** @@ -143,13 +170,15 @@ public Status read(String table, String key, Set fields, public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - Status res=_db.scan(table,startkey,recordcount,fields,result); - long en=System.nanoTime(); - measure("SCAN", res, ist, st, en); - _measurements.reportStatus("SCAN", res); - return res; + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_SCAN)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + Status res=_db.scan(table,startkey,recordcount,fields,result); + long en=System.nanoTime(); + measure("SCAN", res, ist, st, en); + _measurements.reportStatus("SCAN", res); + return res; + } } private void measure(String op, Status result, long intendedStartTimeNanos, @@ -181,13 +210,15 @@ private void measure(String op, Status result, long intendedStartTimeNanos, public Status update(String table, String key, HashMap values) { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - Status res=_db.update(table,key,values); - long en=System.nanoTime(); - measure("UPDATE", res, ist, st, en); - _measurements.reportStatus("UPDATE", res); - return res; + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_UPDATE)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + Status res=_db.update(table,key,values); + long en=System.nanoTime(); + measure("UPDATE", res, ist, st, en); + _measurements.reportStatus("UPDATE", res); + return res; + } } /** @@ -203,13 +234,15 @@ public Status update(String table, String key, public Status insert(String table, String key, HashMap values) { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - Status res=_db.insert(table,key,values); - long en=System.nanoTime(); - measure("INSERT", res, ist, st, en); - _measurements.reportStatus("INSERT", res); - return res; + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_INSERT)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + Status res=_db.insert(table,key,values); + long en=System.nanoTime(); + measure("INSERT", res, ist, st, en); + _measurements.reportStatus("INSERT", res); + return res; + } } /** @@ -221,12 +254,14 @@ public Status insert(String table, String key, */ public Status delete(String table, String key) { - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - Status res=_db.delete(table,key); - long en=System.nanoTime(); - measure("DELETE", res, ist, st, en); - _measurements.reportStatus("DELETE", res); - return res; + try (final TraceScope span = _tracer.newScope(SCOPE_STRING_DELETE)) { + long ist=_measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + Status res=_db.delete(table,key); + long en=System.nanoTime(); + measure("DELETE", res, ist, st, en); + _measurements.reportStatus("DELETE", res); + return res; + } } } diff --git a/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java b/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java index 62212f2c6d..f15de5dfda 100644 --- a/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java +++ b/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java @@ -16,12 +16,11 @@ */ package com.yahoo.ycsb; -import java.util.List; -import java.util.Vector; +import java.util.Collection; /** * A thread that waits for the maximum specified time and then interrupts all the client - * threads passed as the Vector at initialization of this thread. + * threads passed at initialization of this thread. * * The maximum execution time passed is assumed to be in seconds. * @@ -30,12 +29,12 @@ */ public class TerminatorThread extends Thread { - private final List threads; + private final Collection threads; private long maxExecutionTime; private Workload workload; private long waitTimeOutInMS; - public TerminatorThread(long maxExecutionTime, List threads, + public TerminatorThread(long maxExecutionTime, Collection threads, Workload workload) { this.maxExecutionTime = maxExecutionTime; this.threads = threads; diff --git a/workloads/workload_template b/workloads/workload_template index aff1e7c8e1..b66d3b6ef8 100644 --- a/workloads/workload_template +++ b/workloads/workload_template @@ -178,4 +178,26 @@ timeseries.granularity=1000 # core_workload_insertion_retry_limit = 0 # # the following number controls the interval between retries (in seconds): -# core_workload_insertion_retry_interval = 3 \ No newline at end of file +# core_workload_insertion_retry_interval = 3 + +# Distributed Tracing via Apache HTrace (http://htrace.incubator.apache.org/) +# +# Defaults to blank / no tracing +# Below sends to a local file, sampling at 0.1% +# +# htrace.sampler.classes=ProbabilitySampler +# htrace.sampler.fraction=0.001 +# htrace.span.receiver.classes=org.apache.htrace.core.LocalFileSpanReceiver +# htrace.local.file.span.receiver.path=/some/path/to/local/file +# +# To capture all spans, use the AlwaysSampler +# +# htrace.sampler.classes=AlwaysSampler +# +# To send spans to an HTraced receiver, use the below and ensure +# your classpath contains the htrace-htraced jar (i.e. when invoking the ycsb +# command add -cp /path/to/htrace-htraced.jar) +# +# htrace.span.receiver.classes=org.apache.htrace.impl.HTracedSpanReceiver +# htrace.htraced.receiver.address=example.com:9075 +# htrace.htraced.error.log.period.ms=10000