Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -600,6 +601,21 @@ public OzoneOutputStream createKey(String key, long size,
.createKey(volumeName, name, key, size, replicationConfig, keyMetadata);
}

/**
* Creates a new key in the bucket, with default replication type RATIS and
* with replication factor THREE.
*
* @param key Name of the key to be created.
* @param size Size of the data the key will point to.
* @return OzoneOutputStream to which the data has to be written.
* @throws IOException
*/
public OzoneDataStreamOutput createStreamKey(String key, long size)
throws IOException {
return createStreamKey(key, size, defaultReplication,
Collections.emptyMap());
}

/**
* Creates a new key in the bucket.
*
Expand All @@ -610,12 +626,13 @@ public OzoneOutputStream createKey(String key, long size,
* @throws IOException
*/
public OzoneDataStreamOutput createStreamKey(String key, long size,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata)
ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
throws IOException {
return proxy
.createStreamKey(volumeName, name, key, size, replicationConfig,
keyMetadata);
if (replicationConfig == null) {
replicationConfig = defaultReplication;
}
return proxy.createStreamKey(volumeName, name, key, size,
replicationConfig, keyMetadata);
}

/**
Expand Down Expand Up @@ -958,9 +975,8 @@ public OzoneOutputStream createFile(String keyName, long size,
public OzoneDataStreamOutput createStreamFile(String keyName, long size,
ReplicationConfig replicationConfig, boolean overWrite,
boolean recursive) throws IOException {
return proxy
.createStreamFile(volumeName, name, keyName, size, replicationConfig,
overWrite, recursive);
return proxy.createStreamFile(volumeName, name, keyName, size,
replicationConfig, overWrite, recursive);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@

package org.apache.hadoop.ozone.client.rpc;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
Expand All @@ -42,8 +50,10 @@
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -208,4 +218,70 @@ public void testMultiPartUploadWithStream() throws IOException {
Assert.assertEquals(valueLength, partInfo.getSize());

}

@Test
public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
// create a local dir
final String dir = GenericTestUtils.getTempPath(
getClass().getSimpleName());
GenericTestUtils.assertDirCreation(new File(dir));

// create a local file
final int chunkSize = 1024;
final byte[] data = new byte[8 * chunkSize];
ThreadLocalRandom.current().nextBytes(data);
final File file = new File(dir, "data");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(data);
}

// create a volume
final String volumeName = "vol-" + UUID.randomUUID();
getStore().createVolume(volumeName);
final OzoneVolume volume = getStore().getVolume(volumeName);

// create a bucket
final String bucketName = "buck-" + UUID.randomUUID();
final BucketArgs bucketArgs = BucketArgs.newBuilder()
.setDefaultReplicationConfig(
new DefaultReplicationConfig(ReplicationType.RATIS, THREE))
.build();
volume.createBucket(bucketName, bucketArgs);
final OzoneBucket bucket = volume.getBucket(bucketName);

// upload a key from the local file using memory-mapped buffers
final String keyName = "key-" + UUID.randomUUID();
try (RandomAccessFile raf = new RandomAccessFile(file, "r");
OzoneDataStreamOutput out = bucket.createStreamKey(
keyName, data.length)) {
final FileChannel channel = raf.getChannel();
long off = 0;
for (long len = raf.length(); len > 0;) {
final long writeLen = Math.min(len, chunkSize);
final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY,
off, writeLen);
out.write(mapped);
off += writeLen;
len -= writeLen;
}
}

// verify the key details
final OzoneKeyDetails keyDetails = bucket.getKey(keyName);
Assertions.assertEquals(keyName, keyDetails.getName());
Assertions.assertEquals(data.length, keyDetails.getDataSize());

// verify the key content
final byte[] buffer = new byte[data.length];
try (OzoneInputStream in = keyDetails.getContent()) {
for (int off = 0; off < data.length;) {
final int n = in.read(buffer, off, data.length - off);
if (n < 0) {
break;
}
off += n;
}
}
Assertions.assertArrayEquals(data, buffer);
}
}