diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index d171a43f8a64..b0a8e965c4bb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -53,6 +53,7 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -600,6 +601,21 @@ public OzoneOutputStream createKey(String key, long size,
         .createKey(volumeName, name, key, size, replicationConfig,
             keyMetadata);
   }
+  /**
+   * Creates a new key in the bucket, with default replication type RATIS and
+   * with replication factor THREE.
+   *
+   * @param key Name of the key to be created.
+   * @param size Size of the data the key will point to.
+   * @return OzoneDataStreamOutput to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size)
+      throws IOException {
+    return createStreamKey(key, size, defaultReplication,
+        Collections.emptyMap());
+  }
+
   /**
    * Creates a new key in the bucket.
    *
@@ -610,12 +626,13 @@ public OzoneOutputStream createKey(String key, long size,
    * @throws IOException
    */
   public OzoneDataStreamOutput createStreamKey(String key, long size,
-      ReplicationConfig replicationConfig,
-      Map<String, String> keyMetadata)
+      ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
       throws IOException {
-    return proxy
-        .createStreamKey(volumeName, name, key, size, replicationConfig,
-            keyMetadata);
+    if (replicationConfig == null) {
+      replicationConfig = defaultReplication;
+    }
+    return proxy.createStreamKey(volumeName, name, key, size,
+        replicationConfig, keyMetadata);
   }
 
   /**
@@ -958,9 +975,8 @@ public OzoneOutputStream createFile(String keyName, long size,
   public OzoneDataStreamOutput createStreamFile(String keyName, long size,
       ReplicationConfig replicationConfig, boolean overWrite,
       boolean recursive) throws IOException {
-    return proxy
-        .createStreamFile(volumeName, name, keyName, size, replicationConfig,
-            overWrite, recursive);
+    return proxy.createStreamFile(volumeName, name, keyName, size,
+        replicationConfig, overWrite, recursive);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 65551bc4e932..ca26ca177cc6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -18,21 +18,29 @@
 package org.apache.hadoop.ozone.client.rpc;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
@@ -42,8 +50,10 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.ozone.test.GenericTestUtils;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -208,4 +218,70 @@ public void testMultiPartUploadWithStream() throws IOException {
 
     Assert.assertEquals(valueLength, partInfo.getSize());
   }
+
+  @Test
+  public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
+    // create a local dir
+    final String dir = GenericTestUtils.getTempPath(
+        getClass().getSimpleName());
+    GenericTestUtils.assertDirCreation(new File(dir));
+
+    // create a local file
+    final int chunkSize = 1024;
+    final byte[] data = new byte[8 * chunkSize];
+    ThreadLocalRandom.current().nextBytes(data);
+    final File file = new File(dir, "data");
+    try (FileOutputStream out = new FileOutputStream(file)) {
+      out.write(data);
+    }
+
+    // create a volume
+    final String volumeName = "vol-" + UUID.randomUUID();
+    getStore().createVolume(volumeName);
+    final OzoneVolume volume = getStore().getVolume(volumeName);
+
+    // create a bucket
+    final String bucketName = "buck-" + UUID.randomUUID();
+    final BucketArgs bucketArgs = BucketArgs.newBuilder()
+        .setDefaultReplicationConfig(
+            new DefaultReplicationConfig(ReplicationType.RATIS, THREE))
+        .build();
+    volume.createBucket(bucketName, bucketArgs);
+    final OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // upload a key from the local file using memory-mapped buffers
+    final String keyName = "key-" + UUID.randomUUID();
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
+         OzoneDataStreamOutput out = bucket.createStreamKey(
+             keyName, data.length)) {
+      final FileChannel channel = raf.getChannel();
+      long off = 0;
+      for (long len = raf.length(); len > 0;) {
+        final long writeLen = Math.min(len, chunkSize);
+        final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
+            off, writeLen);
+        out.write(mapped);
+        off += writeLen;
+        len -= writeLen;
+      }
+    }
+
+    // verify the key details
+    final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    Assertions.assertEquals(keyName, keyDetails.getName());
+    Assertions.assertEquals(data.length, keyDetails.getDataSize());
+
+    // verify the key content
+    final byte[] buffer = new byte[data.length];
+    try (OzoneInputStream in = keyDetails.getContent()) {
+      for (int off = 0; off < data.length;) {
+        final int n = in.read(buffer, off, data.length - off);
+        if (n < 0) {
+          break;
+        }
+        off += n;
+      }
+    }
+    Assertions.assertArrayEquals(data, buffer);
+  }
 }
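
Usage note (not part of the patch): a minimal sketch of how the new OzoneBucket#createStreamKey(String, long) overload is expected to be called from client code. The volume, bucket and key names below are placeholders, and the OzoneConfiguration is assumed to point at a running cluster with the streaming write path enabled.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

public final class StreamKeyExample {
  public static void main(String[] args) throws Exception {
    final byte[] payload = "hello ozone streaming".getBytes(StandardCharsets.UTF_8);
    try (OzoneClient client = OzoneClientFactory.getRpcClient(new OzoneConfiguration())) {
      // "vol1" and "bucket1" are assumed to exist already.
      final OzoneBucket bucket = client.getObjectStore()
          .getVolume("vol1")
          .getBucket("bucket1");
      // New overload: replication falls back to the bucket's default
      // replication config and the key metadata defaults to an empty map.
      try (OzoneDataStreamOutput out = bucket.createStreamKey("key1", payload.length)) {
        out.write(ByteBuffer.wrap(payload));
      }
    }
  }
}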