Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,20 @@ public static int getCallRetryCount() {
}

/** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
* Returns null in case of an error.
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
return (call != null ) ? call.getHostInetAddress() : null;
return (call != null) ? call.getHostInetAddress() : null;
}

/**
* Returns the remote side port when invoked inside an RPC
* Returns 0 in case of an error.
*/
public static int getRemotePort() {
Call call = CurCall.get();
return (call != null) ? call.getRemotePort() : 0;
}

/**
Expand Down Expand Up @@ -409,7 +418,7 @@ public static byte[] getClientId() {
Call call = CurCall.get();
return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
}

/** Returns remote address as a string when invoked inside an RPC.
* Returns null in case of an error.
*/
Expand Down Expand Up @@ -447,15 +456,15 @@ public static int getPriorityLevel() {
return call != null? call.getPriorityLevel() : 0;
}

private String bindAddress;
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
final protected RpcMetrics rpcMetrics;
final protected RpcDetailedMetrics rpcDetailedMetrics;

private Configuration conf;
private String portRangeConfig = null;
private SecretManager<TokenIdentifier> secretManager;
Expand Down Expand Up @@ -973,6 +982,9 @@ public UserGroupInformation getRemoteUser() {
public InetAddress getHostInetAddress() {
return null;
}
public int getRemotePort() {
return 0;
}
public String getHostAddress() {
InetAddress addr = getHostInetAddress();
return (addr != null) ? addr.getHostAddress() : null;
Expand Down Expand Up @@ -1130,6 +1142,11 @@ public InetAddress getHostInetAddress() {
return connection.getHostInetAddress();
}

@Override
public int getRemotePort() {
return connection.getRemotePort();
}

@Override
public Void run() throws Exception {
if (!connection.channel.isOpen()) {
Expand Down Expand Up @@ -2011,6 +2028,10 @@ public int getIngressPort() {
return ingressPort;
}

public int getRemotePort() {
return remotePort;
}

public InetAddress getHostInetAddress() {
return addr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600;
public static final String DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY =
"dfs.namenode.audit.log.with.remote.port";
public static final boolean DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT =
false;
/**
* The maximum number of getBlocks RPCs data movement utilities can make to
* a NameNode per second. Values &lt;= 0 disable throttling. This affects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ public static String getRemoteAddr(HttpServletRequest request) {
return remoteAddr;
}

public static int getRemotePort(HttpServletRequest request) {
return request.getRemotePort();
}

/**
* Expected user name should be a short name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.net.InetAddress;
import java.security.Principal;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

import java.net.InetAddress;

/**
* Interface defining an audit logger.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
Expand Down Expand Up @@ -397,6 +401,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");

private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;

boolean isAuditEnabled() {
return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
&& !auditLoggers.isEmpty();
Expand All @@ -411,7 +418,7 @@ private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, FileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
cmd, src, dst, stat);
cmd, src, dst, stat);
}
}

Expand Down Expand Up @@ -442,6 +449,9 @@ private void logAuditEvent(boolean succeeded,
for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
if (auditLogWithRemotePort) {
appendClientPortToCallerContextIfAbsent();
}
hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
status, CallerContext.getCurrent(), ugi, dtSecretManager);
} else {
Expand All @@ -450,6 +460,25 @@ private void logAuditEvent(boolean succeeded,
}
}

private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent();
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
}

private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientPortInfo);
}

/**
* Logger for audit events, noting successful FSNamesystem operations. Emits
* to FSNamesystem.audit at INFO. Each event causes a set of tab-separated
Expand Down Expand Up @@ -501,6 +530,7 @@ private void logAuditEvent(boolean succeeded,
// underlying logger is disabled, and avoid some unnecessary work.
private final boolean isDefaultAuditLogger;
private final List<AuditLogger> auditLoggers;
private final boolean auditLogWithRemotePort;

/** The namespace tree. */
FSDirectory dir;
Expand Down Expand Up @@ -833,6 +863,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog(conf);
}
auditLogWithRemotePort =
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
this.contextFieldSeparator =
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
cond = fsLock.newWriteLockCondition();
cpLock = new ReentrantLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
@SuppressWarnings("unchecked")
final Map<String,String[]> pmap = request.getParameterMap();
final PrintWriter out = response.getWriter();
final InetAddress remoteAddress =
final InetAddress remoteAddress =
InetAddress.getByName(request.getRemoteAddr());
final ServletContext context = getServletContext();
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);

final UserGroupInformation ugi = getUGI(request, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.net.InetAddress;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;

import java.net.InetAddress;

/**
* Extension of {@link AuditLogger}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class NamenodeWebHdfsMethods {
private String scheme;
private Principal userPrincipal;
private String remoteAddr;
private int remotePort;

private @Context ServletContext context;
private @Context HttpServletResponse response;
Expand All @@ -146,6 +147,7 @@ public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request);
remotePort = JspHelper.getRemotePort(request);
supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
}
Expand Down Expand Up @@ -224,6 +226,10 @@ public String getHostAddress() {
return getRemoteAddr();
}
@Override
public int getRemotePort() {
return getRemotePortFromJSPHelper();
}
@Override
public InetAddress getHostInetAddress() {
try {
return InetAddress.getByName(getHostAddress());
Expand Down Expand Up @@ -254,6 +260,10 @@ protected String getRemoteAddr() {
return remoteAddr;
}

protected int getRemotePortFromJSPHelper() {
return remotePort;
}

protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5056,6 +5056,14 @@
</description>
</property>

<property>
<name>dfs.namenode.audit.log.with.remote.port</name>
<value>false</value>
<description>
If true, adds a port of RPC call to callerContext for all audit log events.
</description>
</property>

<property>
<name>dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction</name>
<value>0.6</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.regex.Pattern;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
Expand All @@ -69,6 +70,7 @@
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -89,6 +91,20 @@ public class TestAuditLogger {
}

private static final short TEST_PERMISSION = (short) 0654;
private static final Pattern AUDIT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
private static final Pattern AUDIT_WITH_PORT_PATTERN = Pattern.compile(
".*allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?" +
"proto=.*?" +
"callerContext=.*?clientPort\\:(\\d{0,9}).*?");

@Before
public void setup() {
Expand Down Expand Up @@ -544,6 +560,45 @@ public void testBrokenLogger() throws IOException {
}
}

/**
* Test adding remote port to audit log.
*/
@Test
public void testAuditLogWithRemotePort() throws Exception {
// Audit log without remote port by default.
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster1 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster1.waitClusterUp();
FileSystem fs = cluster1.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_PATTERN.matcher(auditLog.getOutput().trim()).matches());
assertFalse(auditLog.getOutput().contains("clientPort"));
auditLog.clearOutput();
} finally {
cluster1.shutdown();
}

// Audit log with remote port.
conf.setBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY, true);
conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf).build();
try {
LogCapturer auditLog = LogCapturer.captureLogs(FSNamesystem.auditLog);
cluster2.waitClusterUp();
FileSystem fs = cluster2.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertTrue(AUDIT_WITH_PORT_PATTERN.matcher(
auditLog.getOutput().trim()).matches());
auditLog.clearOutput();
} finally {
cluster2.shutdown();
}
}

public static class DummyAuditLogger implements AuditLogger {

static boolean initialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
static final Pattern auditPattern = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=.*?\\ssrc=.*?\\sdst=null\\s" +
"perm=.*?");
static final Pattern successPattern = Pattern.compile(
".*allowed=true.*");
Expand Down
Loading