diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index d45f68fceb2c..382451b1296c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
@@ -416,6 +417,16 @@ public static boolean isReadOnly(
     }
   }
 
+  /**
+   * Returns true if the container is in an open-to-write state
+   * (OPEN or RECOVERING).
+   *
+   * @param state - container state
+   */
+  public static boolean isOpenToWriteState(State state) {
+    return state == State.OPEN || state == State.RECOVERING;
+  }
+
   /**
    * Not all datanode container cmd protocol has embedded ozone block token.
    * Block token are issued by Ozone Manager and return to Ozone client to
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index a10e1cccb10d..b5365820e3d8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -416,6 +417,23 @@ public static PutSmallFileResponseProto writeSmallFile(
     return response.getPutSmallFile();
   }
 
+  /**
+   * createRecoveringContainer call that creates a container on the datanode.
+   * Currently this is used for EC reconstruction containers. While the EC
+   * reconstruction coordinator is reconstructing containers, the in-progress
+   * containers are created in the "RECOVERING" state.
+   * @param client - client
+   * @param containerID - ID of container
+   * @param encodedToken - encodedToken if security is enabled
+   * @throws IOException
+   */
+  @InterfaceStability.Evolving
+  public static void createRecoveringContainer(XceiverClientSpi client,
+      long containerID, String encodedToken) throws IOException {
+    createContainerInternal(client, containerID, encodedToken,
+        ContainerProtos.ContainerDataProto.State.RECOVERING);
+  }
+
   /**
    * createContainer call that creates a container on the datanode.
    * @param client - client
@@ -425,11 +443,26 @@ public static PutSmallFileResponseProto writeSmallFile(
    */
   public static void createContainer(XceiverClientSpi client,
       long containerID, String encodedToken) throws IOException {
+    createContainerInternal(client, containerID, encodedToken, null);
+  }
+  /**
+   * Creates a container on the datanode, optionally in an explicit state.
+   * @param client - client
+   * @param containerID - ID of container
+   * @param encodedToken - encodedToken if security is enabled
+   * @param state - initial state of the container, or null for the default
+   * @throws IOException
+   */
+  private static void createContainerInternal(XceiverClientSpi client,
+      long containerID, String encodedToken,
+      ContainerProtos.ContainerDataProto.State state) throws IOException {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
-        ContainerProtos.CreateContainerRequestProto
-            .newBuilder();
+        ContainerProtos.CreateContainerRequestProto.newBuilder();
     createRequest.setContainerType(ContainerProtos.ContainerType
         .KeyValueContainer);
+    if (state != null) {
+      createRequest.setState(state);
+    }
 
     String id = client.getPipeline().getFirstNode().getUuidString();
     ContainerCommandRequestProto.Builder request =
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index d90ae6173c16..db8943f2a810 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -175,19 +175,24 @@ public static ContainerCommandRequestProto getPutBlockRequest(
         .build();
   }
 
+  public static Builder newWriteChunkRequestBuilder(Pipeline pipeline,
+      BlockID blockID, int datalen, int seq) throws IOException {
+    ChunkBuffer data = getData(datalen);
+    return newWriteChunkRequestBuilder(pipeline, blockID, data, seq);
+  }
+
   public static Builder newWriteChunkRequestBuilder(
-      Pipeline pipeline, BlockID blockID, int datalen, int seq)
+      Pipeline pipeline, BlockID blockID, ChunkBuffer data, int seq)
       throws IOException {
     LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
-        datalen, blockID, pipeline);
+        data.limit(), blockID, pipeline);
     ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
         ContainerProtos.WriteChunkRequestProto
             .newBuilder();
 
     writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
 
-    ChunkBuffer data = getData(datalen);
-    ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, datalen);
+    ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, data.limit());
     setDataChecksum(info, data);
 
     writeRequest.setChunkData(info.getProtoBufMessage());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index e5325c95a593..6aaf6fc45757 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -246,6 +246,11 @@ public ContainerReportsProto getContainerReport() throws IOException {
     // consumers such as SCM.
     synchronized (this) {
       for (Container container : containers) {
+        if (container.getContainerState()
+            == ContainerProtos.ContainerDataProto.State.RECOVERING) {
+          // Skip recovering containers in ICR and FCR for now.
+          continue;
+        }
         crBuilder.addReports(container.getContainerReport());
       }
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 83ba1044392c..802104a17140 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -344,7 +344,9 @@ private ContainerCommandResponseProto dispatchRequest(
       // only be in open or closing state.
       State containerState = container.getContainerData().getState();
       Preconditions.checkState(
-          containerState == State.OPEN || containerState == State.CLOSING);
+          containerState == State.OPEN
+              || containerState == State.CLOSING
+              || containerState == State.RECOVERING);
       // mark and persist the container state to be unhealthy
       try {
         handler.markContainerUnhealthy(container);
@@ -471,7 +473,8 @@ public void validateContainerCommand(
     }
     State containerState = container.getContainerState();
-    if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
+    if (!HddsUtils.isReadOnly(msg)
+        && !HddsUtils.isOpenToWriteState(containerState)) {
       switch (cmdType) {
       case CreateContainer:
         // Create Container is idempotent. There is nothing to validate.
         break;
@@ -481,8 +484,8 @@ public void validateContainerCommand(
         // while execution. Nothing to validate here.
         break;
       default:
-        // if the container is not open, no updates can happen. Just throw
-        // an exception
+        // if the container is not open/recovering, no updates can happen. Just
+        // throw an exception
         ContainerNotOpenException cex = new ContainerNotOpenException(
             "Container " + containerID + " in " + containerState + " state");
         audit(action, eventType, params, AuditEventStatus.FAILURE, cex);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 1dbd588d33cb..67f977ca387b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -23,6 +23,7 @@
 import java.io.OutputStream;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
@@ -97,6 +98,12 @@ protected String getDatanodeId() {
    */
   protected void sendICR(final Container container)
       throws StorageContainerException {
+    if (container.getContainerState()
+        == ContainerProtos.ContainerDataProto
+        .State.RECOVERING) {
+      // Ignore reports for recovering containers for now.
+      return;
+    }
     icrSender.send(container);
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 742e1018c32e..5516e02fc5a8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -33,6 +33,7 @@
 
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -304,7 +305,7 @@ public void delete() throws StorageContainerException {
   public void markContainerForClose() throws StorageContainerException {
     writeLock();
     try {
-      if (getContainerState() != ContainerDataProto.State.OPEN) {
+      if (!HddsUtils.isOpenToWriteState(getContainerState())) {
         throw new StorageContainerException(
             "Attempting to close a " + getContainerState() + " container.",
             CONTAINER_NOT_OPEN);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index fe5b8fc3ca63..7fcbdb3e7f4c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -31,6 +31,7 @@
 import java.util.function.Function;
 
 import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -276,6 +277,10 @@ ContainerCommandResponseProto handleCreateContainer(
     KeyValueContainerData newContainerData = new KeyValueContainerData(
         containerID, layoutVersion, maxContainerSize, request.getPipelineID(),
         getDatanodeId());
+    // Note: protobuf enum getters never return null, so check hasState().
+    if (request.getCreateContainer().hasState()) {
+      newContainerData.setState(request.getCreateContainer().getState());
+    }
     newContainerData.setReplicaIndex(request.getCreateContainer()
         .getReplicaIndex());
     // TODO: Add support to add metadataList to ContainerData. Add metadata
@@ -911,7 +916,8 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
      * in the leader goes to closing state, will arrive here even the container
      * might already be in closing state here.
      */
-    if (containerState == State.OPEN || containerState == State.CLOSING) {
+    if (containerState == State.OPEN || containerState == State.CLOSING
+        || containerState == State.RECOVERING) {
       return;
     }
 
@@ -972,8 +978,8 @@ public void markContainerForClose(Container container)
       throws IOException {
     container.writeLock();
     try {
-      // Move the container to CLOSING state only if it's OPEN
-      if (container.getContainerState() == State.OPEN) {
+      // Move the container to CLOSING state only if it's OPEN/RECOVERING
+      if (HddsUtils.isOpenToWriteState(container.getContainerState())) {
         container.markContainerForClose();
         sendICR(container);
       }
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index c9d0c53273f2..c16059c5c43c 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -246,6 +246,7 @@ message ContainerDataProto {
     UNHEALTHY = 5;
     INVALID = 6;
     DELETED = 7;
+    RECOVERING = 8;
   }
   required int64 containerID = 1;
   repeated KeyValue metadata = 2;
@@ -272,6 +273,7 @@ message CreateContainerRequestProto {
   repeated KeyValue metadata = 2;
   optional ContainerType containerType = 3 [default = KeyValueContainer];
   optional int32 replicaIndex = 4;
+  optional ContainerDataProto.State state = 5;
 }
 
 message CreateContainerResponseProto {
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 01597a7b0ece..555431b199ec 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -210,6 +210,7 @@ enum LifeCycleState {
     CLOSED = 4;
     DELETING = 5;
     DELETED = 6; // object is deleted.
+    RECOVERING = 7;
 }
 
 enum LifeCycleEvent {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMContainerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMContainerMetrics.java
index e2bbe1781bd3..df209621037e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMContainerMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMContainerMetrics.java
@@ -50,6 +50,7 @@ public void testSCMContainerMetrics() {
       put(HddsProtos.LifeCycleState.CLOSED.toString(), 5);
       put(HddsProtos.LifeCycleState.DELETING.toString(), 6);
       put(HddsProtos.LifeCycleState.DELETED.toString(), 7);
+      put(HddsProtos.LifeCycleState.RECOVERING.toString(), 8);
     }};
 
 
@@ -78,6 +79,6 @@ public void testSCMContainerMetrics() {
     verify(mb, times(1)).addGauge(Interns.info("DeletedContainers",
         "Number of containers in deleted state"), 7);
     verify(mb, times(1)).addGauge(Interns.info("TotalContainers",
-        "Number of all containers"), 27);
+        "Number of all containers"), 35);
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 2ab9d241c8ef..76cfb7b289a8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -20,17 +20,22 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
 
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
@@ -43,6 +48,10 @@
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -52,6 +61,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -60,6 +70,8 @@
 import java.util.UUID;
 import java.util.function.Function;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * This class tests container commands on EC containers.
 */
@@ -94,15 +106,16 @@ public class TestContainerCommandsEC {
   private static Pipeline pipeline;
   private static List<DatanodeDetails> datanodeDetails;
   private List<XceiverClientGrpc> clients = null;
+  private static OzoneConfiguration config;
 
   @BeforeAll
   public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
-    conf.setBoolean(OzoneConfigKeys.OZONE_ACL_ENABLED, true);
-    conf.set(OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS,
+    config = new OzoneConfiguration();
+    config.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+    config.setBoolean(OzoneConfigKeys.OZONE_ACL_ENABLED, true);
+    config.set(OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS,
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
-    startCluster(conf);
+    startCluster(config);
     prepareData(KEY_SIZE_RANGES);
   }
 
@@ -187,6 +200,84 @@ public void testListBlock() throws Exception {
     }
   }
 
+  @Test
+  public void testCreateRecoveryContainer() throws Exception {
+    XceiverClientManager xceiverClientManager =
+        new XceiverClientManager(config);
+    ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
+    Pipeline newPipeline =
+        scm.getPipelineManager().createPipeline(replicationConfig);
+    scm.getPipelineManager().activatePipeline(newPipeline.getId());
+    ContainerInfo container =
+        scm.getContainerManager().allocateContainer(replicationConfig, "test");
+    scm.getContainerManager().getContainerStateManager()
+        .addContainer(container.getProtobuf());
+
+    XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
+        createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
+            2));
+    try {
+      // To simulate the real situation, the container would already be in
+      // CLOSED state at SCM.
+      scm.getContainerManager().getContainerStateManager()
+          .updateContainerState(container.containerID().getProtobuf(),
+              HddsProtos.LifeCycleEvent.FINALIZE);
+      scm.getContainerManager().getContainerStateManager()
+          .updateContainerState(container.containerID().getProtobuf(),
+              HddsProtos.LifeCycleEvent.CLOSE);
+
+      // Create the recovering container in the DN.
+      ContainerProtocolCalls.createRecoveringContainer(dnClient,
+          container.containerID().getProtobuf().getId(), null);
+
+      BlockID blockID = ContainerTestHelper
+          .getTestBlockID(container.containerID().getProtobuf().getId());
+      byte[] data = "TestData".getBytes(UTF_8);
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper.newWriteChunkRequestBuilder(newPipeline, blockID,
+              ChunkBuffer.wrap(ByteBuffer.wrap(data)), 0).build();
+      dnClient.sendCommand(writeChunkRequest);
+
+      // Now, explicitly make a putBlock request for the block.
+      ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+          ContainerTestHelper.getPutBlockRequest(newPipeline,
+              writeChunkRequest.getWriteChunk());
+      dnClient.sendCommand(putKeyRequest);
+
+      ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+          ContainerProtocolCalls.readContainer(dnClient,
+              container.containerID().getProtobuf().getId(), null);
+      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.RECOVERING,
+          readContainerResponseProto.getContainerData().getState());
+      // The container at SCM should still be in CLOSED state.
+      Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+          scm.getContainerManager().getContainerStateManager()
+              .getContainer(container.containerID()).getState());
+      // close container call
+      ContainerProtocolCalls.closeContainer(dnClient,
+          container.containerID().getProtobuf().getId(), null);
+      // Make sure the container exists and is readable.
+      readContainerResponseProto = ContainerProtocolCalls
+          .readContainer(dnClient,
+              container.containerID().getProtobuf().getId(), null);
+      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+          readContainerResponseProto.getContainerData().getState());
+      ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
+          ContainerProtocolCalls.readChunk(dnClient,
+              writeChunkRequest.getWriteChunk().getChunkData(), blockID, null,
+              null);
+      ByteBuffer[] readOnlyByteBuffersArray = BufferUtils
+          .getReadOnlyByteBuffersArray(
+              readChunkResponseProto.getDataBuffers().getBuffersList());
+      Assert.assertEquals(data.length, readOnlyByteBuffersArray[0].limit());
+      byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()];
+      readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length);
+      Assert.assertArrayEquals(data, readBuff);
+    } finally {
+      xceiverClientManager.releaseClient(dnClient, false);
+    }
+  }
+
   public static void startCluster(OzoneConfiguration conf) throws Exception {
 
     // Set minimum pipeline to 1 to ensure all data is written to
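
Usage sketch (illustrative only, not part of the patch): how a caller such as the EC reconstruction coordinator might drive the new RECOVERING-container flow end to end. The class name and the surrounding setup (clientManager, targetPipeline, containerID, encodedToken) are hypothetical placeholders; createRecoveringContainer and closeContainer are the ContainerProtocolCalls APIs shown in this diff, and the actual chunk/block writes are elided.

    import java.io.IOException;

    import org.apache.hadoop.hdds.scm.XceiverClientManager;
    import org.apache.hadoop.hdds.scm.XceiverClientSpi;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

    public final class RecoveringContainerSketch {
      private RecoveringContainerSketch() { }

      public static void reconstruct(XceiverClientManager clientManager,
          Pipeline targetPipeline, long containerID, String encodedToken)
          throws IOException {
        XceiverClientSpi client = clientManager.acquireClient(targetPipeline);
        try {
          // 1. Create the target container in RECOVERING state: the datanode
          //    accepts writes to it (isOpenToWriteState), but keeps it out of
          //    ICR/FCR container reports until it is closed.
          ContainerProtocolCalls.createRecoveringContainer(client, containerID,
              encodedToken);

          // 2. Write the reconstructed chunks and blocks here, e.g. with
          //    WriteChunk and PutBlock requests as in the test above.

          // 3. Close the container once reconstruction is complete; a
          //    RECOVERING container may be closed directly (see
          //    markContainerForClose in this diff).
          ContainerProtocolCalls.closeContainer(client, containerID,
              encodedToken);
        } finally {
          clientManager.releaseClient(client, false);
        }
      }
    }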