Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public final class OzoneConfigKeys {
public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
= 9855;

/**
 * Config key toggling whether filesystem writes go through Ratis
 * streaming instead of the regular output-stream write path.
 * Checked by the o3fs/ofs createOutputStream paths; disabled by default
 * (see {@code OZONE_FS_DATASTREAM_ENABLE_DEFAULT}).
 */
public static final String OZONE_FS_DATASTREAM_ENABLE =
"ozone.fs.datastream.enable";
public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;

/**
* When set to true, allocate a random free port for ozone container, so that
* a mini cluster is able to launch multiple containers on a node.
Expand Down
7 changes: 7 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3030,4 +3030,11 @@
will create intermediate directories.
</description>
</property>
<property>
<name>ozone.fs.datastream.enable</name>
<value>false</value>
<tag>OZONE, DATANODE</tag>
<description>Whether to perform filesystem writes via Ratis streaming.
When enabled, file creation through o3fs/ofs uses the Ratis datastream
write path instead of the regular output stream.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,14 @@ public OzoneOutputStream createFile(String keyName, long size,
overWrite, recursive);
}

/**
 * Creates a file in this bucket using the Ratis streaming write path,
 * delegating to the underlying client protocol.
 *
 * @param keyName key (file) name to create
 * @param size expected size of the data to be written
 * @param replicationConfig replication to use for the new file
 * @param overWrite whether an existing file may be overwritten
 * @param recursive whether missing parent directories are created
 * @return stream output for writing the file data
 * @throws IOException if the file cannot be opened for streaming
 */
public OzoneDataStreamOutput createStreamFile(String keyName, long size,
    ReplicationConfig replicationConfig, boolean overWrite,
    boolean recursive) throws IOException {
  return proxy.createStreamFile(volumeName, name, keyName, size,
      replicationConfig, overWrite, recursive);
}

/**
* List the status for a file or a directory and its contents.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ OzoneOutputStream createFile(String volumeName, String bucketName,
String keyName, long size, ReplicationConfig replicationConfig,
boolean overWrite, boolean recursive) throws IOException;

/**
 * Creates a file using the Ratis streaming write path.
 * Streaming counterpart of {@code createFile} above: same parameters and
 * create semantics, but returns a data-stream output instead of an
 * {@code OzoneOutputStream}.
 *
 * @param volumeName volume name
 * @param bucketName bucket name
 * @param keyName key (file) name to create
 * @param size expected size of the data to be written
 * @param replicationConfig replication to use for the new file
 * @param overWrite whether an existing file may be overwritten
 * @param recursive whether missing parent directories are created
 * @return stream output for writing the file data
 * @throws IOException if the file cannot be opened for streaming
 */
@SuppressWarnings("checkstyle:parameternumber")
OzoneDataStreamOutput createStreamFile(String volumeName, String bucketName,
String keyName, long size, ReplicationConfig replicationConfig,
boolean overWrite, boolean recursive) throws IOException;


/**
* List the status for a file or a directory and its contents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,26 @@ public OzoneOutputStream createFile(String volumeName, String bucketName,
replicationConfig);
}

/**
 * Opens a key on the OM for streaming writes and wires the resulting
 * open-key session to a data-stream output.
 */
@Override
public OzoneDataStreamOutput createStreamFile(String volumeName,
    String bucketName, String keyName, long size,
    ReplicationConfig replicationConfig, boolean overWrite, boolean recursive)
    throws IOException {
  final OmKeyArgs.Builder argsBuilder = new OmKeyArgs.Builder();
  argsBuilder.setVolumeName(volumeName)
      .setBucketName(bucketName)
      .setKeyName(keyName)
      .setDataSize(size)
      .setReplicationConfig(replicationConfig)
      .setAcls(getAclList())
      .setLatestVersionLocation(getLatestVersionLocation);
  final OpenKeySession session =
      ozoneManagerClient.createFile(argsBuilder.build(), overWrite, recursive);
  // NOTE(review): a fresh random UUID string is passed as the stream's
  // request id — confirm this matches the id convention used by createFile.
  return createDataStreamOutput(session, UUID.randomUUID().toString(),
      replicationConfig);
}

@Override
public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
String keyName, boolean recursive, String startKey, long numEntries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
Expand Down Expand Up @@ -219,6 +220,37 @@ public OzoneFSOutputStream createFile(String key, short replication,
}
}

