diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index b60f7546309f..01b4eab86c4e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -97,6 +97,13 @@ public final class OzoneConfigKeys { public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT = 9855; + /** + * Flag to enable ratis streaming on filesystem writes. + */ + public static final String OZONE_FS_DATASTREAM_ENABLE = + "ozone.fs.datastream.enable"; + public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false; + /** * When set to true, allocate a random free port for ozone container, so that * a mini cluster is able to launch multiple containers on a node. diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 54f8e5cba7b3..31ed276b19a2 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3030,4 +3030,11 @@ will create intermediate directories. + + ozone.fs.datastream.enable + false + OZONE, DATANODE + To enable/disable filesystem write via ratis streaming. 
+ + diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index 61cbb69081ce..9bcfb11d4b37 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -859,6 +859,14 @@ public OzoneOutputStream createFile(String keyName, long size, overWrite, recursive); } + public OzoneDataStreamOutput createStreamFile(String keyName, long size, + ReplicationConfig replicationConfig, boolean overWrite, + boolean recursive) throws IOException { + return proxy + .createStreamFile(volumeName, name, keyName, size, replicationConfig, + overWrite, recursive); + } + /** * List the status for a file or a directory and its contents. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index 6de6d226e72a..b0fe9d462059 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -680,6 +680,11 @@ OzoneOutputStream createFile(String volumeName, String bucketName, String keyName, long size, ReplicationConfig replicationConfig, boolean overWrite, boolean recursive) throws IOException; + @SuppressWarnings("checkstyle:parameternumber") + OzoneDataStreamOutput createStreamFile(String volumeName, String bucketName, + String keyName, long size, ReplicationConfig replicationConfig, + boolean overWrite, boolean recursive) throws IOException; + /** * List the status for a file or a directory and its contents. 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 43bb3ec775cd..1b530e736147 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1322,6 +1322,26 @@ public OzoneOutputStream createFile(String volumeName, String bucketName, replicationConfig); } + @Override + public OzoneDataStreamOutput createStreamFile(String volumeName, + String bucketName, String keyName, long size, + ReplicationConfig replicationConfig, boolean overWrite, boolean recursive) + throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(size) + .setReplicationConfig(replicationConfig) + .setAcls(getAclList()) + .setLatestVersionLocation(getLatestVersionLocation) + .build(); + OpenKeySession keySession = + ozoneManagerClient.createFile(keyArgs, overWrite, recursive); + return createDataStreamOutput(keySession, UUID.randomUUID().toString(), + replicationConfig); + } + @Override public List listStatus(String volumeName, String bucketName, String keyName, boolean recursive, String startKey, long numEntries) diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index b9668257eb37..70750bb6f8eb 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneKey; import 
org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -219,6 +220,37 @@ public OzoneFSOutputStream createFile(String key, short replication, } } + @Override + public OzoneFSDataStreamOutput createStreamFile(String key, short replication, + boolean overWrite, boolean recursive) throws IOException { + incrementCounter(Statistic.OBJECTS_CREATED, 1); + try { + OzoneDataStreamOutput ozoneDataStreamOutput = null; + if (replication == ReplicationFactor.ONE.getValue() + || replication == ReplicationFactor.THREE.getValue()) { + + ReplicationConfig customReplicationConfig = + ReplicationConfig.adjustReplication(replicationConfig, replication); + ozoneDataStreamOutput = bucket + .createStreamFile(key, 0, customReplicationConfig, overWrite, + recursive); + } else { + ozoneDataStreamOutput = bucket + .createStreamFile(key, 0, replicationConfig, overWrite, recursive); + } + return new OzoneFSDataStreamOutput( + ozoneDataStreamOutput.getByteBufStreamOutput()); + } catch (OMException ex) { + if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS + || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { + throw new FileAlreadyExistsException( + ex.getResult().name() + ": " + ex.getMessage()); + } else { + throw ex; + } + } + } + @Override public void renameKey(String key, String newKeyName) throws IOException { incrementCounter(Statistic.OBJECTS_RENAMED, 1); diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java index c920747b70ee..1592032c5f7f 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java +++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -259,6 +260,13 @@ public FSDataOutputStream createNonRecursive(Path path, private FSDataOutputStream createOutputStream(String key, short replication, boolean overwrite, boolean recursive) throws IOException { + boolean isRatisStreamingEnabled = getConf().getBoolean( + OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE, + OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT); + if (isRatisStreamingEnabled){ + return new FSDataOutputStream(adapter.createStreamFile(key, + replication, overwrite, recursive), statistics); + } return new FSDataOutputStream(adapter.createFile(key, replication, overwrite, recursive), statistics); } diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java index 43406b7ed2c7..e5d8bca949fe 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java @@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneKey; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; 
import org.apache.hadoop.ozone.om.exceptions.OMException; @@ -339,6 +340,43 @@ public OzoneFSOutputStream createFile(String pathStr, short replication, } } + @Override + public OzoneFSDataStreamOutput createStreamFile(String pathStr, + short replication, boolean overWrite, boolean recursive) + throws IOException { + incrementCounter(Statistic.OBJECTS_CREATED, 1); + OFSPath ofsPath = new OFSPath(pathStr); + if (ofsPath.isRoot() || ofsPath.isVolume() || ofsPath.isBucket()) { + throw new IOException("Cannot create file under root or volume."); + } + String key = ofsPath.getKeyName(); + try { + // Hadoop CopyCommands class always sets recursive to true + OzoneBucket bucket = getBucket(ofsPath, recursive); + OzoneDataStreamOutput ozoneDataStreamOutput = null; + if (replication == ReplicationFactor.ONE.getValue() + || replication == ReplicationFactor.THREE.getValue()) { + + ozoneDataStreamOutput = bucket.createStreamFile(key, 0, + ReplicationConfig.adjustReplication(replicationConfig, replication), + overWrite, recursive); + } else { + ozoneDataStreamOutput = bucket + .createStreamFile(key, 0, replicationConfig, overWrite, recursive); + } + return new OzoneFSDataStreamOutput( + ozoneDataStreamOutput.getByteBufStreamOutput()); + } catch (OMException ex) { + if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS + || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { + throw new FileAlreadyExistsException( + ex.getResult().name() + ": " + ex.getMessage()); + } else { + throw ex; + } + } + } + @Override public void renameKey(String key, String newKeyName) throws IOException { throw new IOException("OFS doesn't support renameKey, use rename instead."); diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java index 35065f070112..aea8fe2288a8 100644 --- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.ozone.OFSPath; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.om.exceptions.OMException; @@ -235,6 +236,13 @@ public FSDataOutputStream createNonRecursive(Path path, private FSDataOutputStream createOutputStream(String key, short replication, boolean overwrite, boolean recursive) throws IOException { + boolean isRatisStreamingEnabled = getConf().getBoolean( + OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE, + OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE_DEFAULT); + if (isRatisStreamingEnabled){ + return new FSDataOutputStream(adapter.createStreamFile(key, + replication, overwrite, recursive), statistics); + } return new FSDataOutputStream(adapter.createFile(key, replication, overwrite, recursive), statistics); } diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java index 0258f6973d03..d34c97b115d9 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java @@ -44,6 +44,9 @@ public interface OzoneClientAdapter { OzoneFSOutputStream createFile(String key, short replication, boolean overWrite, boolean recursive) throws IOException; + OzoneFSDataStreamOutput createStreamFile(String key, short replication, + boolean overWrite, boolean recursive) throws IOException; + void renameKey(String key, 
String newKeyName) throws IOException; // Users should use rename instead of renameKey in OFS. diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java new file mode 100644 index 000000000000..515dbca92b42 --- /dev/null +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.ozone; + +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * The ByteBuffer output stream for Ozone file system. + */ +public class OzoneFSDataStreamOutput extends OutputStream + implements ByteBufferStreamOutput { + + private final ByteBufferStreamOutput byteBufferStreamOutput; + + public OzoneFSDataStreamOutput( + ByteBufferStreamOutput byteBufferStreamOutput) { + this.byteBufferStreamOutput = byteBufferStreamOutput; + } + + /** + * Try to write the [off:off + len) slice in ByteBuf b to DataStream. + * + * @param b the data. 
+ * @param off the start offset in the data. + * @param len the number of bytes to write. + * @throws IOException if an I/O error occurs. + */ + @Override + public void write(ByteBuffer b, int off, int len) + throws IOException { + byteBufferStreamOutput.write(b, off, len); + } + + /** + * Writes the specified byte to this output stream. The general + * contract for write is that one byte is written + * to the output stream. The byte to be written is the eight + * low-order bits of the argument b. The 24 + * high-order bits of b are ignored. + *
+ * <p>
+ * Subclasses of OutputStream must provide an + * implementation for this method. + * + * @param b the byte. + * @throws IOException if an I/O error occurs. In particular, + * an IOException may be thrown if the + * output stream has been closed. + */ + @Override + public void write(int b) throws IOException { + byte[] singleBytes = new byte[1]; + singleBytes[0] = (byte) b; + byteBufferStreamOutput.write(ByteBuffer.wrap(singleBytes)); + } + + /** + * Flushes this DataStream output and forces any buffered output bytes + * to be written out. + * + * @throws IOException if an I/O error occurs. + */ + @Override + public void flush() throws IOException { + byteBufferStreamOutput.flush(); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + *
+ * <p>
As noted in {@link AutoCloseable#close()}, cases where the + * close may fail require careful attention. It is strongly advised + * to relinquish the underlying resources and to internally + * mark the {@code Closeable} as closed, prior to throwing + * the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + byteBufferStreamOutput.close(); + } +}