Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* dst sub-namespace with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
* 2. Update the the mount entry.
* 2. Update the mount entry.
* 3. Delete the source path to trash.
*/
public class RouterFedBalance extends Configured implements Tool {
Expand All @@ -77,7 +77,7 @@ public class RouterFedBalance extends Configured implements Tool {
private static final String TRASH_PROCEDURE = "trash-procedure";

/**
* This class helps building the balance job.
* This class helps to build the balance job.
*/
private class Builder {
/* Force close all open files while there is no diff. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface FederationMBean {

/**
* Get the latest state of all routers.
* @return JSON with all of the known routers or null if failure.
* @return JSON with all the known routers or null if failure.
*/
String getRouters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {

/** Time for an operation to be received in the Router. */
private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
/** Time for an operation to be send to the Namenode. */
/** Time for an operation to be sent to the Namenode. */
private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();

/** Configuration for the performance monitor. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public String getMountTable() {

// Dump mount table entries information into JSON
for (MountTable entry : orderedMounts) {
// Sumarize destinations
// Summarize destinations
Set<String> nameservices = new LinkedHashSet<>();
Set<String> paths = new LinkedHashSet<>();
for (RemoteLocation location : entry.getDestinations()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ public synchronized void release() {
*/
public synchronized void close(boolean force) {
if (!force && this.numThreads > 0) {
// this is an erroneous case but we have to close the connection
// this is an erroneous case, but we have to close the connection
// anyway since there will be connection leak if we don't do so
// the connection has been moved out of the pool
LOG.error("Active connection with {} handlers will be closed",
this.numThreads);
}
this.closed = true;
Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
// Nobody should be using this anymore, so it should close right away
RPC.stopProxy(proxy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ public void start() {
this.creator.start();

// Schedule a task to remove stale connection pools and sockets
long recyleTimeMs = Math.min(
long recycleTimeMs = Math.min(
poolCleanupPeriodMs, connectionCleanupPeriodMs);
LOG.info("Cleaning every {} seconds",
TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs));
TimeUnit.MILLISECONDS.toSeconds(recycleTimeMs));
this.cleaner.scheduleAtFixedRate(
new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS);
new CleanupTask(), 0, recycleTimeMs, TimeUnit.MILLISECONDS);

// Mark the manager as running
this.running = true;
Expand Down Expand Up @@ -364,9 +364,9 @@ void cleanup(ConnectionPool pool) {
long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections();
// Active is a transient status in many cases for a connection since
// the handler thread uses the connection very quickly. Thus the number
// the handler thread uses the connection very quickly. Thus, the number
// of connections with handlers using at the call time is constantly low.
// Recently active is more lasting status and it shows how many
// Recently active is more lasting status, and it shows how many
// connections have been used with a recent time period. (i.e. 30 seconds)
int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
Expand All @@ -376,9 +376,9 @@ void cleanup(ConnectionPool pool) {
// The number should at least be 1
int targetConnectionsCount = Math.max(1,
(int)(poolMinActiveRatio * total) - active);
List<ConnectionContext> conns =
List<ConnectionContext> connections =
pool.removeConnections(targetConnectionsCount);
for (ConnectionContext conn : conns) {
for (ConnectionContext conn : connections) {
conn.close();
}
LOG.debug("Removed connection {} used {} seconds ago. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public static ActiveNamenodeResolver newActiveNamenodeResolver(
}

/**
* Add the the number of children for an existing HdfsFileStatus object.
* Add the number of children for an existing HdfsFileStatus object.
* @param dirStatus HdfsfileStatus object.
* @param children number of children to be added.
* @return HdfsFileStatus with the number of children specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public MountTableRefresherThread(MountTableManager manager,

/**
* Refresh mount table cache of local and remote routers. Local and remote
* routers will be refreshed differently. Lets understand what are the
* routers will be refreshed differently. Let's understand what are the
* local and remote routers and refresh will be done differently on these
* routers. Suppose there are three routers R1, R2 and R3. User want to add
* new mount table entry. He will connect to only one router, not all the
* routers. Suppose He connects to R1 and calls add mount table entry through
* API or CLI. Now in this context R1 is local router, R2 and R3 are remote
* routers. Because add mount table entry is invoked on R1, R1 will update the
* cache locally it need not to make RPC call. But R1 will make RPC calls to
* cache locally it need not make RPC call. But R1 will make RPC calls to
* update cache on R2 and R3.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
TimeUnit.MINUTES.toMillis(1);
/**
* Remote router mount table cache is updated through RouterClient(RPC call).
* To improve performance, RouterClient connections are cached but it should
* To improve performance, RouterClient connections are cached, but it should
* not be kept in cache forever. This property defines the max time a
* connection can be cached.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RemoteMethod {
private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class);


/** List of parameters: static and dynamic values, matchings types. */
/** List of parameters: static and dynamic values, matching types. */
private final Object[] params;
/** List of method parameters types, matches parameters. */
private final Class<?>[] types;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import org.apache.hadoop.thirdparty.protobuf.BlockingService;

/**
* This class is responsible for handling all of the Admin calls to the HDFS
* This class is responsible for handling all the Admin calls to the HDFS
* router. It is created, started, and stopped by {@link Router}.
*/
public class RouterAdminServer extends AbstractService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
* Check if an exception is caused by an unavailable subcluster or not. It
* also checks the causes.
* @param ioe IOException to check.
* @return If caused by an unavailable subcluster. False if the should not be
* @return If caused by an unavailable subcluster. False if they should not be
* retried (e.g., NSQuotaExceededException).
*/
protected static boolean isUnavailableSubclusterException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private List<MountTable> getMountTableEntries() throws IOException {
* During this time, the quota usage cache will also be updated by
* quota manager:
* 1. Stale paths (entries) will be removed.
* 2. Existing entries will be override and updated.
* 2. Existing entries will be overridden and updated.
* @return List of mount tables which quota was set.
* @throws IOException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public class RouterRpcClient {
/**
* Create a router RPC client to manage remote procedure calls to NNs.
*
* @param conf Hdfs Configuation.
* @param conf Hdfs Configuration.
* @param router A router using this RPC client.
* @param resolver A NN resolver to determine the currently active NN in HA.
* @param monitor Optional performance monitor.
Expand Down Expand Up @@ -444,7 +444,7 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
* @param ugi User group information.
* @param namenodes A prioritized list of namenodes within the same
* nameservice.
* @param method Remote ClientProtcol method to invoke.
* @param method Remote ClientProtocol method to invoke.
* @param params Variable list of parameters matching the method.
* @return The result of invoking the method.
* @throws ConnectException If it cannot connect to any Namenode.
Expand Down Expand Up @@ -1027,7 +1027,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
} catch (Exception e) {
// Unusual error, ClientProtocol calls always use IOException (or
// RemoteException). Re-wrap in IOException for compatibility with
// ClientProtcol.
// ClientProtocol.
LOG.error("Unexpected exception {} proxying {} to {}",
e.getClass(), m.getName(), ns, e);
IOException ioe = new IOException(
Expand Down Expand Up @@ -1449,7 +1449,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
results.add(new RemoteResult<>(location, ioe));
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
LOG.debug("Canot execute {} in {}: {}",
LOG.debug("Cannot execute {} in {}: {}",
m.getName(), location, cause.getMessage());

// Convert into IOException if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void init(

/**
* Start proxying an operation to the Namenode.
* @return Id of the thread doing the proxying.
* @return id of the thread doing the proxying.
*/
long proxyOp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ protected Response get(
* @param path Path to check.
* @param op Operation to perform.
* @param openOffset Offset for opening a file.
* @param excludeDatanodes Blocks to excluded.
* @param excludeDatanodes Blocks to exclude.
* @param parameters Other parameters.
* @return Redirection URI.
* @throws URISyntaxException If it cannot parse the URI.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
*
* @param clazz Class of the record to store.
* @param driver State Store driver.
* @param over If the entries should be override if they expire
* @param over If the entries should be overridden if they expire
*/
protected CachedRecordStore(
Class<R> clazz, StateStoreDriver driver, boolean over) {
Expand Down Expand Up @@ -153,7 +153,7 @@ public boolean loadCache(boolean force) throws IOException {
}

/**
* Check if it's time to update the cache. Update it it was never updated.
* Check if it's time to update the cache. Update it was never updated.
*
* @return If it's time to update this cache.
*/
Expand Down Expand Up @@ -206,7 +206,7 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
* Updates the state store with any record overrides we detected, such as an
* expired state.
*
* @param record Record record to be updated.
* @param record record to be updated.
* @throws IOException If the values cannot be updated.
*/
public void overrideExpiredRecord(R record) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public StateStoreDriver getDriver() {
}

/**
* Fetch a unique identifier for this state store instance. Typically it is
* Fetch a unique identifier for this state store instance. Typically, it is
* the address of the router.
*
* @return Unique identifier for this store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected Configuration getConf() {
}

/**
* Gets a unique identifier for the running task/process. Typically the
* Gets a unique identifier for the running task/process. Typically, the
* router address.
*
* @return Unique identifier for the running task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;

/**
* API request for retrieving a all non-expired router registrations present in
* API request for retrieving an all non-expired router registrations present in
* the state store.
*/
public abstract class GetRouterRegistrationsRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public B getBuilder() {

/**
* Get the serialized proto object. If the translator was created from a byte
* stream, returns the intitial byte stream. Otherwise creates a new byte
* stream, returns the initial byte stream. Otherwise, creates a new byte
* stream from the cached builder.
*
* @return Protobuf message object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private String getUsage(String cmd) {

/**
* Usage: validates the maximum number of arguments for a command.
* @param arg List of of command line parameters.
* @param arg List of command line parameters.
*/
private void validateMax(String[] arg) {
if (arg[0].equals("-ls")) {
Expand Down Expand Up @@ -407,7 +407,7 @@ public int run(String[] argv) throws Exception {
System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
printUsage(cmd);
} catch (RemoteException e) {
// This is a error returned by the server.
// This is an error returned by the server.
// Print out the first line of the error message, ignore the stack trace.
exitCode = -1;
debugException = e;
Expand Down Expand Up @@ -807,7 +807,7 @@ public void listMounts(String[] argv, int i) throws IOException {
} else if (argv[i].equals("-d")) { // Check if -d parameter is specified.
detail = true;
if (argv.length == 2) {
path = "/"; // If no path is provide with -ls -d.
path = "/"; // If no path is provided with -ls -d.
} else {
path = argv[++i];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ public void startCluster(Configuration overrideConf) {
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
// Router also uses this configurations as initial values.
// Router also uses these configurations as initial values.
routerConf = new Configuration(overrideConf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void updateActiveNamenode(
break;
}
}
// This operation modifies the list so we need to be careful
// This operation modifies the list, so we need to be careful
synchronized(namenodes) {
Collections.sort(namenodes, new NamenodePriorityComparator());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testCacheUpdateOnNamenodeStateUpdate() throws IOException {
String rpcAddr = namenode.getRpcAddress();
InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr);

// If the namenode state changes and it serves request,
// If the namenode state changes, and it serves request,
// RouterRpcClient calls updateActiveNamenode to update the state to active,
// Check whether correct updated state is returned post update.
namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.junit.Test;

/**
* The the safe mode for the {@link Router} controlled by
* The safe mode for the {@link Router} controlled by
* {@link SafeModeTimer}.
*/
public class TestRouter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ public void testMountTablePermissions() throws Exception {
* @param mount
* target mount table
* @param canRead
* whether can list mount tables under specified mount
* whether you can list mount tables under specified mount
* @param addCommandCode
* expected return code of add command executed for specified mount
* @param rmCommandCode
Expand Down Expand Up @@ -1467,7 +1467,7 @@ public void testUpdateErrorCase() throws Exception {
err.toString().contains("update: /noMount doesn't exist."));
err.reset();

// Check update if non true/false value is passed for readonly.
// Check update if no true/false value is passed for readonly.
argv = new String[] {"-update", src, "-readonly", "check"};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(err.toString(), err.toString().contains("update: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public String toString() {
}

/**
* Asserts that the results are the expected amount and it has both success
* Asserts that the results are the expected amount, and it has both success
* and failure.
* @param msg Message to show when the assertion fails.
* @param expected Expected number of results.
Expand Down
Loading