/**
 * Creates a key via the Ratis streaming write path and wraps it in an
 * FS-facing stream output. An explicit ONE/THREE replication request
 * overrides the adapter's default replication config; any other value
 * falls back to the default.
 */
@Override
public OzoneFSDataStreamOutput createStreamFile(String key, short replication,
    boolean overWrite, boolean recursive) throws IOException {
  incrementCounter(Statistic.OBJECTS_CREATED, 1);
  try {
    ReplicationConfig effectiveConfig = replicationConfig;
    if (replication == ReplicationFactor.ONE.getValue()
        || replication == ReplicationFactor.THREE.getValue()) {
      effectiveConfig =
          ReplicationConfig.adjustReplication(replicationConfig, replication);
    }
    OzoneDataStreamOutput streamOutput = bucket.createStreamFile(
        key, 0, effectiveConfig, overWrite, recursive);
    return new OzoneFSDataStreamOutput(
        streamOutput.getByteBufStreamOutput());
  } catch (OMException ex) {
    // Surface "already exists"/"not a file" as the FS-level exception
    // Hadoop callers expect; rethrow everything else unchanged.
    if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
        || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
      throw new FileAlreadyExistsException(
          ex.getResult().name() + ": " + ex.getMessage());
    }
    throw ex;
  }
}

