Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {

private SFTPConnectionPool connectionPool;
private URI uri;
private final AtomicBoolean closed = new AtomicBoolean(false);

private static final int DEFAULT_SFTP_PORT = 22;
private static final int DEFAULT_MAX_CONNECTION = 5;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
public static final String E_FS_CLOSED = "FileSystem is closed!";

/**
* Set configuration from UI.
Expand Down Expand Up @@ -138,8 +141,9 @@ private void setConfigurationFromURI(URI uriInfo, Configuration conf)
* @throws IOException
*/
private ChannelSftp connect() throws IOException {
Configuration conf = getConf();
checkNotClosed();

Configuration conf = getConf();
String host = conf.get(FS_SFTP_HOST, null);
int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
Expand Down Expand Up @@ -703,6 +707,31 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}

@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
try {
super.close();
} finally {
if (connectionPool != null) {
connectionPool.shutdown();
}
}
}

/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed.get()) {
throw new IOException(uri + ": " + E_FS_CLOSED);
}
}

@VisibleForTesting
SFTPConnectionPool getConnectionPool() {
return connectionPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,13 @@ public void testMkDirs() throws IOException {
assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
is(1));
}

@Test
public void testCloseFileSystemClosesConnectionPool() throws Exception {
SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
fs.getHomeDirectory();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
fs.close();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
}
}