diff --git a/build.gradle b/build.gradle index fa509212fb8b..1153aa959336 100644 --- a/build.gradle +++ b/build.gradle @@ -587,6 +587,24 @@ project(':iceberg-nessie') { } } +project(':iceberg-dell') { + dependencies { + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + compileOnly 'com.emc.ecs:object-client-bundle' + + testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + testImplementation "javax.xml.bind:jaxb-api" + testImplementation "javax.activation:activation" + testImplementation "org.glassfish.jaxb:jaxb-runtime" + } +} + @Memoized boolean versionFileExists() { return file('version.txt').exists() diff --git a/dell/src/main/java/org/apache/iceberg/dell/DellClientFactories.java b/dell/src/main/java/org/apache/iceberg/dell/DellClientFactories.java new file mode 100644 index 000000000000..1048b231ed8a --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/DellClientFactories.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Config; +import com.emc.object.s3.jersey.S3JerseyClient; +import java.net.URI; +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.util.PropertyUtil; + +public class DellClientFactories { + + private DellClientFactories() { + } + + public static DellClientFactory from(Map properties) { + String factoryImpl = PropertyUtil.propertyAsString( + properties, DellProperties.CLIENT_FACTORY, DefaultDellClientFactory.class.getName()); + return loadClientFactory(factoryImpl, properties); + } + + private static DellClientFactory loadClientFactory(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(DellClientFactory.class).hiddenImpl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize DellClientFactory, missing no-arg constructor: %s", impl), e); + } + + DellClientFactory factory; + try { + factory = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize DellClientFactory, %s does not implement DellClientFactory.", impl), e); + } + + factory.initialize(properties); + return factory; + } + + static class DefaultDellClientFactory implements DellClientFactory { + private DellProperties dellProperties; + + DefaultDellClientFactory() { + } + + @Override + public S3Client ecsS3() { + S3Config config = new S3Config(URI.create(dellProperties.ecsS3Endpoint())); + + config.withIdentity(dellProperties.ecsS3AccessKeyId()) + .withSecretKey(dellProperties.ecsS3SecretAccessKey()); + + return new S3JerseyClient(config); + } + + @Override + public void initialize(Map properties) { + this.dellProperties = new DellProperties(properties); + } + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/DellClientFactory.java b/dell/src/main/java/org/apache/iceberg/dell/DellClientFactory.java new file mode 100644 index 000000000000..4909125d7ace --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/DellClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell; + +import com.emc.object.s3.S3Client; +import java.util.Map; + +public interface DellClientFactory { + + /** + * Create a Dell EMC ECS S3 client + * + * @return Dell EMC ECS S3 client + */ + S3Client ecsS3(); + + /** + * Initialize Dell EMC ECS client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/DellProperties.java b/dell/src/main/java/org/apache/iceberg/dell/DellProperties.java new file mode 100644 index 000000000000..fbd3db62537e --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/DellProperties.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.Serializable; +import java.util.Map; + +public class DellProperties implements Serializable { + /** + * S3 Access key id of Dell EMC ECS + */ + public static final String ECS_S3_ACCESS_KEY_ID = "ecs.s3.access-key-id"; + + /** + * S3 Secret access key of Dell EMC ECS + */ + public static final String ECS_S3_SECRET_ACCESS_KEY = "ecs.s3.secret-access-key"; + + /** + * S3 endpoint of Dell EMC ECS + */ + public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint"; + + /** + * The implementation class of {@link DellClientFactory} to customize Dell client configurations. + * If set, all Dell clients will be initialized by the specified factory. + * If not set, {@link DellClientFactories.DefaultDellClientFactory} is used as default factory. + */ + public static final String CLIENT_FACTORY = "client.factory"; + + private String ecsS3Endpoint; + private String ecsS3AccessKeyId; + private String ecsS3SecretAccessKey; + + public DellProperties() { + } + + public DellProperties(Map properties) { + this.ecsS3AccessKeyId = properties.get(DellProperties.ECS_S3_ACCESS_KEY_ID); + this.ecsS3SecretAccessKey = properties.get(DellProperties.ECS_S3_SECRET_ACCESS_KEY); + this.ecsS3Endpoint = properties.get(DellProperties.ECS_S3_ENDPOINT); + } + + public String ecsS3Endpoint() { + return ecsS3Endpoint; + } + + public void setEcsS3Endpoint(String ecsS3Endpoint) { + this.ecsS3Endpoint = ecsS3Endpoint; + } + + public String ecsS3AccessKeyId() { + return ecsS3AccessKeyId; + } + + public void setEcsS3AccessKeyId(String ecsS3AccessKeyId) { + this.ecsS3AccessKeyId = ecsS3AccessKeyId; + } + + public String ecsS3SecretAccessKey() { + return ecsS3SecretAccessKey; + } + + public void setEcsS3SecretAccessKey(String ecsS3SecretAccessKey) { + this.ecsS3SecretAccessKey = ecsS3SecretAccessKey; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/BaseEcsFile.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/BaseEcsFile.java new file mode 100644 index 000000000000..5912b03149bf --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/BaseEcsFile.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import org.apache.iceberg.dell.DellProperties; + +abstract class BaseEcsFile { + + private final S3Client client; + private final EcsURI uri; + private final DellProperties dellProperties; + private S3ObjectMetadata metadata; + + BaseEcsFile(S3Client client, EcsURI uri, DellProperties dellProperties) { + this.client = client; + this.uri = uri; + this.dellProperties = dellProperties; + } + + public String location() { + return uri.location(); + } + + S3Client client() { + return client; + } + + EcsURI uri() { + return uri; + } + + public DellProperties dellProperties() { + return dellProperties; + } + + /** + * Note: this may be stale if file was deleted since metadata is cached for size/existence checks. + * + * @return flag + */ + public boolean exists() { + try { + getObjectMetadata(); + return true; + } catch (S3Exception e) { + if (e.getHttpCode() == 404) { + return false; + } else { + throw e; + } + } + } + + protected S3ObjectMetadata getObjectMetadata() throws S3Exception { + if (metadata == null) { + metadata = client().getObjectMetadata(uri.bucket(), uri.name()); + } + + return metadata; + } + + @Override + public String toString() { + return uri.toString(); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsAppendOutputStream.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsAppendOutputStream.java new file mode 100644 index 000000000000..86f9f5b43ba7 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsAppendOutputStream.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import org.apache.iceberg.io.PositionOutputStream; + +/** + * Use ECS append API to write data. + */ +class EcsAppendOutputStream extends PositionOutputStream { + + private final S3Client client; + + private final EcsURI uri; + + /** + * Local bytes cache that avoid too many requests + *

+ * Use {@link ByteBuffer} to maintain offset. + */ + private final ByteBuffer localCache; + + /** + * A marker for data file to put first part instead of append first part. + */ + private boolean firstPart = true; + + /** + * Pos for {@link PositionOutputStream} + */ + private long pos; + + private EcsAppendOutputStream(S3Client client, EcsURI uri, byte[] localCache) { + this.client = client; + this.uri = uri; + this.localCache = ByteBuffer.wrap(localCache); + } + + /** + * Use built-in 1 KiB byte buffer + */ + static EcsAppendOutputStream create(S3Client client, EcsURI uri) { + return createWithBufferSize(client, uri, 1024); + } + + /** + * Create {@link PositionOutputStream} with specific buffer size. + */ + static EcsAppendOutputStream createWithBufferSize(S3Client client, EcsURI uri, int size) { + return new EcsAppendOutputStream(client, uri, new byte[size]); + } + + /** + * Write a byte. If buffer is full, upload the buffer. + */ + @Override + public void write(int b) { + if (!checkBuffer(1)) { + flush(); + } + + localCache.put((byte) b); + pos += 1; + } + + /** + * Write a byte. + * If buffer is full, upload the buffer. + * If buffer size < input bytes, upload input bytes. + */ + @Override + public void write(byte[] b, int off, int len) { + if (!checkBuffer(len)) { + flush(); + } + + if (checkBuffer(len)) { + localCache.put(b, off, len); + } else { + // if content > cache, directly flush itself. + flushBuffer(b, off, len); + } + + pos += len; + } + + private boolean checkBuffer(int nextWrite) { + return localCache.remaining() >= nextWrite; + } + + private void flushBuffer(byte[] buffer, int offset, int length) { + if (firstPart) { + client.putObject(new PutObjectRequest(uri.bucket(), uri.name(), + new ByteArrayInputStream(buffer, offset, length))); + firstPart = false; + } else { + client.appendObject(uri.bucket(), uri.name(), new ByteArrayInputStream(buffer, offset, length)); + } + } + + /** + * Pos of the file + */ + @Override + public long getPos() { + return pos; + } + + /** + * Write cached bytes if present. + */ + @Override + public void flush() { + if (localCache.remaining() < localCache.capacity()) { + localCache.flip(); + flushBuffer(localCache.array(), localCache.arrayOffset(), localCache.remaining()); + localCache.clear(); + } + } + + /** + * Trigger flush() when closing stream. + */ + @Override + public void close() { + flush(); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsFileIO.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsFileIO.java new file mode 100644 index 000000000000..632ceb96e76b --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsFileIO.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellClientFactory; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableSupplier; + +/** + * FileIO implementation backed by Dell EMC ECS. + *

+ * Locations used must follow the conventions for ECS URIs (e.g. ecs://bucket/path...). + * URIs with schemes s3, s3a, s3n, https are also treated as ECS object paths. + * Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}. + */ +public class EcsFileIO implements FileIO { + + private SerializableSupplier s3; + private DellProperties dellProperties; + private DellClientFactory dellClientFactory; + private transient S3Client client; + private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); + + @Override + public InputFile newInputFile(String path) { + return EcsInputFile.fromLocation(path, client(), dellProperties); + } + + @Override + public OutputFile newOutputFile(String path) { + return EcsOutputFile.fromLocation(path, client(), dellProperties); + } + + @Override + public void deleteFile(String path) { + EcsURI uri = new EcsURI(path); + + client().deleteObject(uri.bucket(), uri.name()); + } + + private S3Client client() { + if (client == null) { + client = s3.get(); + } + return client; + } + + @Override + public void initialize(Map properties) { + this.dellProperties = new DellProperties(properties); + this.dellClientFactory = DellClientFactories.from(properties); + this.s3 = dellClientFactory::ecsS3; + } + + @Override + public void close() { + // handles concurrent calls to close() + if (isResourceClosed.compareAndSet(false, true)) { + client.destroy(); + } + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsInputFile.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsInputFile.java new file mode 100644 index 000000000000..ee6b6d6399ef --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsInputFile.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +class EcsInputFile extends BaseEcsFile implements InputFile { + + public static EcsInputFile fromLocation(String location, S3Client client) { + return new EcsInputFile(client, new EcsURI(location), new DellProperties()); + } + + public static EcsInputFile fromLocation(String location, S3Client client, DellProperties dellProperties) { + return new EcsInputFile(client, new EcsURI(location), dellProperties); + } + + EcsInputFile(S3Client client, EcsURI uri, DellProperties dellProperties) { + super(client, uri, dellProperties); + } + + /** + * Note: this may be stale if file was deleted since metadata is cached for size/existence checks. + * + * @return content length + */ + @Override + public long getLength() { + return getObjectMetadata().getContentLength(); + } + + @Override + public SeekableInputStream newStream() { + return new EcsSeekableInputStream(client(), uri()); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsOutputFile.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsOutputFile.java new file mode 100644 index 000000000000..d125fb571e7e --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsOutputFile.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +class EcsOutputFile extends BaseEcsFile implements OutputFile { + public static EcsOutputFile fromLocation(String location, S3Client client) { + return new EcsOutputFile(client, new EcsURI(location), new DellProperties()); + } + + public static EcsOutputFile fromLocation(String location, S3Client client, DellProperties dellProperties) { + return new EcsOutputFile(client, new EcsURI(location), dellProperties); + } + + EcsOutputFile(S3Client client, EcsURI uri, DellProperties dellProperties) { + super(client, uri, dellProperties); + } + + /** + * Create an output stream for the specified location if the target object + * does not exist in ECS at the time of invocation. + * + * @return output stream + */ + @Override + public PositionOutputStream create() { + if (!exists()) { + return createOrOverwrite(); + } else { + throw new AlreadyExistsException("ECS object already exists: %s", uri()); + } + } + + @Override + public PositionOutputStream createOrOverwrite() { + return EcsAppendOutputStream.create(client(), uri()); + } + + @Override + public InputFile toInputFile() { + return new EcsInputFile(client(), uri(), dellProperties()); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java new file mode 100644 index 000000000000..0528a498f371 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.Range; +import com.emc.object.s3.S3Client; +import java.io.IOException; +import java.io.InputStream; +import org.apache.iceberg.io.SeekableInputStream; + +/** + * A {@link SeekableInputStream} implementation that warp {@link S3Client#readObjectStream(String, String, Range)} + *

    + *
  1. The stream is only be loaded when start reading.
  2. + *
  3. This class won't cache any bytes of content. It only maintains pos of {@link SeekableInputStream}
  4. + *
  5. This class is not thread-safe.
  6. + *
+ */ +class EcsSeekableInputStream extends SeekableInputStream { + + private final S3Client client; + private final EcsURI uri; + + /** + * Mutable pos set by {@link #seek(long)} + */ + private long newPos = 0; + /** + * Current pos of object content + */ + private long pos = -1; + private InputStream internalStream; + + EcsSeekableInputStream(S3Client client, EcsURI uri) { + this.client = client; + this.uri = uri; + } + + @Override + public long getPos() { + return newPos >= 0 ? newPos : pos; + } + + @Override + public void seek(long inputNewPos) { + if (pos == inputNewPos) { + return; + } + + newPos = inputNewPos; + } + + @Override + public int read() throws IOException { + checkAndUseNewPos(); + pos += 1; + return internalStream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkAndUseNewPos(); + int delta = internalStream.read(b, off, len); + pos += delta; + return delta; + } + + private void checkAndUseNewPos() throws IOException { + if (newPos < 0) { + return; + } + + if (newPos == pos) { + newPos = -1; + return; + } + + if (internalStream != null) { + internalStream.close(); + } + + pos = newPos; + internalStream = client.readObjectStream(uri.bucket(), uri.name(), Range.fromOffset(pos)); + newPos = -1; + } + + @Override + public void close() throws IOException { + if (internalStream != null) { + internalStream.close(); + } + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsURI.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsURI.java new file mode 100644 index 000000000000..d501f9932035 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsURI.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import java.net.URI; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +/** + * An immutable record class of ECS location + */ +class EcsURI { + + private static final Set VALID_SCHEME = ImmutableSet.of("ecs", "s3", "s3a", "s3n"); + + private final String location; + private final String bucket; + private final String name; + + EcsURI(String location) { + Preconditions.checkNotNull(location == null, "Location %s can not be null", location); + + this.location = location; + + URI uri = URI.create(location); + ValidationException.check( + VALID_SCHEME.contains(uri.getScheme().toLowerCase()), + "Invalid ecs location: %s", + location); + this.bucket = uri.getHost(); + this.name = uri.getPath().replaceAll("^/*", ""); + } + + EcsURI(String bucket, String name) { + this.location = String.format("ecs://%s/%s", bucket, name); + this.bucket = bucket; + this.name = name; + } + + /** + * Returns ECS bucket name. + */ + public String bucket() { + return bucket; + } + + /** + * Returns ECS object name. + */ + public String name() { + return name; + } + + /** + * Returns original location. + */ + public String location() { + return location; + } + + @Override + public String toString() { + return location; + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsAppendOutputStream.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsAppendOutputStream.java new file mode 100644 index 000000000000..7e775f1aa3c2 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsAppendOutputStream.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.Range; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +public class TestEcsAppendOutputStream { + + @ClassRule + public static EcsS3MockRule rule = EcsS3MockRule.create(); + + @Test + public void testBaseObjectWrite() throws IOException { + String objectName = rule.randomObjectName(); + try (EcsAppendOutputStream output = EcsAppendOutputStream.createWithBufferSize( + rule.client(), + new EcsURI(rule.bucket(), objectName), + 10)) { + // write 1 byte + output.write('1'); + // write 3 bytes + output.write("123".getBytes()); + // write 7 bytes, totally 11 bytes > local buffer limit (10) + output.write("1234567".getBytes()); + // write 11 bytes, flush remain 7 bytes and new 11 bytes + output.write("12345678901".getBytes()); + } + + try (InputStream input = rule.client().readObjectStream(rule.bucket(), objectName, + Range.fromOffset(0))) { + Assert.assertEquals("Must write all the object content", "1" + "123" + "1234567" + "12345678901", + new String(ByteStreams.toByteArray(input), StandardCharsets.UTF_8)); + } + } + + @Test + public void testRewrite() throws IOException { + String objectName = rule.randomObjectName(); + try (EcsAppendOutputStream output = EcsAppendOutputStream.createWithBufferSize( + rule.client(), + new EcsURI(rule.bucket(), objectName), + 10)) { + // write 7 bytes + output.write("7654321".getBytes()); + } + + try (EcsAppendOutputStream output = EcsAppendOutputStream.createWithBufferSize( + rule.client(), + new EcsURI(rule.bucket(), objectName), + 10)) { + // write 14 bytes + output.write("1234567".getBytes()); + output.write("1234567".getBytes()); + } + + try (InputStream input = rule.client().readObjectStream(rule.bucket(), objectName, + Range.fromOffset(0))) { + Assert.assertEquals("Must replace the object content", "1234567" + "1234567", + new String(ByteStreams.toByteArray(input), StandardCharsets.UTF_8)); + } + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsInputFile.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsInputFile.java new file mode 100644 index 000000000000..57a947d0c224 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsInputFile.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.request.PutObjectRequest; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +public class TestEcsInputFile { + + @ClassRule + public static EcsS3MockRule rule = EcsS3MockRule.create(); + + @Test + public void testAbsentFile() { + String objectName = rule.randomObjectName(); + EcsInputFile inputFile = EcsInputFile.fromLocation( + new EcsURI(rule.bucket(), objectName).toString(), + rule.client()); + Assert.assertFalse("File is absent", inputFile.exists()); + } + + @Test + public void testFileRead() throws IOException { + String objectName = rule.randomObjectName(); + EcsInputFile inputFile = EcsInputFile.fromLocation( + new EcsURI(rule.bucket(), objectName).toString(), + rule.client()); + + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, "0123456789".getBytes())); + + Assert.assertTrue("File should exists", inputFile.exists()); + Assert.assertEquals("File length should be 10", 10, inputFile.getLength()); + try (InputStream inputStream = inputFile.newStream()) { + Assert.assertEquals( + "The file content should be 0123456789", + "0123456789", + new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8)); + } + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsOutputFile.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsOutputFile.java new file mode 100644 index 000000000000..5a1f6cd7bfa3 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsOutputFile.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.Range; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +public class TestEcsOutputFile { + + @ClassRule + public static EcsS3MockRule rule = EcsS3MockRule.create(); + + @Test + public void testFileWrite() throws IOException { + String objectName = rule.randomObjectName(); + EcsOutputFile outputFile = EcsOutputFile.fromLocation( + new EcsURI(rule.bucket(), objectName).toString(), + rule.client()); + + // File write + try (PositionOutputStream output = outputFile.create()) { + output.write("1234567890".getBytes()); + } + + try (InputStream input = rule.client().readObjectStream( + rule.bucket(), objectName, + Range.fromOffset(0))) { + Assert.assertEquals("File content is expected", "1234567890", + new String(ByteStreams.toByteArray(input), StandardCharsets.UTF_8)); + } + } + + @Test + public void testFileOverwrite() throws IOException { + String objectName = rule.randomObjectName(); + EcsOutputFile outputFile = EcsOutputFile.fromLocation( + new EcsURI(rule.bucket(), objectName).toString(), + rule.client()); + + try (PositionOutputStream output = outputFile.create()) { + output.write("1234567890".getBytes()); + } + + try (PositionOutputStream output = outputFile.createOrOverwrite()) { + output.write("abcdefghij".getBytes()); + } + + try (InputStream input = rule.client().readObjectStream( + rule.bucket(), objectName, + Range.fromOffset(0))) { + Assert.assertEquals("File content should be overwritten", "abcdefghij", + new String(ByteStreams.toByteArray(input), StandardCharsets.UTF_8)); + } + } + + @Test + public void testFileAlreadyExists() throws IOException { + String objectName = rule.randomObjectName(); + EcsOutputFile outputFile = EcsOutputFile.fromLocation( + new EcsURI(rule.bucket(), objectName).toString(), + rule.client()); + + try (PositionOutputStream output = outputFile.create()) { + output.write("1234567890".getBytes()); + } + + AssertHelpers.assertThrows("Create should throw exception", AlreadyExistsException.class, + outputFile.location(), + outputFile::create); + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java new file mode 100644 index 000000000000..7188bf648cbe --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.request.PutObjectRequest; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +public class TestEcsSeekableInputStream { + + @ClassRule + public static EcsS3MockRule rule = EcsS3MockRule.create(); + + @Test + public void testSeekPosRead() throws IOException { + String objectName = rule.randomObjectName(); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, "0123456789".getBytes())); + + try (EcsSeekableInputStream input = new EcsSeekableInputStream( + rule.client(), + new EcsURI(rule.bucket(), objectName))) { + input.seek(2); + Assert.assertEquals("Expect 2 when seek to 2", '2', input.read()); + } + } + + @Test + public void testMultipleSeekPosRead() throws IOException { + String objectName = rule.randomObjectName(); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, "0123456789".getBytes())); + + try (EcsSeekableInputStream input = new EcsSeekableInputStream( + rule.client(), + new EcsURI(rule.bucket(), objectName))) { + input.seek(999); + input.seek(3); + Assert.assertEquals("Expect 3 when seek to 3 finally", '3', input.read()); + } + } + + @Test + public void testReadOneByte() throws IOException { + String objectName = rule.randomObjectName(); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, "0123456789".getBytes())); + + try (EcsSeekableInputStream input = new EcsSeekableInputStream( + rule.client(), + new EcsURI(rule.bucket(), objectName))) { + Assert.assertEquals("The first byte should be 0 ", '0', input.read()); + } + } + + @Test + public void testReadBytes() throws IOException { + String objectName = rule.randomObjectName(); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, "0123456789".getBytes())); + + try (EcsSeekableInputStream input = new EcsSeekableInputStream( + rule.client(), + new EcsURI(rule.bucket(), objectName))) { + byte[] buffer = new byte[3]; + Assert.assertEquals("The first read should be 3 bytes", 3, input.read(buffer)); + Assert.assertEquals("The first 3 bytes should be 012", "012", + new String(buffer, StandardCharsets.UTF_8)); + } + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsURI.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsURI.java new file mode 100644 index 000000000000..31f8ca52ec83 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsURI.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.exceptions.ValidationException; +import org.junit.Assert; +import org.junit.Test; + +public class TestEcsURI { + + @Test + public void testConstructor() { + assertURI("bucket", "", new EcsURI("ecs://bucket")); + assertURI("bucket", "", new EcsURI("ecs://bucket/")); + assertURI("bucket", "", new EcsURI("ecs://bucket//")); + assertURI("bucket", "a", new EcsURI("ecs://bucket//a")); + assertURI("bucket", "a/b", new EcsURI("ecs://bucket/a/b")); + assertURI("bucket", "a//b", new EcsURI("ecs://bucket/a//b")); + assertURI("bucket", "a//b", new EcsURI("ecs://bucket//a//b")); + } + + private void assertURI(String bucket, String name, EcsURI ecsURI) { + Assert.assertEquals("bucket", bucket, ecsURI.bucket()); + Assert.assertEquals("name", name, ecsURI.name()); + } + + @Test + public void testInvalidLocation() { + AssertHelpers.assertThrows( + "Invalid location should cause exception", + ValidationException.class, + "http://bucket/a", + () -> new EcsURI("http://bucket/a")); + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/mock/MockDellClientFactory.java b/dell/src/test/java/org/apache/iceberg/dell/mock/MockDellClientFactory.java new file mode 100644 index 000000000000..8921c32dfb24 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/mock/MockDellClientFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.mock; + +import com.emc.object.s3.S3Client; +import java.util.Map; +import org.apache.iceberg.dell.DellClientFactory; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.dell.mock.ecs.MockS3Client; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class MockDellClientFactory implements DellClientFactory { + + public static final Map MOCK_ECS_CLIENT_PROPERTIES = ImmutableMap.of( + DellProperties.CLIENT_FACTORY, + MockDellClientFactory.class.getName() + ); + + @Override + public S3Client ecsS3() { + return new MockS3Client(); + } + + @Override + public void initialize(Map properties) { + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/EcsS3MockRule.java b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/EcsS3MockRule.java new file mode 100644 index 000000000000..92fb83068acd --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/EcsS3MockRule.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.mock.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.ObjectKey; +import com.emc.object.s3.request.DeleteObjectsRequest; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.dell.mock.MockDellClientFactory; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** + * Mock rule of ECS S3 mock. + *

+ * Use environment parameter to specify use mock client or real client. + */ +public class EcsS3MockRule implements TestRule { + + /** + * Object ID generator + */ + private static final AtomicInteger ID = new AtomicInteger(0); + + // Config fields + private final boolean autoCreateBucket; + + // Setup fields + private Map clientProperties; + private String bucket; + private boolean mock; + + // State fields during test + /** + * Lazy client for test rule. + */ + private S3Client lazyClient; + private boolean bucketCreated; + + public static EcsS3MockRule create() { + return new EcsS3MockRule(true); + } + + public static EcsS3MockRule manualCreateBucket() { + return new EcsS3MockRule(false); + } + + public EcsS3MockRule(boolean autoCreateBucket) { + this.autoCreateBucket = autoCreateBucket; + } + + @Override + public Statement apply(Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + initialize(); + try { + base.evaluate(); + } finally { + cleanUp(); + } + } + }; + } + + private void initialize() { + if (System.getenv(DellProperties.ECS_S3_ENDPOINT) == null) { + clientProperties = MockDellClientFactory.MOCK_ECS_CLIENT_PROPERTIES; + bucket = "test"; + mock = true; + } else { + Map properties = new LinkedHashMap<>(); + properties.put(DellProperties.ECS_S3_ACCESS_KEY_ID, System.getenv(DellProperties.ECS_S3_ACCESS_KEY_ID)); + properties.put(DellProperties.ECS_S3_SECRET_ACCESS_KEY, System.getenv(DellProperties.ECS_S3_SECRET_ACCESS_KEY)); + properties.put(DellProperties.ECS_S3_ENDPOINT, System.getenv(DellProperties.ECS_S3_ENDPOINT)); + clientProperties = properties; + bucket = "test-" + UUID.randomUUID(); + if (autoCreateBucket) { + createBucket(); + } + + mock = false; + } + } + + private void cleanUp() { + if (mock) { + S3Client client = this.lazyClient; + if (client != null) { + client.destroy(); + } + } else { + S3Client client = client(); + if (bucketCreated) { + deleteBucket(); + } + + client.destroy(); + } + } + + public void createBucket() { + // create test bucket for this unit test + client().createBucket(bucket); + bucketCreated = true; + } + + private void deleteBucket() { + S3Client client = client(); + if (!client.bucketExists(bucket)) { + return; + } + + // clean up test bucket of this unit test + while (true) { + ListObjectsResult result = client.listObjects(bucket); + if (result.getObjects().isEmpty()) { + break; + } + + List keys = result.getObjects() + .stream() + .map(it -> new ObjectKey(it.getKey())) + .collect(Collectors.toList()); + client.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keys)); + } + + client.deleteBucket(bucket); + } + + public Map clientProperties() { + return clientProperties; + } + + public S3Client client() { + if (lazyClient == null) { + lazyClient = DellClientFactories.from(clientProperties).ecsS3(); + } + + return lazyClient; + } + + public String bucket() { + return bucket; + } + + public String randomObjectName() { + return "test-" + ID.getAndIncrement() + "-" + UUID.randomUUID(); + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/MockS3Client.java b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/MockS3Client.java new file mode 100644 index 000000000000..52a19135d27d --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/MockS3Client.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.mock.ecs; + +import com.emc.object.Protocol; +import com.emc.object.Range; +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.AccessControlList; +import com.emc.object.s3.bean.BucketInfo; +import com.emc.object.s3.bean.BucketPolicy; +import com.emc.object.s3.bean.CannedAcl; +import com.emc.object.s3.bean.CompleteMultipartUploadResult; +import com.emc.object.s3.bean.CopyObjectResult; +import com.emc.object.s3.bean.CopyPartResult; +import com.emc.object.s3.bean.CorsConfiguration; +import com.emc.object.s3.bean.DeleteObjectsResult; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.InitiateMultipartUploadResult; +import com.emc.object.s3.bean.LifecycleConfiguration; +import com.emc.object.s3.bean.ListBucketsResult; +import com.emc.object.s3.bean.ListDataNode; +import com.emc.object.s3.bean.ListMultipartUploadsResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.ListPartsResult; +import com.emc.object.s3.bean.ListVersionsResult; +import com.emc.object.s3.bean.LocationConstraint; +import com.emc.object.s3.bean.MetadataSearchList; +import com.emc.object.s3.bean.MultipartPartETag; +import com.emc.object.s3.bean.ObjectLockConfiguration; +import com.emc.object.s3.bean.ObjectLockLegalHold; +import com.emc.object.s3.bean.ObjectLockRetention; +import com.emc.object.s3.bean.PingResponse; +import com.emc.object.s3.bean.PutObjectResult; +import com.emc.object.s3.bean.QueryObjectsResult; +import com.emc.object.s3.bean.VersioningConfiguration; +import com.emc.object.s3.request.AbortMultipartUploadRequest; +import com.emc.object.s3.request.CompleteMultipartUploadRequest; +import com.emc.object.s3.request.CopyObjectRequest; +import com.emc.object.s3.request.CopyPartRequest; +import com.emc.object.s3.request.CreateBucketRequest; +import com.emc.object.s3.request.DeleteObjectRequest; +import com.emc.object.s3.request.DeleteObjectsRequest; +import com.emc.object.s3.request.GetObjectAclRequest; +import com.emc.object.s3.request.GetObjectLegalHoldRequest; +import com.emc.object.s3.request.GetObjectMetadataRequest; +import com.emc.object.s3.request.GetObjectRequest; +import com.emc.object.s3.request.GetObjectRetentionRequest; +import com.emc.object.s3.request.InitiateMultipartUploadRequest; +import com.emc.object.s3.request.ListBucketsRequest; +import com.emc.object.s3.request.ListMultipartUploadsRequest; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.ListPartsRequest; +import com.emc.object.s3.request.ListVersionsRequest; +import com.emc.object.s3.request.PresignedUrlRequest; +import com.emc.object.s3.request.PutObjectRequest; +import com.emc.object.s3.request.QueryObjectsRequest; +import com.emc.object.s3.request.SetBucketAclRequest; +import com.emc.object.s3.request.SetObjectAclRequest; +import com.emc.object.s3.request.SetObjectLegalHoldRequest; +import com.emc.object.s3.request.SetObjectRetentionRequest; +import com.emc.object.s3.request.UploadPartRequest; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URL; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.IOUtils; + +/** + * Memorized s3 client used in tests. + */ +public class MockS3Client implements S3Client { + + /** + * The object data of this client. + *

+ * Current {@link S3ObjectMetadata} only store the user metadata. + */ + private final Map objectData = new ConcurrentHashMap<>(); + + private String createId(String bucket, String key) { + return bucket + "/" + key; + } + + @Override + public PutObjectResult putObject(PutObjectRequest request) { + S3ObjectMetadata metadata = request.getObjectMetadata(); + objectData.put( + createId(request.getBucketName(), request.getKey()), + ObjectData.create(convertContent(request.getEntity()), metadata)); + return new PutObjectResult(); + } + + @Override + public long appendObject(String bucketName, String key, Object content) { + String id = createId(bucketName, key); + ObjectData old = objectData.get(id); + if (old == null) { + throw new S3Exception("", 404, "NoSuchKey", ""); + } + + byte[] appendedData = convertContent(content); + if (objectData.replace(id, old, old.appendContent(appendedData))) { + return old.length(); + } else { + return wontImplement(); + } + } + + private byte[] convertContent(Object entity) { + if (entity instanceof InputStream) { + try (InputStream inputStream = (InputStream) entity) { + return IOUtils.toByteArray(inputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else if (entity instanceof byte[]) { + return ((byte[]) entity).clone(); + } + + throw new IllegalArgumentException(String.format("Invalid object entity type %s", entity.getClass())); + } + + @Override + public InputStream readObjectStream(String bucketName, String key, Range range) { + ObjectData data = objectData.get(createId(bucketName, key)); + if (data == null) { + throw new S3Exception("", 404, "NoSuchKey", ""); + } + + return data.createInputStream(range); + } + + @Override + public S3ObjectMetadata getObjectMetadata(String bucketName, String key) { + ObjectData data = objectData.get(createId(bucketName, key)); + if (data == null) { + throw new S3Exception("", 404, "NoSuchKey", ""); + } + + return data.createFullMetadata(); + } + + @Override + public void deleteObject(String bucketName, String key) { + objectData.remove(createId(bucketName, key)); + } + + @Override + public void destroy() { + } + + @Override + @Deprecated + public void shutdown() { + destroy(); + } + + // Following methods won't be implemented. They aren't used in this module. + @Override + public ListDataNode listDataNodes() { + return wontImplement(); + } + + @Override + public PingResponse pingNode(String host) { + return wontImplement(); + } + + @Override + public PingResponse pingNode(Protocol protocol, String host, int port) { + return wontImplement(); + } + + @Override + public ListBucketsResult listBuckets() { + return wontImplement(); + } + + @Override + public ListBucketsResult listBuckets(ListBucketsRequest request) { + return wontImplement(); + } + + @Override + public boolean bucketExists(String bucketName) { + return wontImplement(); + } + + @Override + public void createBucket(String bucketName) { + wontImplement(); + } + + @Override + public void createBucket(CreateBucketRequest request) { + wontImplement(); + } + + @Override + public BucketInfo getBucketInfo(String bucketName) { + return wontImplement(); + } + + @Override + public void deleteBucket(String bucketName) { + wontImplement(); + } + + @Override + public void setBucketAcl(String bucketName, AccessControlList acl) { + wontImplement(); + } + + @Override + public void setBucketAcl(String bucketName, CannedAcl cannedAcl) { + wontImplement(); + } + + @Override + public void setBucketAcl(SetBucketAclRequest request) { + wontImplement(); + } + + @Override + public AccessControlList getBucketAcl(String bucketName) { + return wontImplement(); + } + + @Override + public void setBucketCors(String bucketName, CorsConfiguration corsConfiguration) { + wontImplement(); + } + + @Override + public CorsConfiguration getBucketCors(String bucketName) { + return wontImplement(); + } + + @Override + public void deleteBucketCors(String bucketName) { + wontImplement(); + } + + @Override + public void setBucketLifecycle(String bucketName, LifecycleConfiguration lifecycleConfiguration) { + wontImplement(); + } + + @Override + public LifecycleConfiguration getBucketLifecycle(String bucketName) { + return wontImplement(); + } + + @Override + public void deleteBucketLifecycle(String bucketName) { + wontImplement(); + } + + @Override + public void setBucketPolicy(String bucketName, BucketPolicy policy) { + wontImplement(); + } + + @Override + public BucketPolicy getBucketPolicy(String bucketName) { + return wontImplement(); + } + + @Override + public LocationConstraint getBucketLocation(String bucketName) { + return wontImplement(); + } + + @Override + public void setBucketVersioning( + String bucketName, VersioningConfiguration versioningConfiguration) { + wontImplement(); + } + + @Override + public VersioningConfiguration getBucketVersioning(String bucketName) { + return wontImplement(); + } + + @Override + public void setBucketStaleReadAllowed(String bucketName, boolean staleReadsAllowed) { + wontImplement(); + } + + @Override + public MetadataSearchList listSystemMetadataSearchKeys() { + return wontImplement(); + } + + @Override + public MetadataSearchList listBucketMetadataSearchKeys(String bucketName) { + return wontImplement(); + } + + @Override + public QueryObjectsResult queryObjects(QueryObjectsRequest request) { + return wontImplement(); + } + + @Override + public QueryObjectsResult queryMoreObjects(QueryObjectsResult lastResult) { + return wontImplement(); + } + + @Override + public ListObjectsResult listObjects(String bucketName) { + return wontImplement(); + } + + @Override + public ListObjectsResult listObjects(String bucketName, String prefix) { + return wontImplement(); + } + + @Override + public ListObjectsResult listObjects(ListObjectsRequest request) { + return wontImplement(); + } + + @Override + public ListObjectsResult listMoreObjects(ListObjectsResult lastResult) { + return wontImplement(); + } + + @Override + public ListVersionsResult listVersions(String bucketName, String prefix) { + return wontImplement(); + } + + @Override + public ListVersionsResult listVersions(ListVersionsRequest request) { + return wontImplement(); + } + + @Override + public ListVersionsResult listMoreVersions(ListVersionsResult lastResult) { + return wontImplement(); + } + + @Override + public void putObject(String bucketName, String key, Object content, String contentType) { + wontImplement(); + } + + @Override + public void putObject(String bucketName, String key, Range range, Object content) { + wontImplement(); + } + + @Override + public CopyObjectResult copyObject( + String sourceBucketName, String sourceKey, String bucketName, String key) { + return wontImplement(); + } + + @Override + public CopyObjectResult copyObject(CopyObjectRequest request) { + return wontImplement(); + } + + @Override + public T readObject(String bucketName, String key, Class objectType) { + return wontImplement(); + } + + @Override + public T readObject(String bucketName, String key, String versionId, Class objectType) { + return wontImplement(); + } + + @Override + public GetObjectResult getObject(String bucketName, String key) { + return wontImplement(); + } + + @Override + public GetObjectResult getObject( + GetObjectRequest request, Class objectType) { + return wontImplement(); + } + + @Override + public URL getPresignedUrl(String bucketName, String key, Date expirationTime) { + return wontImplement(); + } + + @Override + public URL getPresignedUrl(PresignedUrlRequest request) { + return wontImplement(); + } + + @Override + public void deleteObject(DeleteObjectRequest request) { + wontImplement(); + } + + @Override + public void deleteVersion(String bucketName, String key, String versionId) { + wontImplement(); + } + + @Override + public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) { + return wontImplement(); + } + + @Override + public void setObjectMetadata(String bucketName, String key, S3ObjectMetadata objectMetadata) { + wontImplement(); + } + + @Override + public S3ObjectMetadata getObjectMetadata(GetObjectMetadataRequest request) { + return wontImplement(); + } + + @Override + public void setObjectAcl(String bucketName, String key, AccessControlList acl) { + wontImplement(); + } + + @Override + public void setObjectAcl(String bucketName, String key, CannedAcl cannedAcl) { + wontImplement(); + } + + @Override + public void setObjectAcl(SetObjectAclRequest request) { + wontImplement(); + } + + @Override + public AccessControlList getObjectAcl(String bucketName, String key) { + return wontImplement(); + } + + @Override + public AccessControlList getObjectAcl(GetObjectAclRequest request) { + return wontImplement(); + } + + @Override + public void extendRetentionPeriod(String bucketName, String key, Long period) { + wontImplement(); + } + + @Override + public ListMultipartUploadsResult listMultipartUploads(String bucketName) { + return wontImplement(); + } + + @Override + public ListMultipartUploadsResult listMultipartUploads(ListMultipartUploadsRequest request) { + return wontImplement(); + } + + @Override + public String initiateMultipartUpload(String bucketName, String key) { + return wontImplement(); + } + + @Override + public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) { + return wontImplement(); + } + + @Override + public ListPartsResult listParts(String bucketName, String key, String uploadId) { + return wontImplement(); + } + + @Override + public ListPartsResult listParts(ListPartsRequest request) { + return wontImplement(); + } + + @Override + public MultipartPartETag uploadPart(UploadPartRequest request) { + return wontImplement(); + } + + @Override + public CopyPartResult copyPart(CopyPartRequest request) { + return wontImplement(); + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) { + return wontImplement(); + } + + @Override + public void abortMultipartUpload(AbortMultipartUploadRequest request) { + wontImplement(); + } + + @Override + public void setObjectLockConfiguration( + String bucketName, ObjectLockConfiguration objectLockConfiguration) { + wontImplement(); + } + + @Override + public ObjectLockConfiguration getObjectLockConfiguration(String bucketName) { + return wontImplement(); + } + + @Override + public void enableObjectLock(String bucketName) { + wontImplement(); + } + + @Override + public void setObjectLegalHold(SetObjectLegalHoldRequest request) { + wontImplement(); + } + + @Override + public ObjectLockLegalHold getObjectLegalHold(GetObjectLegalHoldRequest request) { + return wontImplement(); + } + + @Override + public void setObjectRetention(SetObjectRetentionRequest request) { + wontImplement(); + } + + @Override + public ObjectLockRetention getObjectRetention(GetObjectRetentionRequest request) { + return wontImplement(); + } + + private T wontImplement() { + throw new UnsupportedOperationException(); + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/ObjectData.java b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/ObjectData.java new file mode 100644 index 000000000000..b07132dd8963 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/ObjectData.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.mock.ecs; + +import com.emc.object.Range; +import com.emc.object.s3.S3ObjectMetadata; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.commons.codec.digest.DigestUtils; + +/** + * Object data in memory. + */ +public class ObjectData { + public final byte[] content; + public final Map userMetadata; + + public static ObjectData create(byte[] content, S3ObjectMetadata metadata) { + Map userMetadata = new LinkedHashMap<>(); + if (metadata != null) { + userMetadata.putAll(metadata.getUserMetadata()); + } + + return new ObjectData(content, userMetadata); + } + + private ObjectData(byte[] content, Map userMetadata) { + this.content = content; + this.userMetadata = userMetadata; + } + + public int length() { + return content.length; + } + + public ObjectData appendContent(byte[] appendedData) { + byte[] newContent = Arrays.copyOf(content, content.length + appendedData.length); + System.arraycopy(appendedData, 0, newContent, content.length, appendedData.length); + return new ObjectData(newContent, userMetadata); + } + + public InputStream createInputStream(Range range) { + int offset = range.getFirst().intValue(); + int length; + if (range.getLast() != null) { + length = range.getLast().intValue() - offset; + } else { + length = content.length - offset; + } + + return new ByteArrayInputStream(content, offset, length); + } + + public S3ObjectMetadata createFullMetadata() { + S3ObjectMetadata metadata = new S3ObjectMetadata(); + metadata.setETag(DigestUtils.md5Hex(content)); + metadata.setContentLength((long) content.length); + metadata.setUserMetadata(userMetadata); + return metadata; + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/TestExceptionCode.java b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/TestExceptionCode.java new file mode 100644 index 000000000000..76f41e71d8d7 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/mock/ecs/TestExceptionCode.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.mock.ecs; + +import com.emc.object.Range; +import com.emc.object.s3.S3Exception; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verify the error codes between real client and mock client. + */ +public class TestExceptionCode { + + @Rule + public EcsS3MockRule rule = EcsS3MockRule.create(); + + @Test + public void testExceptionCode() { + String object = "test"; + assertS3Exception("Append absent object", 404, "NoSuchKey", + () -> rule.client().appendObject(rule.bucket(), object, "abc".getBytes())); + assertS3Exception("Get object", 404, "NoSuchKey", + () -> rule.client().readObjectStream(rule.bucket(), object, Range.fromOffset(0))); + } + + public void assertS3Exception(String message, int httpCode, String errorCode, Runnable task) { + try { + task.run(); + Assert.fail("Expect s3 exception for " + message); + } catch (S3Exception e) { + Assert.assertEquals(message + ", http code", httpCode, e.getHttpCode()); + Assert.assertEquals(message + ", error code", errorCode, e.getErrorCode()); + } + } +} diff --git a/settings.gradle b/settings.gradle index 09d38c66892b..97f029711a30 100644 --- a/settings.gradle +++ b/settings.gradle @@ -33,6 +33,7 @@ include 'pig' include 'hive-metastore' include 'nessie' include 'gcp' +include 'dell' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -49,6 +50,7 @@ project(':pig').name = 'iceberg-pig' project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' +project(':dell').name = 'iceberg-dell' List knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",") String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions") diff --git a/versions.props b/versions.props index 436a6e00722d..6ecabdc34846 100644 --- a/versions.props +++ b/versions.props @@ -27,6 +27,7 @@ org.projectnessie:* = 0.18.0 com.google.cloud:libraries-bom = 24.1.0 org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0 +com.emc.ecs:object-client-bundle = 3.3.2 # test deps org.junit.vintage:junit-vintage-engine = 5.7.2