Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,51 @@ public class OzoneClientConfig {
private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientConfig.class);

public static final String OZONE_READ_SHORT_CIRCUIT = "ozone.client.read.short-circuit";
public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
Comment on lines +42 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe this is redundant; See: shortCircuitEnabled.

Suggested change
public static final String OZONE_READ_SHORT_CIRCUIT = "ozone.client.read.short-circuit";
public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;

public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document this property somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to add a document about this feature. Will explain all these new properties there.

public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
public static final int DATA_TRANSFER_VERSION = 28;

@Config(key = "read.short-circuit",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Whether read short-circuit is enabled or not",
tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;

@Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
defaultValue = "128KB",
type = ConfigType.SIZE,
description = "Buffer size of reader/writer.",
tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
private int shortCircuitBufferSize = 128 * 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious to know how the default value is determined.

The HDFS "dfs.client.read.shortcircuit.buffer.size" which is similar, is 1MB in size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The short-circuit channel only exchange getBlock request and response. The value is determined by how big will a request and a response of a 256MB block size, 4MB chunk size, 16KB checksum size block. The request is round 500 bytes, and the response is around
30 + 53 * 64 + 6 * 16384 ~ 100k
block size (exclude chunks) - 30 bytes
chunk size (one checksums) - 53 bytes
one checksum size - 6 bytes


@Config(key = SHORT_CIRCUIT_PREFIX + "disable.interval",
defaultValue = "600",
type = ConfigType.LONG,
description = "If some unknown IO error happens on Domain socket read, short circuit read will be disabled " +
"temporary for this period of time(seconds).",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
"temporary for this period of time(seconds).",
"temporarily for this period of time(seconds).",

tags = { ConfigTag.CLIENT })
private long shortCircuitReadDisableInterval = 60 * 10;

public long getShortCircuitReadDisableInterval() {
return shortCircuitReadDisableInterval;
}

public void setShortCircuitReadDisableInterval(long value) {
shortCircuitReadDisableInterval = value;
}

public int getShortCircuitBufferSize() {
return shortCircuitBufferSize;
}

public void setShortCircuitBufferSize(int size) {
this.shortCircuitBufferSize = size;
}

/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
Expand Down Expand Up @@ -558,4 +603,12 @@ public void setMaxConcurrentWritePerKey(int maxConcurrentWritePerKey) {
public int getMaxConcurrentWritePerKey() {
return this.maxConcurrentWritePerKey;
}

public boolean isShortCircuitEnabled() {
return shortCircuitEnabled;
}

public void setShortCircuit(boolean enabled) {
shortCircuitEnabled = enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.DomainSocketFactory;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
Expand All @@ -41,6 +43,8 @@ public static void enableErrorInjection(ErrorInjector injector) {
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final boolean securityEnabled;
private boolean shortCircuitEnabled;
private DomainSocketFactory domainSocketFactory;

public XceiverClientCreator(ConfigurationSource conf) {
this(conf, null);
Expand All @@ -56,13 +60,26 @@ public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustMa
if (securityEnabled) {
Preconditions.checkNotNull(trustManager);
}
shortCircuitEnabled = conf.getBoolean(OzoneClientConfig.OZONE_READ_SHORT_CIRCUIT,
OzoneClientConfig.OZONE_READ_SHORT_CIRCUIT_DEFAULT);
if (shortCircuitEnabled) {
domainSocketFactory = DomainSocketFactory.getInstance(conf);
}
}

public boolean isSecurityEnabled() {
return securityEnabled;
}

public boolean isShortCircuitEnabled() {
return shortCircuitEnabled && domainSocketFactory.isServiceReady();
}

protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
return newClient(pipeline, null);
}

protected XceiverClientSpi newClient(Pipeline pipeline, DatanodeDetails dn) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
Expand All @@ -74,6 +91,9 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
case EC:
client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
break;
case SHORT_CIRCUIT:
client = new XceiverClientShortCircuit(pipeline, conf, dn);
break;
case CHAINED:
default:
throw new IOException("not implemented " + pipeline.getType());
Expand All @@ -97,7 +117,14 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) throws IOException {
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

Expand All @@ -108,7 +135,7 @@ public void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean inv

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
return newClient(pipeline);
return newClient(pipeline, null);
}

@Override
Expand All @@ -117,7 +144,10 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public void close() throws Exception {
// clients are not tracked, closing each client is the responsibility of users of this class
public void close() {
// clients are not tracked, closing each client is the responsibility of users of this classclass
if (domainSocketFactory != null) {
domainSocketFactory.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public interface XceiverClientFactory extends AutoCloseable {
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;
default XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException {
return acquireClientForReadData(pipeline, false);
}

/**
* Releases a read XceiverClientSpi after use.
Expand All @@ -73,10 +75,20 @@ void releaseClientForReadData(XceiverClientSpi client,
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware)
default XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
return acquireClient(pipeline, topologyAware, false);
}

XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException;

XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient,
boolean topologyAware);

default boolean isShortCircuitEnabled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
* how it works, and how it is integrated with the Ozone client.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final ConfigurationSource config;
Expand Down Expand Up @@ -133,6 +133,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.trustManager = trustManager;
this.getBlockDNcache = new ConcurrentHashMap<>();
LOG.info("{} is created", XceiverClientGrpc.class.getSimpleName());
}

/**
Expand Down Expand Up @@ -246,6 +247,10 @@ public synchronized void close() {
}
}

public boolean isClosed() {
return closed;
}

@Override
public Pipeline getPipeline() {
return pipeline;
Expand Down
Loading