Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

private static final String CLIENT_IP_STR = "clientIp";
private static final String CLIENT_PORT_STR = "clientPort";

/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
Expand Down Expand Up @@ -465,7 +466,7 @@ private Object invokeMethod(
+ router.getRouterId());
}

appendClientIpToCallerContextIfAbsent();
appendClientIpPortToCallerContextIfAbsent();

Object ret = null;
if (rpcMonitor != null) {
Expand Down Expand Up @@ -586,25 +587,20 @@ private Object invokeMethod(

/**
* For tracking which is the actual client address.
* It adds trace info "clientIp:ip" to caller context if it's absent.
* It adds trace info "clientIp:ip" and "clientPort:port"
* to caller context if they are absent.
*/
private void appendClientIpToCallerContextIfAbsent() {
String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
final CallerContext ctx = CallerContext.getCurrent();
if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(clientIpInfo)
.setSignature(origSignature)
.build());
}
}

private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientIpInfo);
private void appendClientIpPortToCallerContextIfAbsent() {
CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.appendIfAbsent(CLIENT_IP_STR, Server.getRemoteAddress())
.appendIfAbsent(CLIENT_PORT_STR,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note, we allow the line length up to 100 after HADOOP-17813.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note, we allow the line length up to 100 after HADOOP-17813.

Thank you for reminding me. This is good.

Integer.toString(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1967,4 +1967,39 @@ public void testSetBalancerBandwidth() throws Exception {
return datanodes.get(0).getBalancerBandwidth() == newBandwidth;
}, 100, 60 * 1000);
}

@Test
public void testAddClientIpPortToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);

// 1. ClientIp and ClientPort are not set on the client.
// Set client context.
CallerContext.setCurrent(
new CallerContext.Builder("clientContext").build());

// Create a directory via the router.
String dirPath = "/test";
routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);

// The audit log should contains "clientIp:" and "clientPort:".
assertTrue(auditLog.getOutput().contains("clientIp:"));
assertTrue(auditLog.getOutput().contains("clientPort:"));
assertTrue(verifyFileExists(routerFS, dirPath));
auditLog.clearOutput();

// 2. ClientIp and ClientPort are set on the client.
// Reset client context.
CallerContext.setCurrent(
new CallerContext.Builder(
"clientContext,clientIp:1.1.1.1,clientPort:1234").build());

// Create a directory via the router.
routerProtocol.getFileInfo(dirPath);

// The audit log should contains the original clientIp and clientPort
// set by client.
assertTrue(auditLog.getOutput().contains("clientIp:1.1.1.1"));
assertTrue(auditLog.getOutput().contains("clientPort:1234"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ private void logAuditEvent(boolean succeeded,

private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent();
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
ctx)) {
if (isClientPortInfoAbsent(ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
Expand All @@ -474,9 +473,9 @@ private void appendClientPortToCallerContextIfAbsent() {
}
}

private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientPortInfo);
|| !ctx.getContext().contains(CLIENT_PORT_STR);
}

/**
Expand Down