diff --git a/hadoop-ozone/integration-test/pom.xml b/hadoop-ozone/integration-test/pom.xml index f66f64d2874f..ad18bcb1920d 100644 --- a/hadoop-ozone/integration-test/pom.xml +++ b/hadoop-ozone/integration-test/pom.xml @@ -141,6 +141,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.slf4j * + + com.sun.jersey + jersey-servlet + @@ -161,6 +165,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.slf4j * + + com.sun.jersey + jersey-servlet + @@ -216,6 +224,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.slf4j * + + com.sun.jersey + jersey-servlet + diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 9c76c0ec0c79..ff55ee83c176 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -35,11 +35,14 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.recon.ReconServer; +import org.apache.hadoop.ozone.s3.Gateway; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.function.CheckedFunction; +import com.amazonaws.services.s3.AmazonS3; + /** * Interface used for MiniOzoneClusters. */ @@ -142,10 +145,17 @@ void waitForPipelineTobeReady(HddsProtos.ReplicationFactor factor, /** * Returns a {@link ReconServer} instance. * - * @return List of {@link ReconServer} + * @return {@link ReconServer} instance if it is initialized, otherwise null. */ ReconServer getReconServer(); + /** + * Returns a {@link Gateway} instance. + * + * @return {@link Gateway} instance if it is initialized, otherwise null. + */ + Gateway getS3G(); + /** * Returns an {@link OzoneClient} to access the {@link MiniOzoneCluster}. * The caller is responsible for closing the client after use. @@ -154,6 +164,11 @@ void waitForPipelineTobeReady(HddsProtos.ReplicationFactor factor, */ OzoneClient newClient() throws IOException; + /** + * Returns an {@link AmazonS3} to access the {@link MiniOzoneCluster}. + */ + AmazonS3 newS3Client(); + /** * Returns StorageContainerLocationClient to communicate with * {@link StorageContainerManager} associated with the MiniOzoneCluster. @@ -219,6 +234,21 @@ void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode) */ void stopRecon(); + /** + * Start S3G. + */ + void startS3G(); + + /** + * Restart S3G. + */ + void restartS3G(); + + /** + * Stop S3G. + */ + void stopS3G(); + /** * Shutdown the MiniOzoneCluster and delete the storage dirs. */ @@ -273,6 +303,7 @@ abstract class Builder { protected String omId = UUID.randomUUID().toString(); protected boolean includeRecon = false; + protected boolean includeS3G = false; protected int dnInitialVersion = DatanodeVersion.FUTURE_VERSION.toProtoValue(); protected int dnCurrentVersion = DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUNK_RPC.toProtoValue(); @@ -382,6 +413,11 @@ public Builder includeRecon(boolean include) { return this; } + public Builder includeS3G(boolean include) { + this.includeS3G = include; + return this; + } + /** * Constructs and returns MiniOzoneCluster. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 50013b57f4c3..3594996856af 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -32,6 +32,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.DatanodeVersion; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -58,6 +66,7 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; +import org.apache.hadoop.hdds.server.http.HttpConfig; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.CodecBuffer; import org.apache.hadoop.hdds.utils.db.CodecTestUtil; @@ -73,6 +82,10 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.recon.ConfigurationProvider; import org.apache.hadoop.ozone.recon.ReconServer; +import org.apache.hadoop.ozone.s3.Gateway; +import org.apache.hadoop.ozone.s3.OzoneClientCache; +import org.apache.hadoop.ozone.s3.OzoneConfigurationHolder; +import org.apache.hadoop.ozone.s3.S3GatewayConfigKeys; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.ozone.test.GenericTestUtils; @@ -84,9 +97,14 @@ import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_TASK_SAFEMODE_WAIT_THRESHOLD; +import static org.apache.hadoop.hdds.server.http.HttpConfig.getHttpPolicy; +import static org.apache.hadoop.hdds.server.http.HttpServer2.HTTPS_SCHEME; +import static org.apache.hadoop.hdds.server.http.HttpServer2.HTTP_SCHEME; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR; +import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_HTTP_ADDRESS_KEY; import static org.apache.ozone.test.GenericTestUtils.PortAllocator.anyHostWithFreePort; import static org.apache.ozone.test.GenericTestUtils.PortAllocator.getFreePort; import static org.apache.ozone.test.GenericTestUtils.PortAllocator.localhostWithFreePort; @@ -120,6 +138,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { private OzoneManager ozoneManager; private final List hddsDatanodes; private ReconServer reconServer; + private Gateway s3g; // Timeout for the cluster to be ready private int waitForClusterToBeReadyTimeout = 120000; // 2 min @@ -136,13 +155,15 @@ private MiniOzoneClusterImpl(OzoneConfiguration conf, OzoneManager ozoneManager, StorageContainerManager scm, List hddsDatanodes, - ReconServer reconServer) { + ReconServer reconServer, + Gateway s3g) { this.conf = conf; this.ozoneManager = ozoneManager; this.scm = scm; this.hddsDatanodes = hddsDatanodes; this.reconServer = reconServer; this.scmConfigurator = scmConfigurator; + this.s3g = s3g; } /** @@ -268,6 +289,11 @@ public ReconServer getReconServer() { return this.reconServer; } + @Override + public Gateway getS3G() { + return this.s3g; + } + @Override public int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException { for (HddsDatanodeService service : hddsDatanodes) { @@ -286,6 +312,54 @@ public OzoneClient newClient() throws IOException { return client; } + @Override + public AmazonS3 newS3Client() { + // TODO: Parameterize tests between Virtual host style and Path style + return createS3Client(true); + } + + public AmazonS3 createS3Client(boolean enablePathStyle) { + final String accessKey = "user"; + final String secretKey = "password"; + final Regions region = Regions.DEFAULT_REGION; + + final String protocol; + final HttpConfig.Policy webPolicy = getHttpPolicy(conf); + String host; + + if (webPolicy.isHttpsEnabled()) { + protocol = HTTPS_SCHEME; + host = conf.get(OZONE_S3G_HTTPS_ADDRESS_KEY); + } else { + protocol = HTTP_SCHEME; + host = conf.get(OZONE_S3G_HTTP_ADDRESS_KEY); + } + + String endpoint = protocol + "://" + host; + + AWSCredentialsProvider credentials = new AWSStaticCredentialsProvider( + new BasicAWSCredentials(accessKey, secretKey) + ); + + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + LOG.info("S3 Endpoint is {}", endpoint); + + AmazonS3 s3Client = + AmazonS3ClientBuilder.standard() + .withPathStyleAccessEnabled(enablePathStyle) + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + endpoint, region.getName() + ) + ) + .withClientConfiguration(clientConfiguration) + .withCredentials(credentials) + .build(); + + return s3Client; + } + protected OzoneClient createClient() throws IOException { return OzoneClientFactory.getRpcClient(conf); } @@ -428,6 +502,7 @@ public void stop() { stopDatanodes(hddsDatanodes); stopSCM(scm); stopRecon(reconServer); + stopS3G(s3g); } private void startHddsDatanode(HddsDatanodeService datanode) { @@ -467,6 +542,23 @@ public void stopRecon() { stopRecon(reconServer); } + @Override + public void startS3G() { + s3g = new Gateway(); + s3g.execute(NO_ARGS); + } + + @Override + public void restartS3G() { + stopS3G(s3g); + startS3G(); + } + + @Override + public void stopS3G() { + stopS3G(s3g); + } + private CertificateClient getCAClient() { return this.caClient; } @@ -521,6 +613,19 @@ private static void stopRecon(ReconServer reconServer) { } } + private static void stopS3G(Gateway s3g) { + try { + if (s3g != null) { + LOG.info("Stopping S3G"); + // TODO (HDDS-11539): Remove this workaround once the @PreDestroy issue is fixed + OzoneClientCache.closeClient(); + s3g.stop(); + } + } catch (Exception e) { + LOG.error("Exception while shutting down S3 Gateway.", e); + } + } + /** * Builder for configuring the MiniOzoneCluster to run. */ @@ -544,15 +649,17 @@ public MiniOzoneCluster build() throws IOException { OzoneManager om = null; ReconServer reconServer = null; List hddsDatanodes = Collections.emptyList(); + Gateway s3g = null; try { scm = createAndStartSingleSCM(); om = createAndStartSingleOM(); + s3g = createS3G(); reconServer = createRecon(); hddsDatanodes = createHddsDatanodes(); MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, scmConfigurator, om, scm, - hddsDatanodes, reconServer); + hddsDatanodes, reconServer, s3g); cluster.setCAClient(certClient); cluster.setSecretKeyClient(secretKeyClient); @@ -567,6 +674,9 @@ public MiniOzoneCluster build() throws IOException { if (includeRecon) { stopRecon(reconServer); } + if (includeS3G) { + stopS3G(s3g); + } if (startDataNodes) { stopDatanodes(hddsDatanodes); } @@ -740,6 +850,16 @@ protected ReconServer createRecon() { return reconServer; } + protected Gateway createS3G() { + Gateway s3g = null; + if (includeS3G) { + configureS3G(); + s3g = new Gateway(); + s3g.execute(NO_ARGS); + } + return s3g; + } + /** * Creates HddsDatanodeService(s) instance. * @@ -806,5 +926,14 @@ protected void configureRecon() { ConfigurationProvider.setConfiguration(conf); } + private void configureS3G() { + OzoneConfigurationHolder.resetConfiguration(); + + conf.set(S3GatewayConfigKeys.OZONE_S3G_HTTP_ADDRESS_KEY, localhostWithFreePort()); + conf.set(S3GatewayConfigKeys.OZONE_S3G_HTTPS_ADDRESS_KEY, localhostWithFreePort()); + + OzoneConfigurationHolder.setConfiguration(conf); + } + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java new file mode 100644 index 000000000000..069ac4bd9dce --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.s3.awssdk.v1; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.Bucket; +import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.CreateBucketRequest; +import com.amazonaws.services.s3.model.Grantee; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ListPartsRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.MultipartUploadListing; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.ObjectTagging; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PartListing; +import com.amazonaws.services.s3.model.PartSummary; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.Tag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.services.s3.transfer.model.UploadResult; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.utils.InputSubstream; +import org.apache.ozone.test.OzoneTestBase; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.io.TempDir; + +import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.apache.hadoop.ozone.OzoneConsts.MB; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This is an abstract class to test the AWS Java S3 SDK operations. + * This class should be extended for OM standalone and OM HA (Ratis) cluster setup. + * + * The test scenarios are adapted from + * - https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/java/example_code/s3/ + * - https://github.com/ceph/s3-tests + * + * TODO: Currently we are using AWS SDK V1, need to also add tests for AWS SDK V2. + */ +@TestMethodOrder(MethodOrderer.MethodName.class) +public abstract class AbstractS3SDKV1Tests extends OzoneTestBase { + + /** + * There are still some unsupported S3 operations. + * Current unsupported S3 operations (non-exhaustive): + * - Cross Region Replication (CrossRegionReplication.java) + * - Versioned enabled buckets + * - DeleteObjectVersionEnabledBucket.java + * - DeleteMultipleObjectsVersionEnabledBucket.java + * - ListKeysVersioningEnabledBucket.java + * - Website configurations + * - WebsiteConfiguration.java + * - SetWebsiteConfiguration.java + * - GetWebsiteConfiguration.java + * - DeleteWebsiteConfiguration.java + * - S3 Event Notifications + * - EnableNotificationOnABucket.java + * - Object tags + * - GetObjectTags.java + * - GetObjectTags2.java + * - Bucket policy + * - SetBucketPolicy.java + * - GetBucketPolicy.java + * - DeleteBucketPolicy.java + * - Bucket lifecycle configuration + * - LifecycleConfiguration.java + * - Canned Bucket ACL + * - CreateBucketWithACL.java + * - Object ACL + * - SetAcl.java + * - ModifyACLExistingObject.java + * - GetAcl.java + * - S3 Encryption + * - S3Encrypt.java + * - S3EncryptV2.java + * - Client-side encryption + * - S3ClientSideEncryptionAsymmetricMasterKey.java + * - S3ClientSideEncryptionSymMasterKey.java + * - Server-side encryption + * - SpecifyServerSideEncryption.ajva + * - ServerSideEncryptionCopyObjectUsingHLWithSSEC.java + * - ServerSideEncryptionUsingClientSideEncryptionKey.java + * - Dual stack endpoints + * - DualStackEndpoints.java + * - Transfer acceleration + * - TransferAcceleration.java + * - Temp credentials + * - MakingRequestsWithFederatedTempCredentials.java + * - MakingRequestsWithIAMTempCredentials.java + * - Object archival + * - RestoreArchivedObject + * - KMS key + * - UploadObjectKMSKey.java + */ + + private static MiniOzoneCluster cluster = null; + private static AmazonS3 s3Client = null; + + /** + * Create a MiniOzoneCluster with S3G enabled for testing. + * @param conf Configurations to start the cluster + * @throws Exception exception thrown when waiting for the cluster to be ready. + */ + static void startCluster(OzoneConfiguration conf) throws Exception { + cluster = MiniOzoneCluster.newBuilder(conf) + .includeS3G(true) + .setNumDatanodes(5) + .build(); + cluster.waitForClusterToBeReady(); + s3Client = cluster.newS3Client(); + } + + /** + * Shutdown the MiniOzoneCluster. + */ + static void shutdownCluster() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + public static void setCluster(MiniOzoneCluster cluster) { + AbstractS3SDKV1Tests.cluster = cluster; + } + + public static MiniOzoneCluster getCluster() { + return AbstractS3SDKV1Tests.cluster; + } + + @Test + public void testCreateBucket() { + final String bucketName = getBucketName(); + + Bucket b = s3Client.createBucket(bucketName); + + assertEquals(bucketName, b.getName()); + assertTrue(s3Client.doesBucketExist(bucketName)); + assertTrue(s3Client.doesBucketExistV2(bucketName)); + assertTrue(isBucketEmpty(b)); + } + + @Test + public void testBucketACLOperations() { + // TODO: Uncomment assertions when bucket S3 ACL logic has been fixed + final String bucketName = getBucketName(); + + AccessControlList aclList = new AccessControlList(); + Owner owner = new Owner("owner", "owner"); + aclList.withOwner(owner); + Grantee grantee = new CanonicalGrantee("testGrantee"); + aclList.grantPermission(grantee, Permission.Read); + + + CreateBucketRequest createBucketRequest = new CreateBucketRequest(bucketName) + .withAccessControlList(aclList); + + s3Client.createBucket(createBucketRequest); + +// AccessControlList retrievedAclList = s3.getBucketAcl(bucketName); +// assertEquals(aclList, retrievedAclList); + +// aclList.grantPermission(grantee, Permission.Write); +// s3.setBucketAcl(bucketName, aclList); + +// retrievedAclList = s3.getBucketAcl(bucketName); +// assertEquals(aclList, retrievedAclList); + + } + + @Test + public void testListBuckets() { + List bucketNames = new ArrayList<>(); + for (int i = 0; i <= 5; i++) { + String bucketName = getBucketName(String.valueOf(i)); + s3Client.createBucket(bucketName); + bucketNames.add(bucketName); + } + + List bucketList = s3Client.listBuckets(); + List listBucketNames = bucketList.stream() + .map(Bucket::getName).collect(Collectors.toList()); + + assertThat(listBucketNames).containsAll(bucketNames); + } + + @Test + public void testDeleteBucket() { + final String bucketName = getBucketName(); + + s3Client.createBucket(bucketName); + + s3Client.deleteBucket(bucketName); + + assertFalse(s3Client.doesBucketExist(bucketName)); + assertFalse(s3Client.doesBucketExistV2(bucketName)); + } + + @Test + public void testDeleteBucketNotExist() { + final String bucketName = getBucketName(); + + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.deleteBucket(bucketName)); + + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(404, ase.getStatusCode()); + assertEquals("NoSuchBucket", ase.getErrorCode()); + } + + @Test + public void testDeleteBucketNonEmptyWithKeys() { + final String bucketName = getBucketName(); + s3Client.createBucket(bucketName); + + // Upload some objects to the bucket + for (int i = 1; i <= 10; i++) { + s3Client.putObject(bucketName, "key-" + i, RandomStringUtils.randomAlphanumeric(1024)); + } + + // Bucket deletion should fail if there are still keys in the bucket + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.deleteBucket(bucketName) + ); + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(409, ase.getStatusCode()); + assertEquals("BucketNotEmpty", ase.getErrorCode()); + + // Delete all the keys + ObjectListing objectListing = s3Client.listObjects(bucketName); + while (true) { + for (S3ObjectSummary summary : objectListing.getObjectSummaries()) { + s3Client.deleteObject(bucketName, summary.getKey()); + } + + // more object_listing to retrieve? + if (objectListing.isTruncated()) { + objectListing = s3Client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + } + + @Test + public void testDeleteBucketNonEmptyWithIncompleteMultipartUpload(@TempDir Path tempDir) throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + s3Client.createBucket(bucketName); + + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + + createFile(multipartUploadFile, (int) (5 * MB)); + + // Create an incomplete multipart upload by initiating multipart upload, + // uploading some parts, but not actually completing it. + String uploadId = initiateMultipartUpload(bucketName, keyName, null, null, null); + + uploadParts(bucketName, keyName, uploadId, multipartUploadFile, 1 * MB); + + // Bucket deletion should fail if there are still keys in the bucket + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.deleteBucket(bucketName) + ); + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(409, ase.getStatusCode()); + assertEquals("BucketNotEmpty", ase.getErrorCode()); + + // After the multipart upload is aborted, the bucket deletion should succeed + abortMultipartUpload(bucketName, keyName, uploadId); + + s3Client.deleteBucket(bucketName); + + assertFalse(s3Client.doesBucketExistV2(bucketName)); + } + + @Test + public void testPutObject() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final String content = "bar"; + s3Client.createBucket(bucketName); + + InputStream is = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + + PutObjectResult putObjectResult = s3Client.putObject(bucketName, keyName, is, new ObjectMetadata()); + assertEquals("37b51d194a7513e45b56f6524f2d51f2", putObjectResult.getETag()); + } + + @Test + public void testPutObjectEmpty() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final String content = ""; + s3Client.createBucket(bucketName); + + InputStream is = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + + PutObjectResult putObjectResult = s3Client.putObject(bucketName, keyName, is, new ObjectMetadata()); + assertEquals("d41d8cd98f00b204e9800998ecf8427e", putObjectResult.getETag()); + } + + @Test + public void testGetObject() throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final String content = "bar"; + final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + s3Client.createBucket(bucketName); + + InputStream is = new ByteArrayInputStream(contentBytes); + ObjectMetadata objectMetadata = new ObjectMetadata(); + Map userMetadata = new HashMap<>(); + userMetadata.put("key1", "value1"); + userMetadata.put("key2", "value2"); + objectMetadata.setUserMetadata(userMetadata); + + List tags = Arrays.asList(new Tag("tag1", "value1"), new Tag("tag2", "value2")); + ObjectTagging objectTagging = new ObjectTagging(tags); + + + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, keyName, is, objectMetadata) + .withTagging(objectTagging); + + s3Client.putObject(putObjectRequest); + + S3Object s3Object = s3Client.getObject(bucketName, keyName); + assertEquals(tags.size(), s3Object.getTaggingCount()); + + try (S3ObjectInputStream s3is = s3Object.getObjectContent(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(contentBytes.length)) { + byte[] readBuf = new byte[1024]; + int readLen = 0; + while ((readLen = s3is.read(readBuf)) > 0) { + bos.write(readBuf, 0, readLen); + } + assertEquals(content, bos.toString("UTF-8")); + } + } + + @Test + public void testGetObjectWithoutETag() throws Exception { + // Object uploaded using other protocols (e.g. ofs / ozone cli) will not + // have ETag. Ensure that ETag will not do ETag validation on GetObject if there + // is no ETag present. + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + + s3Client.createBucket(bucketName); + + String value = "sample value"; + byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + + OzoneConfiguration conf = cluster.getConf(); + OzoneClient ozoneClient = OzoneClientFactory.getRpcClient(conf); + ObjectStore store = ozoneClient.getObjectStore(); + + OzoneVolume volume = store.getS3Volume(); + OzoneBucket bucket = volume.getBucket(bucketName); + + try (OzoneOutputStream out = bucket.createKey(keyName, + valueBytes.length, + ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), + Collections.emptyMap())) { + out.write(valueBytes); + } + + S3Object s3Object = s3Client.getObject(bucketName, keyName); + assertNull(s3Object.getObjectMetadata().getETag()); + + try (S3ObjectInputStream s3is = s3Object.getObjectContent(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(valueBytes.length)) { + byte[] readBuf = new byte[1024]; + int readLen = 0; + while ((readLen = s3is.read(readBuf)) > 0) { + bos.write(readBuf, 0, readLen); + } + assertEquals(value, bos.toString("UTF-8")); + } + } + + @Test + public void testListObjectsMany() { + final String bucketName = getBucketName(); + s3Client.createBucket(bucketName); + final List keyNames = Arrays.asList( + getKeyName("1"), + getKeyName("2"), + getKeyName("3") + ); + + for (String keyName: keyNames) { + s3Client.putObject(bucketName, keyName, RandomStringUtils.randomAlphanumeric(5)); + } + + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName) + .withMaxKeys(2); + ObjectListing listObjectsResponse = s3Client.listObjects(listObjectsRequest); + assertThat(listObjectsResponse.getObjectSummaries()).hasSize(2); + assertEquals(bucketName, listObjectsResponse.getBucketName()); + assertEquals(listObjectsResponse.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey).collect(Collectors.toList()), + keyNames.subList(0, 2)); + assertTrue(listObjectsResponse.isTruncated()); + + + listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName) + .withMaxKeys(2) + .withMarker(listObjectsResponse.getNextMarker()); + listObjectsResponse = s3Client.listObjects(listObjectsRequest); + assertThat(listObjectsResponse.getObjectSummaries()).hasSize(1); + assertEquals(bucketName, listObjectsResponse.getBucketName()); + assertEquals(listObjectsResponse.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey).collect(Collectors.toList()), + keyNames.subList(2, keyNames.size())); + assertFalse(listObjectsResponse.isTruncated()); + } + + @Test + public void testListObjectsManyV2() { + final String bucketName = getBucketName(); + s3Client.createBucket(bucketName); + final List keyNames = Arrays.asList( + getKeyName("1"), + getKeyName("2"), + getKeyName("3") + ); + + for (String keyName: keyNames) { + s3Client.putObject(bucketName, keyName, RandomStringUtils.randomAlphanumeric(5)); + } + + ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request() + .withBucketName(bucketName) + .withMaxKeys(2); + ListObjectsV2Result listObjectsResponse = s3Client.listObjectsV2(listObjectsRequest); + assertThat(listObjectsResponse.getObjectSummaries()).hasSize(2); + assertEquals(bucketName, listObjectsResponse.getBucketName()); + assertEquals(listObjectsResponse.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey).collect(Collectors.toList()), + keyNames.subList(0, 2)); + assertTrue(listObjectsResponse.isTruncated()); + + + listObjectsRequest = new ListObjectsV2Request() + .withBucketName(bucketName) + .withMaxKeys(2) + .withContinuationToken(listObjectsResponse.getNextContinuationToken()); + listObjectsResponse = s3Client.listObjectsV2(listObjectsRequest); + assertThat(listObjectsResponse.getObjectSummaries()).hasSize(1); + assertEquals(bucketName, listObjectsResponse.getBucketName()); + assertEquals(listObjectsResponse.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey).collect(Collectors.toList()), + keyNames.subList(2, keyNames.size())); + assertFalse(listObjectsResponse.isTruncated()); + } + + @Test + public void testListObjectsBucketNotExist() { + final String bucketName = getBucketName(); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName); + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.listObjects(listObjectsRequest)); + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(404, ase.getStatusCode()); + assertEquals("NoSuchBucket", ase.getErrorCode()); + } + + @Test + public void testListObjectsV2BucketNotExist() { + final String bucketName = getBucketName(); + ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request() + .withBucketName(bucketName); + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.listObjectsV2(listObjectsRequest)); + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(404, ase.getStatusCode()); + assertEquals("NoSuchBucket", ase.getErrorCode()); + } + + @Test + public void testHighLevelMultipartUpload(@TempDir Path tempDir) throws Exception { + TransferManager tm = TransferManagerBuilder.standard() + .withS3Client(s3Client) + .build(); + + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + + s3Client.createBucket(bucketName); + + // The minimum file size to for TransferManager to initiate multipart upload is 16MB, so create a file + // larger than the threshold. + // See TransferManagerConfiguration#getMultipartUploadThreshold + int fileSize = (int) (20 * MB); + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + + createFile(multipartUploadFile, fileSize); + + // TransferManager processes all transfers asynchronously, + // so this call returns immediately. + Upload upload = tm.upload(bucketName, keyName, multipartUploadFile); + + upload.waitForCompletion(); + UploadResult uploadResult = upload.waitForUploadResult(); + assertEquals(bucketName, uploadResult.getBucketName()); + assertEquals(keyName, uploadResult.getKey()); + } + + @Test + public void testLowLevelMultipartUpload(@TempDir Path tempDir) throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final Map userMetadata = new HashMap<>(); + userMetadata.put("key1", "value1"); + userMetadata.put("key2", "value2"); + + List tags = Arrays.asList(new Tag("tag1", "value1"), new Tag("tag2", "value2")); + + s3Client.createBucket(bucketName); + + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + + createFile(multipartUploadFile, (int) (25 * MB)); + + multipartUpload(bucketName, keyName, multipartUploadFile, 5 * MB, null, userMetadata, tags); + + S3Object s3Object = s3Client.getObject(bucketName, keyName); + assertEquals(keyName, s3Object.getKey()); + assertEquals(bucketName, s3Object.getBucketName()); + assertEquals(tags.size(), s3Object.getTaggingCount()); + + ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucketName, keyName); + assertEquals(userMetadata, objectMetadata.getUserMetadata()); + } + + @Test + public void testListMultipartUploads() { + final String bucketName = getBucketName(); + final String multipartKey1 = getKeyName("multipart1"); + final String multipartKey2 = getKeyName("multipart2"); + + s3Client.createBucket(bucketName); + + List uploadIds = new ArrayList<>(); + + String uploadId1 = initiateMultipartUpload(bucketName, multipartKey1, null, null, null); + uploadIds.add(uploadId1); + String uploadId2 = initiateMultipartUpload(bucketName, multipartKey1, null, null, null); + uploadIds.add(uploadId2); + // TODO: Currently, Ozone sorts based on uploadId instead of MPU init time within the same key. + // Remove this sorting step once HDDS-11532 has been implemented + Collections.sort(uploadIds); + String uploadId3 = initiateMultipartUpload(bucketName, multipartKey2, null, null, null); + uploadIds.add(uploadId3); + + // TODO: Add test for max uploads threshold and marker once HDDS-11530 has been implemented + ListMultipartUploadsRequest listMultipartUploadsRequest = new ListMultipartUploadsRequest(bucketName); + + MultipartUploadListing result = s3Client.listMultipartUploads(listMultipartUploadsRequest); + + List listUploadIds = result.getMultipartUploads().stream() + .map(MultipartUpload::getUploadId) + .collect(Collectors.toList()); + + assertEquals(uploadIds, listUploadIds); + } + + @Test + public void testListParts(@TempDir Path tempDir) throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final long fileSize = 5 * MB; + final long partSize = 1 * MB; + final int maxParts = 2; + + s3Client.createBucket(bucketName); + + String uploadId = initiateMultipartUpload(bucketName, keyName, null, null, null); + + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + + createFile(multipartUploadFile, (int) fileSize); + + List partETags = uploadParts(bucketName, keyName, uploadId, multipartUploadFile, partSize); + + List listPartETags = new ArrayList<>(); + int partNumberMarker = 0; + int expectedNumOfParts = 5; + PartListing listPartsResult; + do { + ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, keyName, uploadId) + .withMaxParts(maxParts) + .withPartNumberMarker(partNumberMarker); + listPartsResult = s3Client.listParts(listPartsRequest); + if (expectedNumOfParts > maxParts) { + assertTrue(listPartsResult.isTruncated()); + partNumberMarker = listPartsResult.getNextPartNumberMarker(); + expectedNumOfParts -= maxParts; + } else { + assertFalse(listPartsResult.isTruncated()); + } + for (PartSummary partSummary : listPartsResult.getParts()) { + listPartETags.add(new PartETag(partSummary.getPartNumber(), partSummary.getETag())); + } + } while (listPartsResult.isTruncated()); + + assertEquals(partETags.size(), listPartETags.size()); + for (int i = 0; i < partETags.size(); i++) { + assertEquals(partETags.get(i).getPartNumber(), listPartETags.get(i).getPartNumber()); + assertEquals(partETags.get(i).getETag(), listPartETags.get(i).getETag()); + } + } + + @Test + public void testListPartsNotFound() { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + + s3Client.createBucket(bucketName); + + ListPartsRequest listPartsRequest = + new ListPartsRequest(bucketName, keyName, "nonexist"); + + AmazonServiceException ase = assertThrows(AmazonServiceException.class, + () -> s3Client.listParts(listPartsRequest)); + + assertEquals(ErrorType.Client, ase.getErrorType()); + assertEquals(404, ase.getStatusCode()); + assertEquals("NoSuchUpload", ase.getErrorCode()); + } + + private boolean isBucketEmpty(Bucket bucket) { + ObjectListing objectListing = s3Client.listObjects(bucket.getName()); + return objectListing.getObjectSummaries().isEmpty(); + } + + private String getBucketName() { + return getBucketName(null); + } + + private String getBucketName(String suffix) { + return (getTestName() + "bucket" + suffix).toLowerCase(Locale.ROOT); + } + + private String getKeyName() { + return getKeyName(null); + } + + private String getKeyName(String suffix) { + return (getTestName() + "key" + suffix).toLowerCase(Locale.ROOT); + } + + private String multipartUpload(String bucketName, String key, File file, long partSize, String contentType, + Map userMetadata, List tags) throws Exception { + String uploadId = initiateMultipartUpload(bucketName, key, contentType, userMetadata, tags); + + List partETags = uploadParts(bucketName, key, uploadId, file, partSize); + + completeMultipartUpload(bucketName, key, uploadId, partETags); + + return uploadId; + } + + private String initiateMultipartUpload(String bucketName, String key, String contentType, + Map metadata, List tags) { + InitiateMultipartUploadRequest initRequest; + if (metadata == null || metadata.isEmpty()) { + initRequest = new InitiateMultipartUploadRequest(bucketName, key); + } else { + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setUserMetadata(metadata); + if (contentType != null) { + objectMetadata.setContentType(contentType); + } + + initRequest = new InitiateMultipartUploadRequest(bucketName, key, objectMetadata) + .withTagging(new ObjectTagging(tags)); + } + + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + assertEquals(bucketName, initResponse.getBucketName()); + assertEquals(key, initResponse.getKey()); + // TODO: Once bucket lifecycle configuration is supported, should check for "abortDate" and "abortRuleId" + + return initResponse.getUploadId(); + } + + // TODO: Also support async upload parts (similar to v2 asyncClient) + private List uploadParts(String bucketName, String key, String uploadId, File file, long partSize) + throws Exception { + // Create a list of ETag objects. You retrieve ETags for each object part + // uploaded, + // then, after each individual part has been uploaded, pass the list of ETags to + // the request to complete the upload. + List partETags = new ArrayList<>(); + + // Upload the file parts. + long filePosition = 0; + long fileLength = file.length(); + try (FileInputStream fileInputStream = new FileInputStream(file)) { + for (int i = 1; filePosition < fileLength; i++) { + // Because the last part could be less than 5 MB, adjust the part size as + // needed. + partSize = Math.min(partSize, (fileLength - filePosition)); + + // Create the request to upload a part. + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucketName) + .withKey(key) + .withUploadId(uploadId) + .withPartNumber(i) + .withFileOffset(filePosition) + .withFile(file) + .withPartSize(partSize); + + // Upload the part and add the response's ETag to our list. + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + PartETag partETag = uploadResult.getPartETag(); + assertEquals(i, partETag.getPartNumber()); + assertEquals(DatatypeConverter.printHexBinary( + calculateDigest(fileInputStream, 0, (int) partSize)).toLowerCase(), partETag.getETag()); + partETags.add(partETag); + + filePosition += partSize; + } + } + + return partETags; + } + + private void completeMultipartUpload(String bucketName, String key, String uploadId, List partETags) { + CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key, + uploadId, partETags); + CompleteMultipartUploadResult compResponse = s3Client.completeMultipartUpload(compRequest); + assertEquals(bucketName, compResponse.getBucketName()); + assertEquals(key, compResponse.getKey()); + } + + private void abortMultipartUpload(String bucketName, String key, String uploadId) { + AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, key, uploadId); + s3Client.abortMultipartUpload(abortRequest); + } + + private static byte[] calculateDigest(InputStream inputStream, int skip, int length) throws Exception { + int numRead; + byte[] buffer = new byte[1024]; + + MessageDigest complete = MessageDigest.getInstance("MD5"); + if (skip > -1 && length > -1) { + inputStream = new InputSubstream(inputStream, skip, length); + } + + do { + numRead = inputStream.read(buffer); + if (numRead > 0) { + complete.update(buffer, 0, numRead); + } + } while (numRead != -1); + + return complete.digest(); + } + + private static void createFile(File newFile, int size) throws IOException { + // write random data so that filesystems with compression enabled (e.g. ZFS) + // can't compress the file + Random random = new Random(); + byte[] data = new byte[size]; + random.nextBytes(data); + + RandomAccessFile file = new RandomAccessFile(newFile, "rws"); + + file.write(data); + + file.getFD().sync(); + file.close(); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1.java new file mode 100644 index 000000000000..5e9b3633be06 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.s3.awssdk.v1; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; + +/** + * Tests the AWS S3 SDK basic operations with OM Ratis disabled. + */ +@Timeout(300) +public class TestS3SDKV1 extends AbstractS3SDKV1Tests { + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, false); + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); + startCluster(conf); + } + + @AfterAll + public static void shutdown() throws IOException { + shutdownCluster(); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatis.java new file mode 100644 index 000000000000..cb614453f69f --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatis.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.s3.awssdk.v1; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.io.IOException; + +/** + * Tests the AWS S3 SDK basic operations with OM Ratis enabled. + */ +public class TestS3SDKV1WithRatis extends AbstractS3SDKV1Tests { + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, + false); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + true); + startCluster(conf); + } + + @AfterAll + public static void shutdown() throws IOException { + shutdownCluster(); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/InputSubstream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/InputSubstream.java new file mode 100644 index 000000000000..4908ecabf2ed --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/InputSubstream.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import com.google.common.base.Preconditions; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * A filter input stream implementation that exposes a range of the underlying input stream. + */ +public class InputSubstream extends FilterInputStream { + private static final int MAX_SKIPS = 100; + private long currentPosition; + private final long requestedSkipOffset; + private final long requestedLength; + private long markedPosition = 0; + + public InputSubstream(InputStream in, long skip, long length) { + super(in); + Preconditions.checkNotNull(in); + this.currentPosition = 0; + this.requestedSkipOffset = skip; + this.requestedLength = length; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int bytesRead = read(b, 0, 1); + + if (bytesRead == -1) { + return bytesRead; + } + return b[0]; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int count = 0; + while (currentPosition < requestedSkipOffset) { + long skippedBytes = super.skip(requestedSkipOffset - currentPosition); + if (skippedBytes == 0) { + count++; + if (count > MAX_SKIPS) { + throw new IOException( + "Unable to position the currentPosition from " + + currentPosition + " to " + + requestedSkipOffset); + } + } + currentPosition += skippedBytes; + } + + long bytesRemaining = + (requestedLength + requestedSkipOffset) - currentPosition; + if (bytesRemaining <= 0) { + return -1; + } + + len = (int) Math.min(len, bytesRemaining); + int bytesRead = super.read(b, off, len); + currentPosition += bytesRead; + + return bytesRead; + } + + @Override + public synchronized void mark(int readlimit) { + markedPosition = currentPosition; + super.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + currentPosition = markedPosition; + super.reset(); + } + + @Override + public void close() throws IOException { + // No-op operation since we don't want to close the underlying stream + // when the susbtream has been read + } + + @Override + public int available() throws IOException { + long bytesRemaining; + if (currentPosition < requestedSkipOffset) { + bytesRemaining = requestedLength; + } else { + bytesRemaining = + (requestedLength + requestedSkipOffset) - currentPosition; + } + + return (int) Math.min(bytesRemaining, super.available()); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ConfigurationProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ConfigurationProvider.java index 6312365bf4b4..9f0a9796e283 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ConfigurationProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ConfigurationProvider.java @@ -53,6 +53,8 @@ private static void addDeprecations() { @VisibleForTesting public static void setConfiguration(OzoneConfiguration conf) { + // Nullity check is used in case the configuration was already set + // in the MiniOzoneCluster if (configuration == null) { ConfigurationProvider.configuration = conf; } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/Gateway.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/Gateway.java index 86d25d194173..9816b023dc45 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/Gateway.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/Gateway.java @@ -18,7 +18,9 @@ package org.apache.hadoop.ozone.s3; import java.io.IOException; +import java.net.InetSocketAddress; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.cli.GenericCli; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -57,7 +59,6 @@ public class Gateway extends GenericCli { private S3GatewayHttpServer httpServer; private S3GatewayMetrics metrics; - private OzoneConfiguration ozoneConfiguration; private final JvmPauseMonitor jvmPauseMonitor = newJvmPauseMonitor("S3G"); @@ -71,14 +72,14 @@ public static void main(String[] args) throws Exception { @Override public Void call() throws Exception { - ozoneConfiguration = createOzoneConfiguration(); - TracingUtil.initTracing("S3gateway", ozoneConfiguration); + OzoneConfiguration ozoneConfiguration = createOzoneConfiguration(); OzoneConfigurationHolder.setConfiguration(ozoneConfiguration); - UserGroupInformation.setConfiguration(ozoneConfiguration); - loginS3GUser(ozoneConfiguration); - setHttpBaseDir(ozoneConfiguration); - httpServer = new S3GatewayHttpServer(ozoneConfiguration, "s3gateway"); - metrics = S3GatewayMetrics.create(ozoneConfiguration); + TracingUtil.initTracing("S3gateway", OzoneConfigurationHolder.configuration()); + UserGroupInformation.setConfiguration(OzoneConfigurationHolder.configuration()); + loginS3GUser(OzoneConfigurationHolder.configuration()); + setHttpBaseDir(OzoneConfigurationHolder.configuration()); + httpServer = new S3GatewayHttpServer(OzoneConfigurationHolder.configuration(), "s3gateway"); + metrics = S3GatewayMetrics.create(OzoneConfigurationHolder.configuration()); start(); ShutdownHookManager.get().addShutdownHook(() -> { @@ -95,10 +96,10 @@ public void start() throws IOException { String[] originalArgs = getCmd().getParseResult().originalArgs() .toArray(new String[0]); HddsServerUtil.startupShutdownMessage(OzoneVersionInfo.OZONE_VERSION_INFO, - Gateway.class, originalArgs, LOG, ozoneConfiguration); + Gateway.class, originalArgs, LOG, OzoneConfigurationHolder.configuration()); LOG.info("Starting Ozone S3 gateway"); - HddsServerUtil.initializeMetrics(ozoneConfiguration, "S3Gateway"); + HddsServerUtil.initializeMetrics(OzoneConfigurationHolder.configuration(), "S3Gateway"); jvmPauseMonitor.start(); httpServer.start(); } @@ -133,4 +134,14 @@ private static void loginS3GUser(OzoneConfiguration conf) } } + @VisibleForTesting + public InetSocketAddress getHttpAddress() { + return this.httpServer.getHttpAddress(); + } + + @VisibleForTesting + public InetSocketAddress getHttpsAddress() { + return this.httpServer.getHttpsAddress(); + } + } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneConfigurationHolder.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneConfigurationHolder.java index 4aeab1f3c4a6..9d6f7a822527 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneConfigurationHolder.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneConfigurationHolder.java @@ -19,6 +19,7 @@ import javax.enterprise.inject.Produces; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.conf.OzoneConfiguration; /** @@ -27,17 +28,30 @@ * As the OzoneConfiguration is created by the CLI application here we inject * it via a singleton instance to the Jax-RS/CDI instances. */ -public class OzoneConfigurationHolder { +public final class OzoneConfigurationHolder { private static OzoneConfiguration configuration; + private OzoneConfigurationHolder() { + } + @Produces - public OzoneConfiguration configuration() { + public static OzoneConfiguration configuration() { return configuration; } + @VisibleForTesting public static void setConfiguration( OzoneConfiguration conf) { - OzoneConfigurationHolder.configuration = conf; + // Nullity check is used in case the configuration was already set + // in the MiniOzoneCluster + if (configuration == null) { + OzoneConfigurationHolder.configuration = conf; + } + } + + @VisibleForTesting + public static void resetConfiguration() { + configuration = null; } } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayHttpServer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayHttpServer.java index 97117a30bbda..8b6af74e0723 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayHttpServer.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayHttpServer.java @@ -114,7 +114,7 @@ protected String getHttpAddressKey() { @Override protected String getHttpBindHostKey() { - return OZONE_S3G_HTTP_BIND_HOST_KEY; + return S3GatewayConfigKeys.OZONE_S3G_HTTP_BIND_HOST_KEY; } @Override @@ -144,12 +144,12 @@ protected int getHttpsBindPortDefault() { @Override protected String getKeytabFile() { - return OZONE_S3G_KEYTAB_FILE; + return S3GatewayConfigKeys.OZONE_S3G_KEYTAB_FILE; } @Override protected String getSpnegoPrincipal() { - return OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL; + return S3GatewayConfigKeys.OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL; } @Override