Skip to content

Commit

Permalink
Merge pull request #529 from busbey/htrace
Browse files Browse the repository at this point in the history
[core] add operation tracing via Apache HTrace.
  • Loading branch information
busbey committed Apr 29, 2016
2 parents 0d3c5b0 + d1e9ca1 commit 7f0259f
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 140 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ LICENSE file.
</properties>

<dependencies>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core4</artifactId>
<version>4.1.0-incubating</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
Expand Down
206 changes: 133 additions & 73 deletions core/src/main/java/com/yahoo/ycsb/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.HTraceConfiguration;

import com.yahoo.ycsb.measurements.Measurements;
import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
Expand Down Expand Up @@ -346,7 +352,7 @@ public static StringBuilder format(long seconds) {
* @author cooperb
*
*/
class ClientThread extends Thread
class ClientThread implements Runnable
{
/** Counts down each of the clients completing. */
private final CountDownLatch _completeLatch;
Expand Down Expand Up @@ -595,6 +601,18 @@ public class Client
/** An optional thread used to track progress and measure JVM stats. */
private static StatusThread statusthread = null;

// HTrace integration related constants.

/**
* All keys for configuring the tracing system start with this prefix.
*/
private static final String HTRACE_KEY_PREFIX="htrace.";
private static final String CLIENT_WORKLOAD_INIT_SPAN = "Client#workload_init";
private static final String CLIENT_INIT_SPAN = "Client#init";
private static final String CLIENT_WORKLOAD_SPAN = "Client#workload";
private static final String CLIENT_CLEANUP_SPAN = "Client#cleanup";
private static final String CLIENT_EXPORT_MEASUREMENTS_SPAN = "Client#export_measurements";

public static void usageMessage()
{
System.out.println("Usage: java com.yahoo.ycsb.Client [options]");
Expand Down Expand Up @@ -901,7 +919,15 @@ else if (args[argindex].compareTo("-p")==0)
double targetperthread=((double)target)/((double)threadcount);
targetperthreadperms=targetperthread/1000.0;
}


final Map<String, String> filteredProperties = new HashMap<>();
for (String key : props.stringPropertyNames()) {
if (key.startsWith(HTRACE_KEY_PREFIX)) {
filteredProperties.put(key.substring(HTRACE_KEY_PREFIX.length()), props.getProperty(key));
}
}
final HTraceConfiguration conf = HTraceConfiguration.fromMap(filteredProperties);

//show a warning message that creating the workload is taking a while
//but only do so if it is taking longer than 2 seconds
//(showing the message right away if the setup wasn't taking very long was confusing people)
Expand All @@ -922,6 +948,7 @@ public void run()
}
};


warningthread.start();

//set up measurements
Expand All @@ -946,13 +973,13 @@ public void run()
System.err.println();
System.err.println("Loading workload...");

Workload workload=null;
Workload workload = null;

try
{
Class workloadclass = classLoader.loadClass(props.getProperty(WORKLOAD_PROPERTY));

workload=(Workload)workloadclass.newInstance();
workload = (Workload)workloadclass.newInstance();
}
catch (Exception e)
{
Expand All @@ -961,9 +988,16 @@ public void run()
System.exit(0);
}

final Tracer tracer = new Tracer.Builder("YCSB " + workload.getClass().getSimpleName())
.conf(conf)
.build();

try
{
workload.init(props);
try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_INIT_SPAN)) {
workload.init(props);
warningthread.interrupt();
}
}
catch (WorkloadException e)
{
Expand All @@ -972,56 +1006,65 @@ public void run()
System.exit(0);
}

warningthread.interrupt();

//run the workload

System.err.println("Starting test.");
final CountDownLatch completeLatch = new CountDownLatch(threadcount);
final List<ClientThread> clients = new ArrayList<ClientThread>(threadcount);

int opcount;
if (dotransactions)
{
opcount=Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY,"0"));
}
else
{
if (props.containsKey(INSERT_COUNT_PROPERTY))
boolean initFailed = false;
try (final TraceScope span = tracer.newScope(CLIENT_INIT_SPAN)) {

int opcount;
if (dotransactions)
{
opcount=Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY,"0"));
opcount=Integer.parseInt(props.getProperty(OPERATION_COUNT_PROPERTY,"0"));
}
else
{
opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
if (props.containsKey(INSERT_COUNT_PROPERTY))
{
opcount=Integer.parseInt(props.getProperty(INSERT_COUNT_PROPERTY,"0"));
}
else
{
opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
}
}
}