@Override
public void renameKey(String key, String newKeyName) throws IOException {
incrementCounter(Statistic.OBJECTS_RENAMED, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -259,6 +260,13 @@ public FSDataOutputStream createNonRecursive(Path path,

/**
 * Creates the stream backing an {@link FSDataOutputStream} for {@code key}.
 * When {@code ozone.fs.datastream.enable} is set, the write goes through
 * the Ratis streaming adapter; otherwise the regular key output stream
 * is used. FS statistics are attached in both cases.
 *
 * @param key key name to create
 * @param replication requested replication factor
 * @param overwrite whether an existing key may be replaced
 * @param recursive whether missing parent directories are created
 * @return output stream with FS statistics attached
 * @throws IOException if the key cannot be created
 */
private FSDataOutputStream createOutputStream(String key, short replication,
    boolean overwrite, boolean recursive) throws IOException {
  boolean isRatisStreamingEnabled = getConf().getBoolean(
      OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
      OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
  // Fixed: checkstyle whitespace violation — missing space before '{'.
  if (isRatisStreamingEnabled) {
    return new FSDataOutputStream(adapter.createStreamFile(key,
        replication, overwrite, recursive), statistics);
  }
  return new FSDataOutputStream(adapter.createFile(key,
      replication, overwrite, recursive), statistics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand Down Expand Up @@ -339,6 +340,43 @@ public OzoneFSOutputStream createFile(String pathStr, short replication,
}
}

/**
 * Creates a file via the Ratis streaming write path, resolving the target
 * bucket from the rooted OFS path first. Root, volume, and bucket paths
 * are rejected since keys can only live inside a bucket.
 */
@Override
public OzoneFSDataStreamOutput createStreamFile(String pathStr,
    short replication, boolean overWrite, boolean recursive)
    throws IOException {
  incrementCounter(Statistic.OBJECTS_CREATED, 1);
  OFSPath ofsPath = new OFSPath(pathStr);
  if (ofsPath.isRoot() || ofsPath.isVolume() || ofsPath.isBucket()) {
    throw new IOException("Cannot create file under root or volume.");
  }
  String key = ofsPath.getKeyName();
  try {
    // Hadoop CopyCommands class always sets recursive to true
    OzoneBucket bucket = getBucket(ofsPath, recursive);
    // An explicit ONE/THREE replication request overrides the default
    // replication config; anything else falls back to the default.
    ReplicationConfig effectiveConfig = replicationConfig;
    if (replication == ReplicationFactor.ONE.getValue()
        || replication == ReplicationFactor.THREE.getValue()) {
      effectiveConfig =
          ReplicationConfig.adjustReplication(replicationConfig, replication);
    }
    OzoneDataStreamOutput streamOutput = bucket.createStreamFile(
        key, 0, effectiveConfig, overWrite, recursive);
    return new OzoneFSDataStreamOutput(
        streamOutput.getByteBufStreamOutput());
  } catch (OMException ex) {
    // Map "already exists"/"not a file" to the FS-level exception Hadoop
    // callers expect; rethrow everything else unchanged.
    if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
        || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
      throw new FileAlreadyExistsException(
          ex.getResult().name() + ": " + ex.getMessage());
    }
    throw ex;
  }
}

@Override
public void renameKey(String key, String newKeyName) throws IOException {
throw new IOException("OFS doesn't support renameKey, use rename instead.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.OFSPath;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand Down Expand Up @@ -235,6 +236,13 @@ public FSDataOutputStream createNonRecursive(Path path,

/**
 * Creates the stream backing an {@link FSDataOutputStream} for {@code key}.
 * When {@code ozone.fs.datastream.enable} is set, the write goes through
 * the Ratis streaming adapter; otherwise the regular key output stream
 * is used. FS statistics are attached in both cases.
 *
 * @param key key name to create
 * @param replication requested replication factor
 * @param overwrite whether an existing key may be replaced
 * @param recursive whether missing parent directories are created
 * @return output stream with FS statistics attached
 * @throws IOException if the key cannot be created
 */
private FSDataOutputStream createOutputStream(String key, short replication,
    boolean overwrite, boolean recursive) throws IOException {
  boolean isRatisStreamingEnabled = getConf().getBoolean(
      OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE,
      OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT);
  // Fixed: checkstyle whitespace violation — missing space before '{'.
  if (isRatisStreamingEnabled) {
    return new FSDataOutputStream(adapter.createStreamFile(key,
        replication, overwrite, recursive), statistics);
  }
  return new FSDataOutputStream(adapter.createFile(key,
      replication, overwrite, recursive), statistics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public interface OzoneClientAdapter {
OzoneFSOutputStream createFile(String key, short replication,
boolean overWrite, boolean recursive) throws IOException;

/**
 * Creates a key (file) using the Ratis streaming write path.
 * Streaming counterpart of {@code createFile} above: same parameters and
 * create semantics, but returns a ByteBuffer-based stream output.
 *
 * @param key key name to create
 * @param replication requested replication factor
 * @param overWrite whether an existing key may be overwritten
 * @param recursive whether missing parent directories are created
 * @return stream output for writing the file data
 * @throws IOException if the key cannot be created
 */
OzoneFSDataStreamOutput createStreamFile(String key, short replication,
boolean overWrite, boolean recursive) throws IOException;

void renameKey(String key, String newKeyName) throws IOException;

// Users should use rename instead of renameKey in OFS.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;

import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* The ByteBuffer output stream for Ozone file system.
*/
/**
 * An {@link OutputStream} adapter over a {@link ByteBufferStreamOutput},
 * used by the Ozone file system when Ratis streaming writes are enabled.
 * Every operation is forwarded to the wrapped stream output.
 */
public class OzoneFSDataStreamOutput extends OutputStream
    implements ByteBufferStreamOutput {

  /** Underlying streaming sink that all writes are forwarded to. */
  private final ByteBufferStreamOutput byteBufferStreamOutput;

  public OzoneFSDataStreamOutput(
      ByteBufferStreamOutput byteBufferStreamOutput) {
    this.byteBufferStreamOutput = byteBufferStreamOutput;
  }

  /**
   * Writes the [off, off + len) slice of {@code b} to the stream.
   *
   * @param b the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void write(ByteBuffer b, int off, int len)
      throws IOException {
    byteBufferStreamOutput.write(b, off, len);
  }

  /**
   * Writes a single byte: the eight low-order bits of {@code b}
   * (the 24 high-order bits are ignored, per the
   * {@link OutputStream#write(int)} contract).
   *
   * @param b the <code>byte</code>.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void write(int b) throws IOException {
    final byte[] single = {(byte) b};
    byteBufferStreamOutput.write(ByteBuffer.wrap(single));
  }

  /**
   * Flushes any buffered bytes to the underlying stream output.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void flush() throws IOException {
    byteBufferStreamOutput.flush();
  }

  /**
   * Closes the underlying stream output and releases its resources.
   *
   * @throws IOException if an I/O error occurs
   */
  @Override
  public void close() throws IOException {
    byteBufferStreamOutput.close();
  }
}