Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-11577
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Oct 31, 2023
2 parents 3e2471d + f1ce273 commit 1b464f2
Show file tree
Hide file tree
Showing 74 changed files with 1,953 additions and 357 deletions.
88 changes: 44 additions & 44 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -257,36 +257,36 @@ io.grpc:grpc-netty:1.26.0
io.grpc:grpc-protobuf:1.26.0
io.grpc:grpc-protobuf-lite:1.26.0
io.grpc:grpc-stub:1.26.0
io.netty:netty-all:4.1.94.Final
io.netty:netty-buffer:4.1.94.Final
io.netty:netty-codec:4.1.94.Final
io.netty:netty-codec-dns:4.1.94.Final
io.netty:netty-codec-haproxy:4.1.94.Final
io.netty:netty-codec-http:4.1.94.Final
io.netty:netty-codec-http2:4.1.94.Final
io.netty:netty-codec-memcache:4.1.94.Final
io.netty:netty-codec-mqtt:4.1.94.Final
io.netty:netty-codec-redis:4.1.94.Final
io.netty:netty-codec-smtp:4.1.94.Final
io.netty:netty-codec-socks:4.1.94.Final
io.netty:netty-codec-stomp:4.1.94.Final
io.netty:netty-codec-xml:4.1.94.Final
io.netty:netty-common:4.1.94.Final
io.netty:netty-handler:4.1.94.Final
io.netty:netty-handler-proxy:4.1.94.Final
io.netty:netty-resolver:4.1.94.Final
io.netty:netty-resolver-dns:4.1.94.Final
io.netty:netty-transport:4.1.94.Final
io.netty:netty-transport-rxtx:4.1.94.Final
io.netty:netty-transport-sctp:4.1.94.Final
io.netty:netty-transport-udt:4.1.94.Final
io.netty:netty-transport-classes-epoll:4.1.94.Final
io.netty:netty-transport-native-unix-common:4.1.94.Final
io.netty:netty-transport-classes-kqueue:4.1.94.Final
io.netty:netty-resolver-dns-classes-macos:4.1.94.Final
io.netty:netty-transport-native-epoll:4.1.94.Final
io.netty:netty-transport-native-kqueue:4.1.94.Final
io.netty:netty-resolver-dns-native-macos:4.1.94.Final
io.netty:netty-all:4.1.100.Final
io.netty:netty-buffer:4.1.100.Final
io.netty:netty-codec:4.1.100.Final
io.netty:netty-codec-dns:4.1.100.Final
io.netty:netty-codec-haproxy:4.1.100.Final
io.netty:netty-codec-http:4.1.100.Final
io.netty:netty-codec-http2:4.1.100.Final
io.netty:netty-codec-memcache:4.1.100.Final
io.netty:netty-codec-mqtt:4.1.100.Final
io.netty:netty-codec-redis:4.1.100.Final
io.netty:netty-codec-smtp:4.1.100.Final
io.netty:netty-codec-socks:4.1.100.Final
io.netty:netty-codec-stomp:4.1.100.Final
io.netty:netty-codec-xml:4.1.100.Final
io.netty:netty-common:4.1.100.Final
io.netty:netty-handler:4.1.100.Final
io.netty:netty-handler-proxy:4.1.100.Final
io.netty:netty-resolver:4.1.100.Final
io.netty:netty-resolver-dns:4.1.100.Final
io.netty:netty-transport:4.1.100.Final
io.netty:netty-transport-rxtx:4.1.100.Final
io.netty:netty-transport-sctp:4.1.100.Final
io.netty:netty-transport-udt:4.1.100.Final
io.netty:netty-transport-classes-epoll:4.1.100.Final
io.netty:netty-transport-native-unix-common:4.1.100.Final
io.netty:netty-transport-classes-kqueue:4.1.100.Final
io.netty:netty-resolver-dns-classes-macos:4.1.100.Final
io.netty:netty-transport-native-epoll:4.1.100.Final
io.netty:netty-transport-native-kqueue:4.1.100.Final
io.netty:netty-resolver-dns-native-macos:4.1.100.Final
io.opencensus:opencensus-api:0.12.3
io.opencensus:opencensus-contrib-grpc-metrics:0.12.3
io.reactivex:rxjava:1.3.8
Expand Down Expand Up @@ -339,20 +339,20 @@ org.apache.solr:solr-solrj:8.11.2
org.apache.yetus:audience-annotations:0.5.0
org.apache.zookeeper:zookeeper:3.7.2
org.codehaus.jettison:jettison:1.5.4
org.eclipse.jetty:jetty-annotations:9.4.51.v20230217
org.eclipse.jetty:jetty-http:9.4.51.v20230217
org.eclipse.jetty:jetty-io:9.4.51.v20230217
org.eclipse.jetty:jetty-jndi:9.4.51.v20230217
org.eclipse.jetty:jetty-plus:9.4.51.v20230217
org.eclipse.jetty:jetty-security:9.4.51.v20230217
org.eclipse.jetty:jetty-server:9.4.51.v20230217
org.eclipse.jetty:jetty-servlet:9.4.51.v20230217
org.eclipse.jetty:jetty-util:9.4.51.v20230217
org.eclipse.jetty:jetty-util-ajax:9.4.51.v20230217
org.eclipse.jetty:jetty-webapp:9.4.51.v20230217
org.eclipse.jetty:jetty-xml:9.4.51.v20230217
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.51.v20230217
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.51.v20230217
org.eclipse.jetty:jetty-annotations:9.4.53.v20231009
org.eclipse.jetty:jetty-http:9.4.53.v20231009
org.eclipse.jetty:jetty-io:9.4.53.v20231009
org.eclipse.jetty:jetty-jndi:9.4.53.v20231009
org.eclipse.jetty:jetty-plus:9.4.53.v20231009
org.eclipse.jetty:jetty-security:9.4.53.v20231009
org.eclipse.jetty:jetty-server:9.4.53.v20231009
org.eclipse.jetty:jetty-servlet:9.4.53.v20231009
org.eclipse.jetty:jetty-util:9.4.53.v20231009
org.eclipse.jetty:jetty-util-ajax:9.4.53.v20231009
org.eclipse.jetty:jetty-webapp:9.4.53.v20231009
org.eclipse.jetty:jetty-xml:9.4.53.v20231009
org.eclipse.jetty.websocket:javax-websocket-client-impl:9.4.53.v20231009
org.eclipse.jetty.websocket:javax-websocket-server-impl:9.4.53.v20231009
org.ehcache:ehcache:3.3.1
org.ini4j:ini4j:0.5.4
org.lz4:lz4-java:1.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic {
"ipc.server.log.slow.rpc";
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;

public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY =
"ipc.server.log.slow.rpc.threshold.ms";
public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0;

public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
"ipc.server.purge.interval";
public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ public final class StoreStatisticNames {
public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
"object_multipart_aborted";

/**
* Object multipart list request.
* Value :{@value}.
*/
public static final String OBJECT_MULTIPART_UPLOAD_LIST =
"object_multipart_list";

/**
* Object put/multipart upload count.
* Value :{@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ public Void run() throws Exception {
private void doGracefulFailover()
throws ServiceFailedException, IOException, InterruptedException {
int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
Preconditions.checkArgument(timeout >= 0, "timeout should be non-negative.");

// Phase 1: pre-flight checks
checkEligibleForFailover();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,22 @@ protected ResponseBuffer initialValue() {
private final long metricsUpdaterInterval;
private final ScheduledExecutorService scheduledExecutorService;

private boolean logSlowRPC = false;
private volatile boolean logSlowRPC = false;
/** Threshold time for log slow rpc. */
private volatile long logSlowRPCThresholdTime;

/**
* Checks if LogSlowRPC is set true.
* @return true, if LogSlowRPC is set true, false, otherwise.
*/
protected boolean isLogSlowRPC() {
public boolean isLogSlowRPC() {
return logSlowRPC;
}

/**
 * Returns the threshold time used when deciding whether to log a slow RPC.
 * @return the stored threshold; {@link #setLogSlowRPCThresholdTime(long)}
 * stores it in the RPC metrics time unit, which is not necessarily
 * milliseconds.
 */
public long getLogSlowRPCThresholdTime() {
return logSlowRPCThresholdTime;
}

public int getNumInProcessHandler() {
return numInProcessHandler.get();
}
Expand All @@ -543,10 +549,16 @@ public long getTotalRequestsPerSecond() {
* @param logSlowRPCFlag input logSlowRPCFlag.
*/
@VisibleForTesting
protected void setLogSlowRPC(boolean logSlowRPCFlag) {
public void setLogSlowRPC(boolean logSlowRPCFlag) {
this.logSlowRPC = logSlowRPCFlag;
}

/**
 * Sets the threshold time for logging slow RPCs.
 * The supplied millisecond value is converted to the time unit returned by
 * {@code rpcMetrics.getMetricsTimeUnit()} before it is stored, so that it
 * can be compared with processing times read in that same unit.
 * @param logSlowRPCThresholdMs input threshold in milliseconds.
 */
@VisibleForTesting
public void setLogSlowRPCThresholdTime(long logSlowRPCThresholdMs) {
this.logSlowRPCThresholdTime = rpcMetrics.getMetricsTimeUnit().
convert(logSlowRPCThresholdMs, TimeUnit.MILLISECONDS);
}

private void setPurgeIntervalNanos(int purgeInterval) {
int tmpPurgeInterval = CommonConfigurationKeysPublic.
IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT;
Expand All @@ -568,12 +580,15 @@ public long getPurgeIntervalNanos() {
* @param methodName - RPC Request method name
* @param details - Processing Detail.
*
* if this request took too much time relative to other requests
* we consider that as a slow RPC. 3 is a magic number that comes
* from 3 sigma deviation. A very simple explanation can be found
* by searching for 68-95-99.7 rule. We flag an RPC as slow RPC
* if and only if it falls above 99.7% of requests. We start this logic
* only once we have enough sample size.
* If a request took significantly more time than other requests,
* and its processing time is more than `logSlowRPCThresholdMs`, we consider that as a slow RPC.
*
* The definition rules for calculating whether the current request took too much time
* compared to other requests are as follows:
* 3 is a magic number that comes from 3 sigma deviation.
* A very simple explanation can be found by searching for 68-95-99.7 rule.
* We flag an RPC as slow RPC if and only if it falls above 99.7% of requests.
* We start this logic only once we have enough sample size.
*/
void logSlowRpcCalls(String methodName, Call call,
ProcessingDetails details) {
Expand All @@ -587,15 +602,14 @@ void logSlowRpcCalls(String methodName, Call call,
final double threeSigma = rpcMetrics.getProcessingMean() +
(rpcMetrics.getProcessingStdDev() * deviation);

long processingTime =
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
final TimeUnit metricsTimeUnit = rpcMetrics.getMetricsTimeUnit();
long processingTime = details.get(Timing.PROCESSING, metricsTimeUnit);
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
details.toString());
(processingTime > threeSigma) &&
(processingTime > getLogSlowRPCThresholdTime())) {
LOG.warn("Slow RPC : {} took {} {} to process from client {}, the processing detail is {}," +
" and the threshold time is {} {}.", methodName, processingTime, metricsTimeUnit,
call, details.toString(), getLogSlowRPCThresholdTime(), metricsTimeUnit);
rpcMetrics.incrSlowRpc();
}
}
Expand Down Expand Up @@ -3359,6 +3373,10 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));

this.setLogSlowRPCThresholdTime(conf.getLong(
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT));

this.setPurgeIntervalNanos(conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,15 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.server.log.slow.rpc.threshold.ms</name>
<value>0</value>
<description>The threshold in milliseconds for logging slow rpc when ipc.server.log.slow.rpc is enabled.
Besides being much slower than other RPC requests, an RPC request has to take at least the threshold value
defined by this property before it can be considered as slow. By default, this threshold is set to 0 (disabled).
</description>
</property>

<property>
<name>ipc.server.purge.interval</name>
<value>15</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ be used as evidence at the inquest as proof that they made a
conscious decision to choose speed over safety and
that the outcome was their own fault.

Note: the option can be set for an entire filesystem. Again, the safety checks
are there to more closely match the semantics of a classic filesystem,
and to reduce the likelihood that the object store ends up in a state which
diverges so much from the classic directory + tree structure that applications
get confused.

Accordingly: *Use if and only if you are confident that the conditions are met.*

### `fs.s3a.create.header` User-supplied header support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ public void testLogSlowRPC() throws IOException, ServiceException,
TimeoutException, InterruptedException {
//No test with legacy
assumeFalse(testWithLegacy);
server.setLogSlowRPCThresholdTime(SLEEP_DURATION);
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
Expand All @@ -370,7 +371,13 @@ public void testLogSlowRPC() throws IOException, ServiceException,
assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L);
long before = rpcMetrics.getRpcSlowCalls();

// make a really slow call. Sleep sleeps for 1000ms
// Sleep sleeps for 500ms(less than `logSlowRPCThresholdTime`),
// make sure we never called into Log slow RPC routine.
client.sleep(null, newSleepRequest(SLEEP_DURATION / 2));
long after = rpcMetrics.getRpcSlowCalls();
assertThat(before).isEqualTo(after);

// Make a really slow call. Sleep sleeps for 3000ms.
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));

// Ensure slow call is logged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ boolean doWaitForRestart() {
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private final Object nodesLock = new Object();
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
Expand Down Expand Up @@ -619,7 +620,9 @@ private void setPipeline(LocatedBlock lb) {

private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
String[] storageIDs) {
this.nodes = nodes;
synchronized (nodesLock) {
this.nodes = nodes;
}
this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
}
Expand Down Expand Up @@ -916,7 +919,10 @@ void waitForAckedSeqno(long seqno) throws IOException {
try (TraceScope ignored = dfsClient.getTracer().
newScope("waitForAckedSeqno")) {
LOG.debug("{} waiting for ack for: {}", this, seqno);
int dnodes = nodes != null ? nodes.length : 3;
int dnodes;
synchronized (nodesLock) {
dnodes = nodes != null ? nodes.length : 3;
}
int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
long begin = Time.monotonicNow();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

public static final String DFS_NAMENODE_CRM_CHECKLOCKTIME_ENABLE =
"dfs.namenode.crm.checklocktime.enable";
public static final boolean DFS_NAMENODE_CRM_CHECKLOCKTIME_DEFAULT = false;

public static final String DFS_NAMENODE_CRM_MAXLOCKTIME_MS =
"dfs.namenode.crm.maxlocktime.ms";
public static final long DFS_NAMENODE_CRM_MAXLOCKTIME_MS_DEFAULT = 1000;

public static final String DFS_NAMENODE_CRM_SLEEP_TIME_MS =
"dfs.namenode.crm.sleeptime.ms";
public static final long DFS_NAMENODE_CRM_SLEEP_TIME_MS_DEFAULT = 300;

public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void close() {
// making any more calls after this point (eg clear the queue)
RPC.stopProxy(proxy);
}
metrics.unregister();
}

protected QJournalProtocol getProxy() throws IOException {
Expand Down
Loading

0 comments on commit 1b464f2

Please sign in to comment.