CountDownLatch completeLatch=new CountDownLatch(threadcount);
final List<ClientThread> clients=new ArrayList<ClientThread>(threadcount);
for (int threadid=0; threadid<threadcount; threadid++)
{
DB db=null;
try
{
db=DBFactory.newDB(dbname,props);
}
catch (UnknownDBException e)
for (int threadid=0; threadid<threadcount; threadid++)
{
System.out.println("Unknown DB "+dbname);
System.exit(0);
}
DB db = null;
try
{
db = DBFactory.newDB(dbname, props, tracer);
}
catch (UnknownDBException e)
{
System.out.println("Unknown DB " + dbname);
initFailed = true;
break;
}


int threadopcount = opcount/threadcount;
int threadopcount = opcount / threadcount;

// ensure correct number of operations, in case opcount is not a multiple of threadcount
if (threadid<opcount%threadcount)
{
++threadopcount;
// ensure correct number of operations, in case opcount is not a multiple of threadcount
if (threadid<opcount%threadcount)
{
++threadopcount;
}

ClientThread t=new ClientThread(db,dotransactions,workload,props,threadopcount, targetperthreadperms, completeLatch);

clients.add(t);
}

ClientThread t=new ClientThread(db,dotransactions,workload,props,threadopcount, targetperthreadperms, completeLatch);
}

clients.add(t);
if (initFailed) {
System.err.println("Error initializing datastore bindings.");
System.exit(0);
}

if (status)
Expand All @@ -1038,54 +1081,69 @@ public void run()
statusthread.start();
}

long st=System.currentTimeMillis();

for (Thread t : clients)
{
t.start();
}

Thread terminator = null;
long st;
long en;
int opsDone;

if (maxExecutionTime > 0) {
terminator = new TerminatorThread(maxExecutionTime, clients, workload);
terminator.start();
}
try (final TraceScope span = tracer.newScope(CLIENT_WORKLOAD_SPAN)) {

int opsDone = 0;

for (Thread t : clients)
{
try
{
t.join();
opsDone += ((ClientThread)t).getOpsDone();
final Map<Thread, ClientThread> threads = new HashMap<Thread, ClientThread>(threadcount);
for (ClientThread client : clients) {
threads.put(new Thread(tracer.wrap(client, "ClientThread")), client);
}
catch (InterruptedException e)

st=System.currentTimeMillis();

for (Thread t : threads.keySet())
{
t.start();
}
}

long en=System.currentTimeMillis();
if (maxExecutionTime > 0) {
terminator = new TerminatorThread(maxExecutionTime, threads.keySet(), workload);
terminator.start();
}

if (terminator != null && !terminator.isInterrupted()) {
terminator.interrupt();
}
opsDone = 0;

if (status)
{
// wake up status thread if it's asleep
statusthread.interrupt();
// at this point we assume all the monitored threads are already gone as per above join loop.
try {
statusthread.join();
} catch (InterruptedException e) {
for (Map.Entry<Thread, ClientThread> entry : threads.entrySet())
{
try
{
entry.getKey().join();
opsDone += entry.getValue().getOpsDone();
}
catch (InterruptedException e)
{
}
}

en=System.currentTimeMillis();

}

try
{
workload.cleanup();
try (final TraceScope span = tracer.newScope(CLIENT_CLEANUP_SPAN)) {

if (terminator != null && !terminator.isInterrupted()) {
terminator.interrupt();
}

if (status)
{
// wake up status thread if it's asleep
statusthread.interrupt();
// at this point we assume all the monitored threads are already gone as per above join loop.
try {
statusthread.join();
} catch (InterruptedException e) {
}
}

workload.cleanup();
}
}
catch (WorkloadException e)
{
Expand All @@ -1096,7 +1154,9 @@ public void run()

try
{
exportMeasurements(props, opsDone, en - st);
try (final TraceScope span = tracer.newScope(CLIENT_EXPORT_MEASUREMENTS_SPAN)) {
exportMeasurements(props, opsDone, en - st);
}
} catch (IOException e)
{
System.err.println("Could not export measurements, error: " + e.getMessage());
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/com/yahoo/ycsb/DBFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package com.yahoo.ycsb;

import java.util.Properties;
import org.apache.htrace.core.Tracer;

/**
* Creates a DB layer by dynamically classloading the specified DB class.
*/
public class DBFactory
{
@SuppressWarnings("unchecked")
public static DB newDB(String dbname, Properties properties) throws UnknownDBException
public static DB newDB(String dbname, Properties properties, final Tracer tracer) throws UnknownDBException
{
ClassLoader classLoader = DBFactory.class.getClassLoader();

Expand All @@ -46,7 +47,7 @@ public static DB newDB(String dbname, Properties properties) throws UnknownDBExc

ret.setProperties(properties);

return new DBWrapper(ret);
return new DBWrapper(ret, tracer);
}

}
Loading

0 comments on commit 7f0259f

Please sign in to comment.