diff --git a/README.md b/README.md
index 94a7a02..20fb991 100644
--- a/README.md
+++ b/README.md
@@ -59,6 +59,18 @@ Map<String, ?> env = ImmutableMap.<String, Object> builder()
 FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
 ```
 
+##### Uploading Objects Using Multipart Upload API
+
+By default s3fs uploads an object to S3 by calling `AmazonS3Client#putObject()`.
+To upload files using the Multipart Upload API instead, enable it in the environment:
+
+```java
+Map<String, ?> env = ImmutableMap.<String, Object> builder()
+    .put(com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, "true")
+    .build();
+FileSystems.newFileSystem("s3:///", env, Thread.currentThread().getContextClassLoader());
+```
+
 Complete settings list:
 
 * s3fs_access_key
diff --git a/pom.xml b/pom.xml
index 2b16861..09cc15b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
     <groupId>com.upplication</groupId>
     <artifactId>s3fs</artifactId>
     <packaging>jar</packaging>
-    <version>2.2.2</version>
+    <version>2.3.0-SNAPSHOT</version>
     <name>s3fs</name>
     <description>S3 filesystem provider for Java 7</description>
     <url>https://github.com/Upplication/Amazon-S3-FileSystem-NIO2</url>
@@ -62,6 +62,7 @@
         18.0
         1.5
         1.3.9
+        <s3.stream.upload.version>1.0.1</s3.stream.upload.version>
@@ -101,6 +102,11 @@
             <artifactId>jsr305</artifactId>
             <version>${com.google.code.findbugs.jsr305.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.alexmojaki</groupId>
+            <artifactId>s3-stream-upload</artifactId>
+            <version>${s3.stream.upload.version}</version>
+        </dependency>
diff --git a/src/main/java/com/upplication/s3fs/S3FileChannel.java b/src/main/java/com/upplication/s3fs/S3FileChannel.java
index 95d916d..db9f3a3 100644
--- a/src/main/java/com/upplication/s3fs/S3FileChannel.java
+++ b/src/main/java/com/upplication/s3fs/S3FileChannel.java
@@ -2,7 +2,6 @@
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.util.IOUtils;
 import org.apache.tika.Tika;
 
 import java.io.*;
diff --git a/src/main/java/com/upplication/s3fs/S3FileSystem.java b/src/main/java/com/upplication/s3fs/S3FileSystem.java
index 96ed9e4..c5d0c0a 100644
--- a/src/main/java/com/upplication/s3fs/S3FileSystem.java
+++ b/src/main/java/com/upplication/s3fs/S3FileSystem.java
@@ -15,6 +15,7 @@
 import com.amazonaws.services.s3.model.Bucket;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import java.util.Properties;
 
 /**
  * S3FileSystem with a concrete client configured and ready to use.
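A note on the README addition above: besides the enable flag, the patch introduces tuning properties for the multipart path (declared in `S3FileSystemProvider` further down in this diff). A minimal sketch of a tuned environment; the constant names are the ones this diff adds, and treating the part size as megabytes is an assumption based on the default of 5 matching S3's 5 MB minimum part size:

```java
// Sketch: multipart upload enabled with explicit tuning.
// All values are strings; the provider parses them with Integer.parseInt.
Map<String, ?> env = ImmutableMap.<String, Object> builder()
        .put(S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, "true")
        .put(S3FileSystemProvider.MULTIPART_UPLOAD_PART_SIZE, "10")          // assumed MB
        .put(S3FileSystemProvider.MULTIPART_UPLOAD_NUM_UPLOAD_THREADS, "4")
        .put(S3FileSystemProvider.MULTIPART_UPLOAD_QUEUE_CAPACITY, "2")
        .put(S3FileSystemProvider.MULTIPART_UPLOAD_NUM_STREAMS, "1")
        .build();
FileSystems.newFileSystem(URI.create("s3:///"), env, Thread.currentThread().getContextClassLoader());
```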
@@ -24,16 +25,18 @@ public class S3FileSystem extends FileSystem implements Comparable<S3FileSystem> {
 
     private final S3FileSystemProvider provider;
+    private final Properties properties;
     private final String key;
     private final AmazonS3 client;
     private final String endpoint;
     private int cache;
 
-    public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint) {
+    public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint, Properties props) {
         this.provider = provider;
         this.key = key;
         this.client = client;
         this.endpoint = endpoint;
+        this.properties = props;
         this.cache = 60000; // 1 minute cache for the s3Path
     }
 
@@ -176,4 +179,8 @@ public int compareTo(S3FileSystem o) {
     public int getCache() {
         return cache;
     }
+
+    public Properties getProperties() {
+        return properties;
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java b/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
index 77aca37..a592f33 100644
--- a/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
+++ b/src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
@@ -6,6 +6,7 @@
 import com.amazonaws.services.s3.model.Bucket;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectId;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -31,9 +32,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static com.google.common.collect.Sets.difference;
 import static com.upplication.s3fs.AmazonS3Factory.*;
 import static java.lang.String.format;
 
 /**
  * Spec:
@@ -65,13 +70,21 @@
  */
 public class S3FileSystemProvider extends FileSystemProvider {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
+
     public static final String CHARSET_KEY = "s3fs_charset";
     public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";
+    public static final String MULTIPART_UPLOAD_ENABLED = "s3fs_multipart_upload_enabled";
+    public static final String MULTIPART_UPLOAD_PART_SIZE = "s3fs_multipart_upload_part_size";
+    public static final String MULTIPART_UPLOAD_NUM_STREAMS = "s3fs_multipart_upload_num_streams";
+    public static final String MULTIPART_UPLOAD_QUEUE_CAPACITY = "s3fs_multipart_upload_queue_capacity";
+    public static final String MULTIPART_UPLOAD_NUM_UPLOAD_THREADS = "s3fs_multipart_upload_num_upload_threads";
 
     private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
     private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY,
             REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
             PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT,
             SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
-            USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
+            USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS,
+            MULTIPART_UPLOAD_ENABLED, MULTIPART_UPLOAD_PART_SIZE, MULTIPART_UPLOAD_NUM_STREAMS,
+            MULTIPART_UPLOAD_QUEUE_CAPACITY, MULTIPART_UPLOAD_NUM_UPLOAD_THREADS);
 
     private S3Utils s3Utils = new S3Utils();
     private Cache cache = new Cache();
@@ -95,6 +108,9 @@ public FileSystem newFileSystem(URI uri, Map<String, ?> env) {
         // create the filesystem with the final properties, store and return
         S3FileSystem fileSystem = createFileSystem(uri, props);
         fileSystems.put(fileSystem.getKey(), fileSystem);
+
+        LOGGER.debug("New file system created. uri:{}, props:{}", uri, props);
+
         return fileSystem;
     }
@@ -302,6 +318,8 @@ public Path getPath(URI uri) {
 
     @Override
     public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
+        LOGGER.debug("New directory stream. path:{}, filter:{}", dir, filter);
+
         final S3Path s3Path = toS3Path(dir);
         return new DirectoryStream<Path>() {
             @Override
@@ -316,8 +334,47 @@ public Iterator<Path> iterator() {
         };
     }
 
+    private S3MultipartUploadOutputStream createMultipartUploadOutputStream(final S3Path s3Path, Set<? extends OpenOption> opts) throws IOException {
+        final S3ObjectId objectId = s3Path.toS3ObjectId();
+        final Set<OpenOption> options = Sets.newHashSet(opts);
+        final S3FileSystem fileSystem = s3Path.getFileSystem();
+        final Properties properties = fileSystem.getProperties();
+        final AmazonS3 client = fileSystem.getClient();
+        final boolean createOpt = options.remove(StandardOpenOption.CREATE);
+        final boolean createNewOpt = options.remove(StandardOpenOption.CREATE_NEW);
+
+        // Remove irrelevant/ignored options before validating what is left
+        options.remove(StandardOpenOption.WRITE);
+        options.remove(StandardOpenOption.SPARSE);
+        options.remove(StandardOpenOption.TRUNCATE_EXISTING);
+
+        if (!options.isEmpty()) {
+            throw new UnsupportedOperationException(format("Unsupported options: %s", options));
+        }
+
+        // CREATE_NEW requires that the target does not exist yet
+        if (createNewOpt && fileSystem.provider().exists(s3Path)) {
+            throw new FileAlreadyExistsException(format("Target already exists: %s", s3Path));
+        }
+
+        // a plain WRITE (neither CREATE nor CREATE_NEW) requires an existing target
+        if (!createOpt && !createNewOpt && !fileSystem.provider().exists(s3Path)) {
+            throw new NoSuchFileException(s3Path.toString());
+        }
+
+        // only initiate the multipart upload once the options have been validated
+        return new S3MultipartUploadOutputStream(client, objectId, properties);
+    }
+
     @Override
     public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
+        LOGGER.debug("New input stream. path:{}, options:{}", path, options);
+
         S3Path s3Path = toS3Path(path);
 
         String key = s3Path.getKey();
@@ -342,14 +399,46 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOExc
 
     @Override
     public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
-        S3Path s3Path = toS3Path(path);
-        return new S3SeekableByteChannel(s3Path, options);
+        LOGGER.debug("New byte channel. path:{}, options:{}", path, options);
+
+        final S3Path s3Path = toS3Path(path);
+        final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);
+
+        if (!multipartEnabled) {
+            LOGGER.debug("Using S3SeekableByteChannel");
+            return new S3SeekableByteChannel(s3Path, options);
+        }
+
+        LOGGER.debug("Using S3MultipartUploadChannel");
+
+        final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
+        return new S3MultipartUploadChannel(outputStream);
     }
 
     @Override
     public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
-        S3Path s3Path = toS3Path(path);
-        return new S3FileChannel(s3Path, options);
path:{}, filter:{}", path, options); + + final S3Path s3Path = toS3Path(path); + final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options); + + if (!multipartEnabled) { + + LOGGER.debug("Using S3FileChannel"); + + return new S3FileChannel(s3Path, options); + } + + LOGGER.debug("Using S3MultipartFileChannel"); + + final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options); + final FileChannel channel = new S3MultipartUploadChannel(outputStream); + + return channel; } /** @@ -359,6 +448,8 @@ public FileChannel newFileChannel(Path path, Set options, */ @Override public void createDirectory(Path dir, FileAttribute... attrs) throws IOException { + LOGGER.debug("Create directory. path:{}, attrs:{}", dir, attrs); + S3Path s3Path = toS3Path(dir); Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO if (exists(s3Path)) @@ -378,6 +469,8 @@ public void createDirectory(Path dir, FileAttribute... attrs) throws IOExcept @Override public void delete(Path path) throws IOException { + LOGGER.debug("Delete path:{}", path); + S3Path s3Path = toS3Path(path); if (Files.notExists(s3Path)) throw new NoSuchFileException("the path: " + this + " not exists"); @@ -393,6 +486,8 @@ public void delete(Path path) throws IOException { @Override public void copy(Path source, Path target, CopyOption... options) throws IOException { + LOGGER.debug("Copy {} to target. options:{}", source, target, options); + if (isSameFile(source, target)) return; @@ -424,6 +519,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep @Override public void move(Path source, Path target, CopyOption... options) throws IOException { + LOGGER.debug("Move {} to target. options:{}", source, target, options); + if (options != null && Arrays.asList(options).contains(StandardCopyOption.ATOMIC_MOVE)) throw new AtomicMoveNotSupportedException(source.toString(), target.toString(), "Atomic not supported"); copy(source, target, options); @@ -550,7 +647,7 @@ public void setAttribute(Path path, String attribute, Object value, LinkOption.. 
      * @return S3FileSystem never null
      */
     public S3FileSystem createFileSystem(URI uri, Properties props) {
-        return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost());
+        return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost(), props);
     }
 
     protected AmazonS3 getAmazonS3(URI uri, Properties props) {
@@ -634,4 +731,16 @@ public Cache getCache() {
 
     public void setCache(Cache cache) {
         this.cache = cache;
     }
+
+    private boolean isMultipartUploadCapable(final S3Path s3Path, final Set<? extends OpenOption> options) {
+        // multipart uploads are write-only: READ and APPEND are not supported
+        if (options.contains(StandardOpenOption.READ) || options.contains(StandardOpenOption.APPEND)) {
+            return false;
+        }
+
+        final S3FileSystem fileSystem = s3Path.getFileSystem();
+        final Properties properties = fileSystem.getProperties();
+
+        return Boolean.parseBoolean(properties.getProperty(MULTIPART_UPLOAD_ENABLED, "false"));
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/com/upplication/s3fs/S3MultipartUploadChannel.java b/src/main/java/com/upplication/s3fs/S3MultipartUploadChannel.java
new file mode 100644
index 0000000..d46ea50
--- /dev/null
+++ b/src/main/java/com/upplication/s3fs/S3MultipartUploadChannel.java
@@ -0,0 +1,115 @@
+package com.upplication.s3fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Write-only, append-style {@link FileChannel} that streams everything it
+ * receives into a multipart upload.
+ */
+public class S3MultipartUploadChannel extends FileChannel {
+
+    private final WritableByteChannel writableChannel;
+
+    private final S3MultipartUploadOutputStream outputStream;
+
+    private long size;
+
+    public S3MultipartUploadChannel(final S3MultipartUploadOutputStream outputStream) {
+        this.outputStream = outputStream;
+        this.writableChannel = Channels.newChannel(outputStream);
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        throw new NonReadableChannelException();
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        throw new NonReadableChannelException();
+    }
+
+    @Override
+    public int read(ByteBuffer dst, long position) throws IOException {
+        throw new NonReadableChannelException();
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        synchronized (writableChannel) {
+            final int written = this.writableChannel.write(src);
+
+            size += written;
+
+            return written;
+        }
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int write(ByteBuffer src, long position) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long position() throws IOException {
+        return size;
+    }
+
+    @Override
+    public FileChannel position(long newPosition) throws IOException {
+        // the upload stream cannot seek; the position is always the end of the stream
+        return this;
+    }
+
+    @Override
+    public long size() throws IOException {
+        return size;
+    }
+
+    @Override
+    public FileChannel truncate(long size) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void force(boolean metaData) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public FileLock lock(long position, long size, boolean shared) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public FileLock tryLock(long position, long size, boolean shared) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    protected void implCloseChannel() throws IOException {
+        outputStream.close();
+    }
+}
diff --git a/src/main/java/com/upplication/s3fs/S3MultipartUploadOutputStream.java b/src/main/java/com/upplication/s3fs/S3MultipartUploadOutputStream.java
new file mode 100644
index 0000000..efaace9
--- /dev/null
+++ b/src/main/java/com/upplication/s3fs/S3MultipartUploadOutputStream.java
@@ -0,0 +1,82 @@
+package com.upplication.s3fs;
+
+import alex.mojaki.s3upload.MultiPartOutputStream;
+import alex.mojaki.s3upload.StreamTransferManager;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3ObjectId;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import static com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_NUM_STREAMS;
+import static com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_NUM_UPLOAD_THREADS;
+import static com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_PART_SIZE;
+import static com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_QUEUE_CAPACITY;
+
+public class S3MultipartUploadOutputStream extends OutputStream {
+
+    private final MultiPartOutputStream outputStream;
+
+    private final StreamTransferManager manager;
+
+    private static final int DEFAULT_NUM_UPLOAD_THREADS = 1;
+
+    private static final int DEFAULT_QUEUE_CAPACITY = 1;
+
+    private static final int DEFAULT_NUM_STREAMS = 1;
+
+    // part size in MB; 5 MB is the smallest part S3 accepts
+    private static final int DEFAULT_PART_SIZE = 5;
+
+    public S3MultipartUploadOutputStream(final AmazonS3 s3Client, final S3ObjectId objectId, final Properties properties) {
+        this(
+                createStreamTransferManager(s3Client, objectId, properties),
+                objectId
+        );
+    }
+
+    public S3MultipartUploadOutputStream(
+            final StreamTransferManager manager,
+            final S3ObjectId objectId) {
+
+        this.manager = manager;
+        this.outputStream = manager.getMultiPartOutputStreams().get(0);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+
+        try {
+            outputStream.checkSize();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.outputStream.close();
+        this.manager.complete();
+    }
+
+    private static StreamTransferManager createStreamTransferManager(
+            final AmazonS3 s3Client,
+            final S3ObjectId objectId,
+            final Properties properties) {
+
+        return new StreamTransferManager(
+                objectId.getBucket(),
+                objectId.getKey(),
+                s3Client,
+                getIntValue(properties, MULTIPART_UPLOAD_NUM_STREAMS, DEFAULT_NUM_STREAMS),
+                getIntValue(properties, MULTIPART_UPLOAD_NUM_UPLOAD_THREADS, DEFAULT_NUM_UPLOAD_THREADS),
+                getIntValue(properties, MULTIPART_UPLOAD_QUEUE_CAPACITY, DEFAULT_QUEUE_CAPACITY),
+                getIntValue(properties, MULTIPART_UPLOAD_PART_SIZE, DEFAULT_PART_SIZE)
+        );
+    }
+
+    private static int getIntValue(final Properties properties, final String name, final int defaultValue) {
+        return Integer.parseInt(properties.getProperty(name, String.valueOf(defaultValue)));
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/upplication/s3fs/S3Path.java b/src/main/java/com/upplication/s3fs/S3Path.java
index e67660e..8fcf153 100644
--- a/src/main/java/com/upplication/s3fs/S3Path.java
+++ b/src/main/java/com/upplication/s3fs/S3Path.java
@@ -1,5 +1,6 @@
 package com.upplication.s3fs;
 
+import com.amazonaws.services.s3.model.S3ObjectId;
 import com.google.common.base.*;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -131,6 +132,10 @@ public String getKey() {
         return key;
     }
 
+    public S3ObjectId toS3ObjectId() {
+        return new S3ObjectId(fileStore.getBucket().getName(), getKey());
+    }
+
     @Override
     public S3FileSystem getFileSystem() {
         return this.fileSystem;
diff --git a/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewByteChannelTest.java b/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewByteChannelTest.java
new file mode 100644
index 0000000..9248b52
--- /dev/null
+++ b/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewByteChannelTest.java
@@ -0,0 +1,91 @@
+package com.upplication.s3fs.FileSystemProvider;
+
+import com.google.common.collect.ImmutableMap;
+import com.upplication.s3fs.S3FileSystemProvider;
+import com.upplication.s3fs.S3MultipartUploadChannel;
+import com.upplication.s3fs.S3SeekableByteChannel;
+import com.upplication.s3fs.S3UnitTestBase;
+import com.upplication.s3fs.util.*;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.*;
+import java.nio.file.spi.FileSystemProvider;
+import java.security.SecureRandom;
+import java.util.EnumSet;
+
+import static org.junit.Assert.*;
+
+public class MultipartUploadNewByteChannelTest extends S3UnitTestBase {
+
+    @Test
+    public void testNewByteChannelMultipartUpload() throws IOException, InterruptedException {
+        final byte[] bytes = randomMegaBytes(1);
+        final FileSystem fileSystem = createFileSystem(true);
+        final FileSystemProvider provider = fileSystem.provider();
+        final Path path = fileSystem.getPath("/test-bucket/test-path");
+        final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock();
+
+        client.bucket("test-bucket");
+
+        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        final SeekableByteChannel channel = provider.newByteChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE));
+
+        channel.write(buffer);
+        channel.close();
+
+        assertEquals(S3MultipartUploadChannel.class, channel.getClass());
+        assertEquals(bytes.length, channel.size());
+    }
+
+    @Test
+    public void testUseDefaultChannelForReading() throws IOException, InterruptedException {
+        final FileSystem fileSystem = createFileSystem(true);
+        final FileSystemProvider provider = fileSystem.provider();
+        final Path path = fileSystem.getPath("/test-bucket/test-path");
+        final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock();
+
+        client.bucket("test-bucket").file("test-path");
+
+        final SeekableByteChannel channel = provider.newByteChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ));
+
+        channel.close();
+
+        assertEquals(S3SeekableByteChannel.class, channel.getClass());
channel.getClass()); + } + + @Test + public void testUseDefaultChannelWhenDisabled() throws IOException, InterruptedException { + final FileSystem fileSystem = createFileSystem(false); + final FileSystemProvider provider = fileSystem.provider(); + final Path path = fileSystem.getPath("/test-bucket/test-path"); + final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock(); + + client.bucket("test-bucket"); + + final SeekableByteChannel channel = provider.newByteChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE)); + + assertEquals(S3SeekableByteChannel.class, channel.getClass()); + } + + private byte[] randomMegaBytes(final int sizeInMegabytes) throws IOException { + final byte[] bytes = new byte[sizeInMegabytes * 1024 * 1024]; + final SecureRandom random = new SecureRandom(); + + random.nextBytes(bytes); + + return bytes; + } + + private FileSystem createFileSystem(final boolean multipartEnabled) throws IOException { + final S3FileSystemProvider provider = getS3fsProvider(); + final FileSystem fileSystem = provider.newFileSystem( + S3EndpointConstant.S3_GLOBAL_URI_TEST, + ImmutableMap.of(S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, String.valueOf(multipartEnabled)) + ); + + return fileSystem; + } +} \ No newline at end of file diff --git a/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewFileChannelTest.java b/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewFileChannelTest.java new file mode 100644 index 0000000..4ea6175 --- /dev/null +++ b/src/test/java/com/upplication/s3fs/FileSystemProvider/MultipartUploadNewFileChannelTest.java @@ -0,0 +1,90 @@ +package com.upplication.s3fs.FileSystemProvider; + +import com.google.common.collect.ImmutableMap; +import com.upplication.s3fs.S3FileChannel; +import com.upplication.s3fs.S3FileSystemProvider; +import com.upplication.s3fs.S3MultipartUploadChannel; +import com.upplication.s3fs.S3UnitTestBase; +import com.upplication.s3fs.util.*; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.*; +import java.nio.file.spi.FileSystemProvider; +import java.security.SecureRandom; +import java.util.EnumSet; + +import static org.junit.Assert.*; + +public class MultipartUploadNewFileChannelTest extends S3UnitTestBase { + private FileSystem createFileSystem(final boolean multipartEnabled) throws IOException { + final S3FileSystemProvider provider = getS3fsProvider(); + final FileSystem fileSystem = provider.newFileSystem( + S3EndpointConstant.S3_GLOBAL_URI_TEST, + ImmutableMap.of(S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, String.valueOf(multipartEnabled)) + ); + + return fileSystem; + } + + @Test + public void testNewFileChannelMultipartUpload() throws IOException, InterruptedException { + final byte[] bytes = randomMegaBytes(1); + final FileSystem fileSystem = createFileSystem(true); + final FileSystemProvider provider = fileSystem.provider(); + final Path path = fileSystem.getPath("/test-bucket/test-path"); + final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock(); + + client.bucket("test-bucket"); + + final ByteBuffer buffer = ByteBuffer.wrap(bytes); + final FileChannel channel = provider.newFileChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE)); + + channel.write(buffer); + channel.close(); + + assertEquals(S3MultipartUploadChannel.class, channel.getClass()); + assertEquals(bytes.length, channel.size()); + } + + @Test + public 
+    public void testUseDefaultChannelForReading() throws IOException, InterruptedException {
+        final FileSystem fileSystem = createFileSystem(true);
+        final FileSystemProvider provider = fileSystem.provider();
+        final Path path = fileSystem.getPath("/test-bucket/test-path");
+        final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock();
+
+        client.bucket("test-bucket").file("test-path");
+
+        final FileChannel channel = provider.newFileChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.READ));
+
+        channel.close();
+
+        assertEquals(S3FileChannel.class, channel.getClass());
+    }
+
+    @Test
+    public void testUseDefaultChannelWhenDisabled() throws IOException, InterruptedException {
+        final FileSystem fileSystem = createFileSystem(false);
+        final FileSystemProvider provider = fileSystem.provider();
+        final Path path = fileSystem.getPath("/test-bucket/test-path");
+        final AmazonS3ClientMock client = AmazonS3MockFactory.getAmazonClientMock();
+
+        client.bucket("test-bucket");
+
+        final FileChannel channel = provider.newFileChannel(path, EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE));
+
+        assertEquals(S3FileChannel.class, channel.getClass());
+
+        channel.close();
+    }
+
+    private byte[] randomMegaBytes(final int sizeInMegabytes) throws IOException {
+        final byte[] bytes = new byte[sizeInMegabytes * 1024 * 1024];
+        final SecureRandom random = new SecureRandom();
+
+        random.nextBytes(bytes);
+
+        return bytes;
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelIT.java b/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelIT.java
index 4d4d388..920b467 100644
--- a/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelIT.java
+++ b/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelIT.java
@@ -16,12 +16,10 @@
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.*;
 import java.util.EnumSet;
-import java.util.Properties;
 import java.util.UUID;
 
 import static com.upplication.s3fs.util.S3EndpointConstant.S3_GLOBAL_URI_IT;
 import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
 
 public class NewByteChannelIT extends S3UnitTestBase {
diff --git a/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelTest.java b/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelTest.java
index 2778c53..4795612 100644
--- a/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelTest.java
+++ b/src/test/java/com/upplication/s3fs/FileSystemProvider/NewByteChannelTest.java
@@ -14,12 +14,10 @@
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.*;
 import java.util.EnumSet;
-import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
 
 public class NewByteChannelTest extends S3UnitTestBase {
diff --git a/src/test/java/com/upplication/s3fs/FileSystemsIT.java b/src/test/java/com/upplication/s3fs/FileSystemsIT.java
index 9d82336..2b0c5ec 100644
--- a/src/test/java/com/upplication/s3fs/FileSystemsIT.java
+++ b/src/test/java/com/upplication/s3fs/FileSystemsIT.java
@@ -1,24 +1,17 @@
 package com.upplication.s3fs;
 
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder;
-import com.upplication.s3fs.util.CopyDirVisitor;
 import com.upplication.s3fs.util.EnvironmentBuilder;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.URI;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.*;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.UUID;
+
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystemNotFoundException;
+import java.nio.file.FileSystems;
 
 import static com.upplication.s3fs.util.S3EndpointConstant.S3_GLOBAL_URI_IT;
 import static org.junit.Assert.*;
 
 public class FileSystemsIT {
diff --git a/src/test/java/com/upplication/s3fs/MultipartUploadIT.java b/src/test/java/com/upplication/s3fs/MultipartUploadIT.java
new file mode 100644
index 0000000..9e12aeb
--- /dev/null
+++ b/src/test/java/com/upplication/s3fs/MultipartUploadIT.java
@@ -0,0 +1,87 @@
+package com.upplication.s3fs;
+
+import com.google.common.collect.ImmutableMap;
+import com.upplication.s3fs.util.EnvironmentBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystemNotFoundException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.security.SecureRandom;
+import java.util.UUID;
+
+import static com.upplication.s3fs.util.S3EndpointConstant.S3_GLOBAL_URI_IT;
+import static org.junit.Assert.*;
+
+public class MultipartUploadIT {
+
+    private static final String BUCKET = EnvironmentBuilder.getBucket();
+    private static final URI S3_URI = EnvironmentBuilder.getS3URI(S3_GLOBAL_URI_IT);
+
+    private FileSystem fileSystem;
+
+    @Before
+    public void setup() throws IOException {
+        System.clearProperty(S3FileSystemProvider.AMAZON_S3_FACTORY_CLASS);
+
+        this.fileSystem = build();
+    }
+
+    private static FileSystem build() throws IOException {
+        try {
+            FileSystems.getFileSystem(S3_URI).close();
+            return createNewFileSystem();
+        } catch (FileSystemNotFoundException e) {
+            return createNewFileSystem();
+        }
+    }
+
+    private static FileSystem createNewFileSystem() throws IOException {
+        return FileSystems.newFileSystem(
+                S3_URI,
+                ImmutableMap.<String, Object> builder()
+                        .put(S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, "true")
+                        .putAll(EnvironmentBuilder.getRealEnv())
+                        .build()
+        );
+    }
+
+    @Test
+    public void testUploadUsingByteChannel() throws IOException {
+        final Path path = fileSystem.getPath(BUCKET, UUID.randomUUID().toString());
+        final byte[] bytes = randomMegaBytes(5);
+
+        Files.write(path, bytes, StandardOpenOption.CREATE_NEW);
+
+        assertTrue(Files.exists(path));
+    }
+
+    @Test
+    public void testUploadUsingFileChannel() throws IOException {
+        final byte[] bytes = randomMegaBytes(10);
+        final Path path = fileSystem.getPath(BUCKET, UUID.randomUUID().toString());
+
+        try (final FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
+            channel.write(ByteBuffer.wrap(bytes));
+        }
+
+        assertTrue(Files.exists(path));
+    }
+
+    private byte[] randomMegaBytes(final int sizeInMegabytes) throws IOException {
+        final byte[] bytes = new byte[sizeInMegabytes * 1024 * 1024];
+        final SecureRandom random = new SecureRandom();
+
+        random.nextBytes(bytes);
+
+        return bytes;
+    }
+}
diff --git a/src/test/java/com/upplication/s3fs/S3FileSystemTest.java b/src/test/java/com/upplication/s3fs/S3FileSystemTest.java
index a3b5c7d..434a0f4 100644
--- a/src/test/java/com/upplication/s3fs/S3FileSystemTest.java
+++ b/src/test/java/com/upplication/s3fs/S3FileSystemTest.java
@@ -208,11 +208,11 @@ public void comparables() throws IOException {
         S3FileSystem s3fs4 = (S3FileSystem) provider.newFileSystem(URI.create("s3://accessKey:secretKey@mirror2.amazon.test"), null);
         S3FileSystem s3fs6 = (S3FileSystem) provider.newFileSystem(URI.create("s3://access_key:secret_key@mirror1.amazon.test/"), null);
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs7 = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
-        S3FileSystem s3fs8 = new S3FileSystem(provider, null, amazonClientMock, null);
-        S3FileSystem s3fs9 = new S3FileSystem(provider, null, amazonClientMock, null);
-        S3FileSystem s3fs10 = new S3FileSystem(provider, "somekey", amazonClientMock, null);
-        S3FileSystem s3fs11 = new S3FileSystem(provider, "access-key@mirror2.amazon.test", amazonClientMock, "mirror2.amazon.test");
+        S3FileSystem s3fs7 = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
+        S3FileSystem s3fs8 = new S3FileSystem(provider, null, amazonClientMock, null, null);
+        S3FileSystem s3fs9 = new S3FileSystem(provider, null, amazonClientMock, null, null);
+        S3FileSystem s3fs10 = new S3FileSystem(provider, "somekey", amazonClientMock, null, null);
+        S3FileSystem s3fs11 = new S3FileSystem(provider, "access-key@mirror2.amazon.test", amazonClientMock, "mirror2.amazon.test", null);
 
         // FIXME: review the hashcode creation.
         assertEquals(1483378423, s3fs1.hashCode());
@@ -252,7 +252,7 @@ public void comparables() throws IOException {
     public void key2Parts() {
         S3FileSystemProvider provider = new S3FileSystemProvider();
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
+        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
         try {
             String[] parts = s3fs.key2Parts("/bucket/folder with spaces/file");
             assertEquals("", parts[0]);
@@ -272,7 +272,7 @@ public void key2Parts() {
     public void parts2Key() {
         S3FileSystemProvider provider = new S3FileSystemProvider();
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
+        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
         S3Path path = s3fs.getPath("/bucket", "folder with spaces", "file");
         try {
             assertEquals("folder with spaces/file", path.getKey());
@@ -293,7 +293,7 @@ public void urlWithSpecialCharacters() throws IOException {
         AmazonS3 amazonS3Client = AmazonS3ClientBuilder.standard()
                 .withRegion(Regions.EU_WEST_1)
                 .build();
-        S3FileSystem s3FileSystem = new S3FileSystem(null, null, amazonS3Client, "mirror");
+        S3FileSystem s3FileSystem = new S3FileSystem(null, null, amazonS3Client, "mirror", null);
         S3Path path = new S3Path(s3FileSystem, fileName);
 
         String url = amazonS3Client.getUrl("bucket", path.getKey()).toString();
@@ -309,7 +309,7 @@ public void urlWithSpaceCharacters() throws IOException {
         AmazonS3 amazonS3Client = AmazonS3ClientBuilder.standard()
                 .withRegion(Regions.EU_WEST_1)
                 .build();
-        S3FileSystem s3FileSystem = new S3FileSystem(null, null, amazonS3Client, "mirror");
+        S3FileSystem s3FileSystem = new S3FileSystem(null, null, amazonS3Client, "mirror", null);
         S3Path path = new S3Path(s3FileSystem, fileName);
 
         String url = amazonS3Client.getUrl("bucket", path.getKey()).toString();
@@ -321,7 +321,7 @@ public void urlWithSpaceCharacters() throws IOException {
     public void createDirectory() throws IOException {
         S3FileSystemProvider provider = new S3FileSystemProvider();
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
+        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
         try {
             S3Path folder = s3fs.getPath("/bucket", "folder");
             provider.createDirectory(folder);
@@ -339,7 +339,7 @@ public void createDirectory() throws IOException {
     public void createDirectoryWithAttributes() throws IOException {
         S3FileSystemProvider provider = new S3FileSystemProvider();
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
+        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
         try {
             S3Path folder = s3fs.getPath("/bucket", "folder");
             provider.createDirectory(folder, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxrw")));
@@ -356,7 +356,7 @@ public void createDirectoryWithAttributes() throws IOException {
     public void isSameFile() throws IOException {
         S3FileSystemProvider provider = new S3FileSystemProvider();
         AmazonS3ClientMock amazonClientMock = AmazonS3MockFactory.getAmazonClientMock();
-        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test");
+        S3FileSystem s3fs = new S3FileSystem(provider, null, amazonClientMock, "mirror1.amazon.test", null);
         try {
             S3Path folder = s3fs.getPath("/bucket", "folder");
             S3Path sameFolder = s3fs.getPath("/bucket", "folder");
diff --git a/src/test/java/com/upplication/s3fs/util/AmazonS3ClientMock.java b/src/test/java/com/upplication/s3fs/util/AmazonS3ClientMock.java
index 792c886..b9682bf 100644
--- a/src/test/java/com/upplication/s3fs/util/AmazonS3ClientMock.java
+++ b/src/test/java/com/upplication/s3fs/util/AmazonS3ClientMock.java
@@ -37,13 +37,12 @@
 import com.amazonaws.HttpMethod;
 import com.amazonaws.regions.Region;
 import com.amazonaws.services.s3.AbstractAmazonS3;
-import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.S3ResponseMetadata;
 import com.amazonaws.services.s3.model.*;
 import com.amazonaws.services.s3.waiters.AmazonS3Waiters;
 import com.amazonaws.util.StringUtils;
-import org.apache.http.MethodNotSupportedException;
+import java.io.ByteArrayOutputStream;
 
 public class AmazonS3ClientMock extends AbstractAmazonS3 {
     /**
@@ -63,6 +62,7 @@ public class AmazonS3ClientMock extends AbstractAmazonS3 {
     private Path base;
     private Map bucketOwners = new HashMap<>();
+    private final Map<String, S3MultipartUpload> multipartUploads = new HashMap<>();
 
     public AmazonS3ClientMock(Path base) {
         this.base = base;
@@ -637,6 +637,28 @@ private Path find(String bucketName) {
         return base.resolve(bucketName);
     }
 
+    public static class S3MultipartUpload {
+
+        private final S3Element s3Element;
+        private final List<byte[]> parts;
+
+        public S3MultipartUpload(final S3Element s3Element) {
+            this.s3Element = s3Element;
+            this.parts = new ArrayList<>();
+        }
+
+        public S3Element getS3Element() {
+            return s3Element;
+        }
+
+        public List<byte[]> getParts() {
+            return parts;
+        }
+
+        public void addPart(final byte[] part) {
+            parts.add(part);
+        }
+    }
+
     public static class S3Element {
 
         private S3Object s3Object;
@@ -1217,12 +1239,40 @@ public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrl
 
     @Override
     public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) throws AmazonClientException {
-        throw new UnsupportedOperationException();
+        final String uploadId = UUID.randomUUID().toString();
+        final InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+
+        result.setUploadId(uploadId);
+        result.setKey(request.getKey());
+        result.setBucketName(request.getBucketName());
+
+        // register an empty element that the uploaded parts will be appended to
+        final InputStream inputStream = new ByteArrayInputStream(new byte[0]);
+        final S3Element element = parse(inputStream, request.getBucketName(), request.getKey());
+
+        multipartUploads.put(uploadId, new S3MultipartUpload(element));
+
+        return result;
     }
 
     @Override
     public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException {
-        throw new UnsupportedOperationException();
+        final String uploadId = request.getUploadId();
+        final UploadPartResult result = new UploadPartResult();
+
+        if (!multipartUploads.containsKey(uploadId)) {
+            throw new AmazonServiceException("Unknown uploadId: " + uploadId);
+        }
+
+        try {
+            multipartUploads.get(uploadId).addPart(IOUtils.toByteArray(request.getInputStream()));
+
+            result.setETag(UUID.randomUUID().toString());
+            result.setPartNumber(request.getPartNumber());
+
+            return result;
+        } catch (IOException e) {
+            throw new AmazonServiceException("Failed to upload part", e);
+        }
     }
 
     @Override
@@ -1232,12 +1282,41 @@ public PartListing listParts(ListPartsRequest request) throws AmazonClientExcept
 
     @Override
     public void abortMultipartUpload(AbortMultipartUploadRequest request) throws AmazonClientException {
-        throw new UnsupportedOperationException();
+        multipartUploads.remove(request.getUploadId());
     }
 
     @Override
     public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) throws AmazonClientException {
-        throw new UnsupportedOperationException();
+        final String uploadId = request.getUploadId();
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
+
+        if (!multipartUploads.containsKey(uploadId)) {
+            throw new AmazonServiceException("Unknown uploadId: " + uploadId);
+        }
+
+        final S3MultipartUpload upload = multipartUploads.remove(uploadId);
+        final S3Element element = upload.getS3Element();
+        final S3Object s3Object = element.getS3Object();
+        final ObjectMetadata metadata = s3Object.getObjectMetadata();
+
+        // concatenate the parts in the order they were uploaded
+        for (final byte[] part : upload.getParts()) {
+            try {
+                outputStream.write(part);
+            } catch (IOException e) {
+                throw new AmazonServiceException("Failed to complete upload", e);
+            }
+        }
+
+        final byte[] bytes = outputStream.toByteArray();
+        final InputStream inputStream = new ByteArrayInputStream(bytes);
+
+        s3Object.setObjectContent(inputStream);
+        metadata.setContentLength(bytes.length);
+
+        persist(s3Object.getBucketName(), element);
+
+        result.setBucketName(request.getBucketName());
+        result.setKey(request.getKey());
+
+        return result;
     }
 
     @Override
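For reference, the mock above emulates the initiate / upload part / complete sequence that `StreamTransferManager` drives internally. A compact sketch against the plain AWS SDK v1 client; `s3` is an existing `AmazonS3` instance, and the bucket, key, and payload are illustrative placeholders:

```java
// Illustrative multipart round-trip with the AWS SDK v1 client.
byte[] payload = "hello multipart".getBytes();

// 1. initiate: S3 hands back an uploadId that ties the parts together
InitiateMultipartUploadResult init = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest("bucket", "key"));

// 2. upload one or more parts, collecting the returned ETags
List<PartETag> partETags = new ArrayList<>();
UploadPartResult part = s3.uploadPart(new UploadPartRequest()
        .withBucketName("bucket")
        .withKey("key")
        .withUploadId(init.getUploadId())
        .withPartNumber(1)                              // part numbers start at 1
        .withInputStream(new ByteArrayInputStream(payload))
        .withPartSize(payload.length)
        .withLastPart(true));
partETags.add(part.getPartETag());

// 3. complete: S3 stitches the parts into the final object
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
        "bucket", "key", init.getUploadId(), partETags));
```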