diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index d51dfa416313..422943fff042 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.ozone.OzoneConsts;

 import java.util.Map;
@@ -51,6 +52,11 @@ public final class ContainerClientMetrics {
   private MutableCounterLong totalWriteChunkCalls;
   @Metric
   private MutableCounterLong totalWriteChunkBytes;
+  private MutableQuantiles[] listBlockLatency;
+  private MutableQuantiles[] getBlockLatency;
+  private MutableQuantiles[] getCommittedBlockLengthLatency;
+  private MutableQuantiles[] readChunkLatency;
+  private MutableQuantiles[] getSmallFileLatency;
   private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
   private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
   private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
@@ -84,6 +90,36 @@ private ContainerClientMetrics() {
     writeChunkCallsByPipeline = new ConcurrentHashMap<>();
     writeChunkBytesByPipeline = new ConcurrentHashMap<>();
     writeChunksCallsByLeaders = new ConcurrentHashMap<>();
+
+    listBlockLatency = new MutableQuantiles[3];
+    getBlockLatency = new MutableQuantiles[3];
+    getCommittedBlockLengthLatency = new MutableQuantiles[3];
+    readChunkLatency = new MutableQuantiles[3];
+    getSmallFileLatency = new MutableQuantiles[3];
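+    // One quantile estimator per op and per window; each rolls over
+    // every 60, 300 or 900 seconds respectively.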
+    int[] intervals = {60, 300, 900};
+    for (int i = 0; i < intervals.length; i++) {
+      int interval = intervals[i];
+      listBlockLatency[i] = registry
+          .newQuantiles("listBlockLatency" + interval
+                  + "s", "ListBlock latency in microseconds", "ops",
+              "latency", interval);
+      getBlockLatency[i] = registry
+          .newQuantiles("getBlockLatency" + interval
+                  + "s", "GetBlock latency in microseconds", "ops",
+              "latency", interval);
+      getCommittedBlockLengthLatency[i] = registry
+          .newQuantiles("getCommittedBlockLengthLatency" + interval
+                  + "s", "GetCommittedBlockLength latency in microseconds",
+              "ops", "latency", interval);
+      readChunkLatency[i] = registry
+          .newQuantiles("readChunkLatency" + interval
+                  + "s", "ReadChunk latency in microseconds", "ops",
+              "latency", interval);
+      getSmallFileLatency[i] = registry
+          .newQuantiles("getSmallFileLatency" + interval
+                  + "s", "GetSmallFile latency in microseconds", "ops",
+              "latency", interval);
+    }
   }

   public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
@@ -111,7 +147,48 @@ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
     totalWriteChunkBytes.incr(chunkSizeBytes);
   }

-  MutableCounterLong getTotalWriteChunkBytes() {
+  public void addListBlockLatency(long latency) {
+    for (MutableQuantiles q : listBlockLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetBlockLatency(long latency) {
+    for (MutableQuantiles q : getBlockLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetCommittedBlockLengthLatency(long latency) {
+    for (MutableQuantiles q : getCommittedBlockLengthLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addReadChunkLatency(long latency) {
+    for (MutableQuantiles q : readChunkLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  public void addGetSmallFileLatency(long latency) {
+    for (MutableQuantiles q : getSmallFileLatency) {
+      if (q != null) {
+        q.add(latency);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTotalWriteChunkBytes() {
     return totalWriteChunkBytes;
   }

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 03b7844cc941..72754d1f1cf6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -29,6 +29,9 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;

+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -64,6 +67,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.security.token.Token;
@@ -128,6 +132,10 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient,
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
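+    // Export the active span's context into the request so the
+    // datanode side can join the same trace.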
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();

     ContainerCommandResponseProto response =
@@ -146,14 +154,17 @@ static <T> T tryEachDatanode(Pipeline pipeline,
     try {
       return op.apply(d);
     } catch (IOException e) {
+      Span span = GlobalTracer.get().activeSpan();
       if (e instanceof StorageContainerException) {
         StorageContainerException sce = (StorageContainerException)e;
         // Block token expired. There's no point retrying other DN.
         // Throw the exception to request a new block token right away.
         if (sce.getResult() == BLOCK_TOKEN_VERIFICATION_FAILED) {
+          span.log("block token verification failed at DN " + d);
           throw e;
         }
       }
+      span.log("failed to connect to DN " + d);
       excluded.add(d);
       if (excluded.size() < pipeline.size()) {
         LOG.warn(toErrorMessage.apply(d)
@@ -211,6 +222,10 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
       List<Validator> validators,
       ContainerCommandRequestProto.Builder builder,
       DatanodeDetails datanode) throws IOException {
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     final ContainerCommandRequestProto request = builder
         .setDatanodeUuid(datanode.getUuidString()).build();
     ContainerCommandResponseProto response =
@@ -246,6 +261,10 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
         xceiverClient.sendCommand(request, getValidatorList());
@@ -319,10 +338,19 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
       builder.setEncodedToken(token.encodeToUrlString());
     }

-    return tryEachDatanode(xceiverClient.getPipeline(),
-        d -> readChunk(xceiverClient, chunk, blockID,
-            validators, builder, d),
-        d -> toErrorMessage(chunk, blockID, d));
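+    // One span covers the whole chunk read, including per-datanode
+    // retries; offset, length and block ID become searchable tags.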
+    Span span = GlobalTracer.get()
+        .buildSpan("readChunk").start();
+    try (Scope ignored = GlobalTracer.get().activateSpan(span)) {
+      span.setTag("offset", chunk.getOffset())
+          .setTag("length", chunk.getLen())
+          .setTag("block", blockID.toString());
+      return tryEachDatanode(xceiverClient.getPipeline(),
+          d -> readChunk(xceiverClient, chunk, blockID,
+              validators, builder, d),
+          d -> toErrorMessage(chunk, blockID, d));
+    } finally {
+      span.finish();
+    }
   }

   private static ContainerProtos.ReadChunkResponseProto readChunk(
@@ -330,10 +358,15 @@ private static ContainerProtos.ReadChunkResponseProto readChunk(
       List<Validator> validators,
       ContainerCommandRequestProto.Builder builder,
       DatanodeDetails d) throws IOException {
-    final ContainerCommandRequestProto request = builder
-        .setDatanodeUuid(d.getUuidString()).build();
+    ContainerCommandRequestProto.Builder requestBuilder = builder
+        .setDatanodeUuid(d.getUuidString());
+    Span span = GlobalTracer.get().activeSpan();
+    String traceId = TracingUtil.exportSpan(span);
+    if (traceId != null) {
+      requestBuilder = requestBuilder.setTraceID(traceId);
+    }
     ContainerCommandResponseProto reply =
-        xceiverClient.sendCommand(request, validators);
+        xceiverClient.sendCommand(requestBuilder.build(), validators);
     final ReadChunkResponseProto response = reply.getReadChunk();
     final long readLen = getLen(response);
     if (readLen != chunk.getLen()) {
@@ -515,6 +548,11 @@ public static void createContainer(XceiverClientSpi client,
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
+
     request.setCmdType(ContainerProtos.Type.CreateContainer);
     request.setContainerID(containerID);
     request.setCreateContainer(createRequest.build());
@@ -544,6 +582,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID,
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     client.sendCommand(request.build(), getValidatorList());
   }

@@ -566,6 +608,10 @@ public static void closeContainer(XceiverClientSpi client,
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     client.sendCommand(request.build(), getValidatorList());
   }

@@ -589,6 +635,10 @@ public static ReadContainerResponseProto readContainer(
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     ContainerCommandResponseProto response =
         client.sendCommand(request.build(), getValidatorList());

@@ -624,6 +674,10 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
         client.sendCommand(request, getValidatorList());
@@ -694,6 +748,10 @@ public static List<Validator> toValidatorList(Validator validator) {
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
     ContainerCommandRequestProto request = builder.build();
     Map<DatanodeDetails, ContainerCommandResponseProto> responses =
         xceiverClient.sendCommandOnAllNodes(request);
@@ -719,6 +777,10 @@ public static List<Validator> toValidatorList(Validator validator) {
     if (encodedToken != null) {
       request.setEncodedToken(encodedToken);
     }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      request.setTraceID(traceId);
+    }
     Map<DatanodeDetails, ContainerCommandResponseProto> responses =
         client.sendCommandOnAllNodes(request.build());
     for (Map.Entry<DatanodeDetails, ContainerCommandResponseProto> entry :
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
index b968d407232c..29bd847319ea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java
@@ -139,6 +139,16 @@ public static boolean isTracingEnabled(
         ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
   }

+  /**
+   * Execute {@code runnable} inside an activated new span.
+   */
+  public static <E extends Exception> void executeInNewSpan(String spanName,
+      CheckedRunnable<E> runnable) throws E {
+    Span span = GlobalTracer.get()
+        .buildSpan(spanName).start();
+    executeInSpan(span, runnable);
+  }
+
   /**
    * Execute {@code supplier} inside an activated new span.
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 178a9919c114..f84cc55dc173 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -322,6 +322,8 @@ public void onRemoval(
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, ecReconstructExecutor);
     this.clientMetrics = ContainerClientMetrics.acquire();
+
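+    // Register the process-wide tracer. TracingUtil delegates to
+    // GlobalTracer.registerIfAbsent, so creating several clients is safe.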
+    TracingUtil.initTracing("client", conf);
   }

   public XceiverClientFactory getXceiverClientManager() {
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 1fcb1554b6c3..3ba291ae0fd0 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.fs.ozone;

 import com.google.common.base.Preconditions;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.ozone.OFSPath;
@@ -239,7 +242,12 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     statistics.incrementReadOps(1);
     LOG.trace("open() path: {}", path);
     final String key = pathToKey(path);
-    return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
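+    // Each FileSystem entry point below runs in its own span, named "ofs <op>".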
+    return TracingUtil.executeInNewSpan("ofs open",
+        () -> {
+          Span span = GlobalTracer.get().activeSpan();
+          span.setTag("path", key);
+          return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+        });
   }

   protected InputStream createFSInputStream(InputStream inputStream) {
@@ -263,7 +271,8 @@ public FSDataOutputStream create(Path f, FsPermission permission,
     incrementCounter(Statistic.INVOCATION_CREATE, 1);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(f);
-    return createOutputStream(key, replication, overwrite, true);
+    return TracingUtil.executeInNewSpan("ofs create",
+        () -> createOutputStream(key, replication, overwrite, true));
   }

   @Override
@@ -277,8 +286,10 @@ public FSDataOutputStream createNonRecursive(Path path,
     incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(path);
-    return createOutputStream(key,
-        replication, flags.contains(CreateFlag.OVERWRITE), false);
+    return TracingUtil.executeInNewSpan("ofs createNonRecursive",
+        () ->
+            createOutputStream(key,
+                replication, flags.contains(CreateFlag.OVERWRITE), false));
   }

   private OutputStream selectOutputStream(String key, short replication,
@@ -374,6 +385,14 @@ boolean processKeyPath(List<String> keyPathList) throws IOException {
    */
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs rename",
+        () -> renameInSpan(src, dst));
+  }
+
+  private boolean renameInSpan(Path src, Path dst) throws IOException {
+    Span span = GlobalTracer.get().activeSpan();
+    span.setTag("src", src.toString())
+        .setTag("dst", dst.toString());
     incrementCounter(Statistic.INVOCATION_RENAME, 1);
     statistics.incrementWriteOps(1);
     if (src.equals(dst)) {
@@ -526,8 +545,8 @@ protected void rename(final Path src, final Path dst,
   @Override
   public Path createSnapshot(Path path, String snapshotName)
       throws IOException {
-    String snapshot = getAdapter()
-        .createSnapshot(pathToKey(path), snapshotName);
+    String snapshot = TracingUtil.executeInNewSpan("ofs createSnapshot",
+        () -> getAdapter().createSnapshot(pathToKey(path), snapshotName));
     return new Path(OzoneFSUtils.trimPathToDepth(path, PATH_DEPTH_TO_BUCKET),
         OM_SNAPSHOT_INDICATOR + OZONE_URI_DELIMITER + snapshot);
   }
@@ -541,7 +560,8 @@ public void renameSnapshot(Path path, String snapshotOldName, String snapshotNew
   @Override
   public void deleteSnapshot(Path path, String snapshotName)
       throws IOException {
-    adapter.deleteSnapshot(pathToKey(path), snapshotName);
+    TracingUtil.executeInNewSpan("ofs deleteSnapshot",
+        () -> adapter.deleteSnapshot(pathToKey(path), snapshotName));
   }

   private class DeleteIterator extends OzoneListingIterator {
@@ -672,6 +692,11 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException {
    */
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs delete",
+        () -> deleteInSpan(f, recursive));
+  }
+
+  private boolean deleteInSpan(Path f, boolean recursive) throws IOException {
     incrementCounter(Statistic.INVOCATION_DELETE, 1);
     statistics.incrementWriteOps(1);
     LOG.debug("Delete path {} - recursive {}", f, recursive);
@@ -889,7 +914,8 @@ private boolean o3Exists(final Path f) throws IOException {
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    return convertFileStatusArr(listStatusAdapter(f));
+    return TracingUtil.executeInNewSpan("ofs listStatus",
+        () -> convertFileStatusArr(listStatusAdapter(f)));
   }

   private FileStatus[] convertFileStatusArr(
@@ -946,7 +972,8 @@ public Path getWorkingDirectory() {
   @Override
   public Token<?> getDelegationToken(String renewer) throws IOException {
-    return adapter.getDelegationToken(renewer);
+    return TracingUtil.executeInNewSpan("ofs getDelegationToken",
+        () -> adapter.getDelegationToken(renewer));
   }

   /**
@@ -1014,7 +1041,8 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     if (isEmpty(key)) {
       return false;
     }
-    return mkdir(f);
+    return TracingUtil.executeInNewSpan("ofs mkdirs",
+        () -> mkdir(f));
   }

   @Override
@@ -1025,7 +1053,8 @@ public long getDefaultBlockSize() {
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    return convertFileStatus(getFileStatusAdapter(f));
+    return TracingUtil.executeInNewSpan("ofs getFileStatus",
+        () -> convertFileStatus(getFileStatusAdapter(f)));
   }

   public FileStatusAdapter getFileStatusAdapter(Path f) throws IOException {
@@ -1096,7 +1125,8 @@ public boolean exists(Path f) throws IOException {
   public FileChecksum getFileChecksum(Path f, long length) throws IOException {
     incrementCounter(Statistic.INVOCATION_GET_FILE_CHECKSUM);
     String key = pathToKey(f);
-    return adapter.getFileChecksum(key, length);
+    return TracingUtil.executeInNewSpan("ofs getFileChecksum",
+        () -> adapter.getFileChecksum(key, length));
   }

   @Override
@@ -1508,6 +1538,11 @@ FileStatus convertFileStatus(FileStatusAdapter fileStatusAdapter) {
   @Override
   public ContentSummary getContentSummary(Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs getContentSummary",
+        () -> getContentSummaryInSpan(f));
+  }
+
+  private ContentSummary getContentSummaryInSpan(Path f) throws IOException {
     FileStatusAdapter status = getFileStatusAdapter(f);

     if (status.isFile()) {
@@ -1583,7 +1618,8 @@ public void setTimes(Path f, long mtime, long atime) throws IOException {
     if (key.equals("NONE")) {
       throw new FileNotFoundException("File not found. path /NONE.");
     }
-    adapter.setTimes(key, mtime, atime);
+    TracingUtil.executeInNewSpan("ofs setTimes",
+        () -> adapter.setTimes(key, mtime, atime));
   }

   protected boolean setSafeModeUtil(SafeModeAction action,
@@ -1595,6 +1631,7 @@ protected boolean setSafeModeUtil(SafeModeAction action,
       statistics.incrementWriteOps(1);
     }
     LOG.trace("setSafeMode() action:{}", action);
-    return getAdapter().setSafeMode(action, isChecked);
+    return TracingUtil.executeInNewSpan("ofs setSafeMode",
+        () -> getAdapter().setSafeMode(action, isChecked));
   }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 918640799c71..35ee20d56c34 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -23,6 +23,9 @@
 import java.nio.ByteBuffer;
 import java.nio.ReadOnlyBufferException;

+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -30,6 +33,7 @@
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;

 /**
  * The input stream for Ozone file system.
@@ -52,25 +56,40 @@ public OzoneFSInputStream(InputStream inputStream, Statistics statistics) {

   @Override
   public int read() throws IOException {
-    int byteRead = inputStream.read();
-    if (statistics != null && byteRead >= 0) {
-      statistics.incrementBytesRead(1);
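+    // Wrap each read call in its own span so per-read latency
+    // shows up in the trace.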
+    Span span = GlobalTracer.get()
+        .buildSpan("OzoneFSInputStream.read").start();
+    try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+      int byteRead = inputStream.read();
+      if (statistics != null && byteRead >= 0) {
+        statistics.incrementBytesRead(1);
+      }
+      return byteRead;
+    } finally {
+      span.finish();
     }
-    return byteRead;
   }

   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    int bytesRead = inputStream.read(b, off, len);
-    if (statistics != null && bytesRead >= 0) {
-      statistics.incrementBytesRead(bytesRead);
+    Span span = GlobalTracer.get()
+        .buildSpan("OzoneFSInputStream.read").start();
+    try (Scope scope = GlobalTracer.get().activateSpan(span)) {
+      span.setTag("offset", off)
+          .setTag("length", len);
+      int bytesRead = inputStream.read(b, off, len);
+      if (statistics != null && bytesRead >= 0) {
+        statistics.incrementBytesRead(bytesRead);
+      }
+      return bytesRead;
+    } finally {
+      span.finish();
     }
-    return bytesRead;
   }

   @Override
   public synchronized void close() throws IOException {
-    inputStream.close();
+    TracingUtil.executeInNewSpan("OzoneFSInputStream.close",
+        inputStream::close);
   }

   @Override
@@ -101,6 +120,11 @@ public int available() throws IOException {
    */
   @Override
   public int read(ByteBuffer buf) throws IOException {
+    return TracingUtil.executeInNewSpan("OzoneFSInputStream.read(ByteBuffer)",
+        () -> readInTrace(buf));
+  }
+
+  private int readInTrace(ByteBuffer buf) throws IOException {
     if (buf.isReadOnly()) {
       throw new ReadOnlyBufferException();
     }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
index 141a40469419..c5f62d6f68ba 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.fs.ozone;

+import io.opentracing.Span;
+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

 import java.io.IOException;
@@ -42,17 +45,24 @@ public OzoneFSOutputStream(OzoneOutputStream outputStream) {

   @Override
   public void write(int b) throws IOException {
-    outputStream.write(b);
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+        () -> outputStream.write(b));
   }

   @Override
   public void write(byte[] b, int off, int len) throws IOException {
-    outputStream.write(b, off, len);
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.write",
+        () -> {
+          Span span = GlobalTracer.get().activeSpan();
+          span.setTag("length", len);
+          outputStream.write(b, off, len);
+        });
   }

   @Override
   public synchronized void flush() throws IOException {
-    outputStream.flush();
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.flush",
+        outputStream::flush);
   }

   @Override
@@ -67,7 +77,8 @@ public void hflush() throws IOException {

   @Override
   public void hsync() throws IOException {
-    outputStream.hsync();
+    TracingUtil.executeInNewSpan("OzoneFSOutputStream.hsync",
+        outputStream::hsync);
   }

   protected OzoneOutputStream getWrappedOutputStream() {
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 7561e20a875d..c377128d2940 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;

+import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;

 import java.io.IOException;
@@ -124,6 +126,11 @@ public boolean hasPathCapability(final Path path, final String capability)

   @Override
   public boolean recoverLease(final Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs recoverLease",
+        () -> recoverLeaseTraced(f));
+  }
+
+  private boolean recoverLeaseTraced(final Path f) throws IOException {
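+    // The span opened by executeInNewSpan above is still active here.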
+    GlobalTracer.get().activeSpan().setTag("path", f.toString());
     statistics.incrementWriteOps(1);
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
@@ -133,6 +140,12 @@ public boolean recoverLease(final Path f) throws IOException {

   @Override
   public boolean isFileClosed(Path f) throws IOException {
+    return TracingUtil.executeInNewSpan("ofs isFileClosed",
+        () -> isFileClosedTraced(f));
+  }
+
+  private boolean isFileClosedTraced(Path f) throws IOException {
+    GlobalTracer.get().activeSpan().setTag("path", f.toString());
     statistics.incrementWriteOps(1);
     LOG.trace("isFileClosed() path:{}", f);
     Path qualifiedPath = makeQualified(f);
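
A minimal sketch of how a caller could combine the new span helper with the
new client metrics (illustrative only; the timing variable and the elided RPC
call are not part of this patch):

    // Time one GetBlock round-trip inside its own span and feed the result
    // into the 60s/300s/900s quantile windows added above.
    ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
    TracingUtil.executeInNewSpan("getBlock", () -> {
      long startNanos = System.nanoTime();
      try {
        // ... issue the GetBlock RPC via ContainerProtocolCalls.getBlock ...
      } finally {
        // The quantile descriptions registered above are in microseconds.
        metrics.addGetBlockLatency((System.nanoTime() - startNanos) / 1000);
      }
    });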