FSOperations.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.http.server;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
@@ -47,7 +48,6 @@
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
 import org.apache.hadoop.hdfs.web.JsonUtil;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.util.StringUtils;
 import org.json.simple.JSONArray;
@@ -73,7 +73,22 @@
  * FileSystem operation executors used by {@link HttpFSServer}.
  */
 @InterfaceAudience.Private
-public class FSOperations {
+public final class FSOperations {
+
+  private static int bufferSize = 4096;
+
+  private FSOperations() {
+    // not called
+  }
+  /**
+   * Set the buffer size. The size is set during the initialization of
+   * HttpFSServerWebApp.
+   * @param conf the configuration to get the bufferSize
+   */
+  public static void setBufferSize(Configuration conf) {
+    bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY,
+        HTTP_BUFFER_SIZE_DEFAULT);
+  }
 
   /**
    * @param fileStatus a FileStatus object
@@ -436,10 +451,9 @@ public FSAppend(InputStream is, String path) {
      */
     @Override
     public Void execute(FileSystem fs) throws IOException {
-      int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096);
       OutputStream os = fs.append(path, bufferSize);
-      IOUtils.copyBytes(is, os, bufferSize, true);
-      os.close();
+      long bytes = copyBytes(is, os);
+      HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
       return null;
     }
 
@@ -522,6 +536,7 @@ public FSTruncate(String path, long newLength) {
     @Override
     public JSONObject execute(FileSystem fs) throws IOException {
       boolean result = fs.truncate(path, newLength);
+      HttpFSServerWebApp.get().getMetrics().incrOpsTruncate();
       return toJSON(
           StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result);
     }
@@ -638,16 +653,65 @@ public Void execute(FileSystem fs) throws IOException {
         fsPermission = FsCreateModes.create(fsPermission,
             new FsPermission(unmaskedPermission));
       }
-      int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY,
-          HTTP_BUFFER_SIZE_DEFAULT);
       OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null);
-      IOUtils.copyBytes(is, os, bufferSize, true);
-      os.close();
+      long bytes = copyBytes(is, os);
+      HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
       return null;
     }
 
   }
 
+  /**
+   * These copyBytes methods combine the two flavors used originally:
+   * one with a length and another with a buffer size.
+   * In this implementation the buffer size is determined internally; it is
+   * a singleton normally set during initialization.
+   * @param in the input stream
+   * @param out the output stream
+   * @return the total number of bytes copied
+   * @throws IOException if an I/O error occurs
+   */
+  public static long copyBytes(InputStream in, OutputStream out)
+      throws IOException {
+    return copyBytes(in, out, Long.MAX_VALUE);
+  }
+
+  public static long copyBytes(InputStream in, OutputStream out, long count)
+      throws IOException {
+    long totalBytes = 0;
+
+    // If bufferSize is not initialized, the default of 4k is used. This will
+    // not happen if all callers check and set it.
+    byte[] buf = new byte[bufferSize];
+    long bytesRemaining = count;
+    int bytesRead;
+
+    try {
+      while (bytesRemaining > 0) {
+        int bytesToRead = (int)
+            (bytesRemaining < buf.length ? bytesRemaining : buf.length);
+
+        bytesRead = in.read(buf, 0, bytesToRead);
+        if (bytesRead == -1) {
+          break;
+        }
+
+        out.write(buf, 0, bytesRead);
+        bytesRemaining -= bytesRead;
+        totalBytes += bytesRead;
+      }
+      return totalBytes;
+    } finally {
+      // Originally IOUtils.copyBytes() was called with close=true, so we
+      // implement the same behavior here.
+      try {
+        in.close();
+      } finally {
+        out.close();
+      }
+    }
+  }
+
   /**
    * Executor that performs a delete FileSystemAccess files system operation.
    */
@@ -680,6 +744,7 @@ public FSDelete(String path, boolean recursive) {
     @Override
     public JSONObject execute(FileSystem fs) throws IOException {
       boolean deleted = fs.delete(path, recursive);
+      HttpFSServerWebApp.get().getMetrics().incrOpsDelete();
       return toJSON(
           StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted);
     }
@@ -748,6 +813,7 @@ public FSFileStatus(String path) {
     @Override
     public Map execute(FileSystem fs) throws IOException {
       FileStatus status = fs.getFileStatus(path);
+      HttpFSServerWebApp.get().getMetrics().incrOpsStat();
       return toJson(status);
     }
 
@@ -776,7 +842,6 @@ public JSONObject execute(FileSystem fs) throws IOException {
       json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath());
       return json;
     }
-
   }
 
   /**
@@ -814,6 +879,7 @@ public FSListStatus(String path, String filter) throws IOException {
     @Override
     public Map execute(FileSystem fs) throws IOException {
       FileStatus[] fileStatuses = fs.listStatus(path, filter);
+      HttpFSServerWebApp.get().getMetrics().incrOpsListing();
       return toJson(fileStatuses, fs.getFileStatus(path).isFile());
     }
 
@@ -905,6 +971,7 @@ public JSONObject execute(FileSystem fs) throws IOException {
             new FsPermission(unmaskedPermission));
       }
       boolean mkdirs = fs.mkdirs(path, fsPermission);
+      HttpFSServerWebApp.get().getMetrics().incrOpsMkdir();
       return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs);
     }
 
@@ -937,8 +1004,8 @@ public FSOpen(String path) {
      */
     @Override
     public InputStream execute(FileSystem fs) throws IOException {
-      int bufferSize = HttpFSServerWebApp.get().getConfig().getInt(
-          HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT);
+      // Only updating ops count. bytesRead is updated in InputStreamEntity
+      HttpFSServerWebApp.get().getMetrics().incrOpsOpen();
       return fs.open(path, bufferSize);
     }
 
@@ -976,6 +1043,7 @@ public FSRename(String path, String toPath) {
     @Override
     public JSONObject execute(FileSystem fs) throws IOException {
       boolean renamed = fs.rename(path, toPath);
+      HttpFSServerWebApp.get().getMetrics().incrOpsRename();
       return toJSON(HttpFSFileSystem.RENAME_JSON, renamed);
     }
 
@@ -1944,6 +2012,7 @@ public Void execute(FileSystem fs) throws IOException {
       if (fs instanceof DistributedFileSystem) {
         DistributedFileSystem dfs = (DistributedFileSystem) fs;
         dfs.access(path, mode);
+        HttpFSServerWebApp.get().getMetrics().incrOpsCheckAccess();
       } else {
         throw new UnsupportedOperationException("checkaccess is "
             + "not supported for HttpFs on " + fs.getClass()
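The new copyBytes helpers replace the per-request IOUtils.copyBytes calls in FSAppend and FSCreate: the buffer size is now read once from the httpfs.buffer.size setting (HTTPFS_BUFFER_SIZE_KEY, 4096 by default as in the removed lines), and the byte count is returned so the callers can feed incrBytesWritten. Below is a minimal, self-contained sketch of that flow; apart from the config key and the 4096 default, every name in it is illustrative rather than taken from the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative stand-in for the buffer-size-aware copy added to FSOperations. */
public final class CopyBytesSketch {

  /** Mirrors FSOperations.setBufferSize(conf): httpfs.buffer.size is read once at startup. */
  private static int bufferSize = 4096;

  /** Copies in to out with the configured buffer, closes both streams, returns bytes copied. */
  static long copyBytes(InputStream in, OutputStream out) throws IOException {
    long total = 0;
    byte[] buf = new byte[bufferSize];
    try {
      int read;
      while ((read = in.read(buf)) != -1) {
        out.write(buf, 0, read);
        total += read;
      }
      return total;
    } finally {
      try {
        in.close();
      } finally {
        out.close();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = "hello httpfs".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    long written = copyBytes(new ByteArrayInputStream(payload), sink);
    // In the patch, this count is what gets passed to getMetrics().incrBytesWritten(...).
    System.out.println(written + " bytes copied");
  }
}
```

Returning the count, instead of using the void IOUtils.copyBytes, is what makes the bytesWritten metric possible without a second pass over the data.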
HttpFSServerWebApp.java
@@ -21,9 +21,13 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
 import org.apache.hadoop.lib.server.ServerException;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.servlet.ServerWebApp;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.util.JvmPauseMonitor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +60,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
   public static final String CONF_ADMIN_GROUP = "admin.group";
 
   private static HttpFSServerWebApp SERVER;
+  private static HttpFSServerMetrics metrics;
 
   private String adminGroup;
 
@@ -102,6 +107,7 @@ public void init() throws ServerException {
     LOG.info("Connects to Namenode [{}]",
         get().get(FileSystemAccess.class).getFileSystemConfiguration().
             getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
+    setMetrics(getConfig());
   }
 
   /**
@@ -110,9 +116,22 @@ public void init() throws ServerException {
   @Override
   public void destroy() {
     SERVER = null;
+    if (metrics != null) {
+      metrics.shutdown();
+    }
     super.destroy();
   }
 
+  private static void setMetrics(Configuration config) {
+    LOG.info("Initializing HttpFSServerMetrics");
+    metrics = HttpFSServerMetrics.create(config, "HttpFSServer");
+    JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
+    pauseMonitor.init(config);
+    pauseMonitor.start();
+    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    FSOperations.setBufferSize(config);
+    DefaultMetricsSystem.initialize("HttpFSServer");
+  }
   /**
    * Returns HttpFSServer server singleton, configuration and services are
    * accessible through it.
@@ -123,6 +142,14 @@ public static HttpFSServerWebApp get() {
     return SERVER;
   }
 
+  /**
+   * Gets the HttpFSServerMetrics instance.
+   * @return the HttpFSServerMetrics singleton.
+   */
+  public static HttpFSServerMetrics getMetrics() {
+    return metrics;
+  }
+
   /**
    * Returns HttpFSServer admin group.
    *
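HttpFSServerMetrics itself is not part of the hunks shown above. As a rough sketch only, a Hadoop metrics2 source backing calls such as incrOpsOpen() and incrBytesWritten(long) typically looks like the class below; apart from the method names visible in the diff, the identifiers, metrics context, and session-id key here are assumptions, not the actual implementation.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.source.JvmMetrics;

/** Sketch of a metrics2 source; the real HttpFSServerMetrics may differ in detail. */
@Metrics(about = "HttpFSServer metrics", context = "httpfs")
public class HttpFSServerMetricsSketch {

  private final JvmMetrics jvmMetrics;

  @Metric("Bytes written through create/append") private MutableCounterLong bytesWritten;
  @Metric("Number of open operations") private MutableCounterLong opsOpen;

  private HttpFSServerMetricsSketch(JvmMetrics jvmMetrics) {
    this.jvmMetrics = jvmMetrics;
  }

  /** Registers the source with the default metrics system, as setMetrics(config) expects. */
  public static HttpFSServerMetricsSketch create(Configuration conf, String serverName) {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    JvmMetrics jm = JvmMetrics.create(serverName,
        conf.get("dfs.metrics.session-id"), ms);
    return ms.register(serverName + "Activity", "HttpFS server activity",
        new HttpFSServerMetricsSketch(jm));
  }

  public JvmMetrics getJvmMetrics() {
    return jvmMetrics;
  }

  public void incrBytesWritten(long bytes) {
    bytesWritten.incr(bytes);
  }

  public void incrOpsOpen() {
    opsOpen.incr();
  }

  public void shutdown() {
    DefaultMetricsSystem.shutdown();
  }
}
```

Whatever the real class looks like, the lifecycle in the diff is fixed: init() calls setMetrics(config), which creates the metrics source, starts a JvmPauseMonitor, pushes the buffer size into FSOperations, and initializes the default metrics system; destroy() later calls metrics.shutdown().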