Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1132,4 +1132,6 @@ public static void copyFileUnbuffered(File src, File dst) throws IOException {

private static native void copyFileUnbuffered0(String src, String dst)
throws NativeIOException;

public static native String getDiskName(String path) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#include "exception.h"
/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used form windows_secure_container_executor.c
*/
extern void throw_ioe(JNIEnv* env, int errnum);

#ifdef UNIX
#include <assert.h>
Expand Down Expand Up @@ -47,6 +52,84 @@
#include <sys/types.h>
#include <unistd.h>
#include "config.h"
#include <signal.h>
#include <linux/kdev_t.h>
#include <string.h>
#include <time.h>
#include <ctype.h>
#include <dirent.h>
#include <sys/utsname.h>
#define MAX_NAME_LEN 128
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

void read_diskstats_stat_work(JNIEnv *env, char *diskstats, unsigned int pmajor, unsigned int pminor, char *diskName)
{
FILE *fp;
char line[256], dev_name[MAX_NAME_LEN];
int i;
unsigned int ios_pgr, tot_ticks, rq_ticks, wr_ticks, dc_ticks, fl_ticks;
unsigned long rd_ios, rd_merges_or_rd_sec, rd_ticks_or_wr_sec, wr_ios;
unsigned long wr_merges, rd_sec_or_wr_ios, wr_sec;
unsigned long dc_ios, dc_merges, dc_sec, fl_ios;
unsigned int major, minor;

if ((fp = fopen(diskstats, "r")) == NULL) {
throw_ioe(env, errno);
goto cleanup;
}

while (fgets(line, sizeof(line), fp) != NULL) {
/* major minor name rio rmerge rsect ruse wio wmerge wsect wuse running use aveq dcio dcmerge dcsect dcuse flio fltm */
i = sscanf(line, "%u %u %s %lu %lu %lu %lu %lu %lu %lu %u %u %u %u %lu %lu %lu %u %lu %u",
&major, &minor, dev_name,
&rd_ios, &rd_merges_or_rd_sec, &rd_sec_or_wr_ios, &rd_ticks_or_wr_sec,
&wr_ios, &wr_merges, &wr_sec, &wr_ticks, &ios_pgr, &tot_ticks, &rq_ticks,
&dc_ios, &dc_merges, &dc_sec, &dc_ticks,
&fl_ios, &fl_ticks);
if (pmajor == major && pminor == minor) {
strncpy(diskName, dev_name, MAX_NAME_LEN);
fclose(fp);
return;
}
}
if(fclose(fp) != 0) {
throw_ioe(env, errno);
goto cleanup;
}

cleanup:
return;
}

JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_getDiskName
(JNIEnv *env, jclass class, jstring j_str) {
struct stat buf;
const char *c_str = NULL;
char diskName[MAX_NAME_LEN] = {0};
jboolean isCopy;
c_str = (*env)->GetStringUTFChars(env, j_str, &isCopy);
// judge whether this str parameter is null or not should in java code.
if(c_str == NULL) {
return NULL;
}
if (unlikely(stat(c_str, &buf) == -1)) {
throw_ioe(env, errno);
goto cleanup;
} else {
unsigned int major = MAJOR(buf.st_dev);
unsigned int minor = MINOR(buf.st_dev);
read_diskstats_stat_work(env, "/proc/diskstats", major, minor, diskName);
(*env)->ReleaseStringUTFChars(env, j_str, c_str);
return (*env)->NewStringUTF(env, diskName);
}

cleanup:
if (c_str != NULL) {
(*env)->ReleaseStringUTFChars(env, j_str, c_str);
}
return NULL;
}
#endif

#ifdef WINDOWS
Expand Down Expand Up @@ -92,12 +175,6 @@ static jclass pmem_region_clazz = NULL;
static jmethodID pmem_region_ctor = NULL;
#endif

/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used form windows_secure_container_executor.c
*/
extern void throw_ioe(JNIEnv* env, int errnum);

// Internal functions
#ifdef UNIX
static ssize_t get_pw_buflen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2007,5 +2007,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;

public static final String DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY =
"dfs.datanode.disk.stat.interval.seconds";
public static final long DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT = 1L;

public static final String
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY =
"dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable";
public static final boolean
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
private DataSetLockManager dataSetLockManager;
private DiskIOUtilManager diskIOUtilManager;

private final ExecutorService xferService;

Expand Down Expand Up @@ -490,6 +491,11 @@ private static Tracer createTracer(Configuration conf) {
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
this.xferService =
HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
if (Shell.LINUX) {
this.diskIOUtilManager = new DiskIOUtilManager(conf);
} else {
LOG.info("Disk io util manager does not start, only Linux OS release support!");
}
}

/**
Expand Down Expand Up @@ -1190,6 +1196,9 @@ public IOException call() {
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
if (diskIOUtilManager != null) {
diskIOUtilManager.setStorageLocations(dataDirs);
}
}
}
}
Expand Down Expand Up @@ -1709,6 +1718,10 @@ void startDataNode(List<StorageLocation> dataDirectories,
synchronized (this) {
this.dataDirs = dataDirectories;
}
if (diskIOUtilManager != null) {
this.diskIOUtilManager.setStorageLocations(dataDirectories);
this.diskIOUtilManager.start();
}
this.dnConf = new DNConf(this);
checkSecureConfig(dnConf, getConf(), resources);

Expand Down Expand Up @@ -2505,6 +2518,10 @@ public void shutdown() {
dataNodeInfoBeanName = null;
}
if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
if (diskIOUtilManager != null) {
diskIOUtilManager.stop();
diskIOUtilManager = null;
}
LOG.info("Shutdown complete.");
synchronized(this) {
// it is already false, but setting it again to avoid a findbug warning.
Expand Down Expand Up @@ -4158,4 +4175,9 @@ boolean isSlownode() {
public BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}

public int getStorageLocationDiskUtil(StorageLocation location) {
return diskIOUtilManager != null ?
diskIOUtilManager.getStorageLocationDiskIOUtil(location) : 0;
}
}
Loading