diff --git a/build.gradle b/build.gradle index 446d6eac19c9..2d7ec3be0eb1 100644 --- a/build.gradle +++ b/build.gradle @@ -1286,6 +1286,14 @@ project(':iceberg-nessie') { } } +project(':iceberg-dell') { + + dependencies { + compile project(':iceberg-core') + compileOnly 'com.amazonaws:aws-java-sdk-s3' + } +} + @Memoized boolean isVersionFileExists() { return file('version.txt').exists() diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java b/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java new file mode 100644 index 000000000000..8963d02f13bb --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +/** + * ECS client and its util-methods. + */ +public interface EcsClient extends AutoCloseable { + + /** + * Get an {@link ObjectKeys} instance to convert between {@link ObjectKey} and {@link String}. + */ + ObjectKeys objectKeys(); + + /** + * Get a {@link PropertiesSerDes} instance to convert between {@link Map} and object content. + */ + default PropertiesSerDes propertiesSerDes() { + return PropertiesSerDes.useJdk(); + } + + /** + * Get the object info of specific key. 
If object is absent, return {@link Optional#empty()} + */ + Optional head(ObjectKey key); + + /** + * Get the {@link InputStream} of specific key and position. If object is absent, an exception will be thrown. + */ + InputStream inputStream(ObjectKey key, long pos); + + /** + * Get the {@link OutputStream} of specific key. If object is present, the behaviour is undefined. + */ + OutputStream outputStream(ObjectKey key); + + /** + * A tuple interface for {@link #readAll(ObjectKey)} + */ + interface ContentAndHeadInfo { + ObjectHeadInfo getHeadInfo(); + + byte[] getContent(); + } + + /** + * Get the object content and the head info of object. If object is absent, an exception will be thrown. + */ + ContentAndHeadInfo readAll(ObjectKey key); + + /** + * A CAS operation to replace an object with the previous E-Tag. + *

+ * We assume that the E-Tag can distinguish the content of the objects. + *

+ * If the E-Tag does not match, the method will return false, and the object won't be changed. + */ + boolean replace(ObjectKey key, String eTag, byte[] bytes, Map userMetadata); + + /** + * A CAS operation to create an object. + *

+ * If the specific object already exists, the method will return false, and the existing object won't be changed. + */ + boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map userMetadata); + + /** + * A CAS operation to copy an object. + *

+ * We assume that the E-Tag can distinguish the content of the objects. + *

+ * If the destination object already exists, or the original object does not match the E-Tag, the method will return + false, and both objects won't be changed. + */ + boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey); + + /** + * Delete object + */ + void deleteObject(ObjectKey key); + + /** + * List all objects with delimiter. + *

+ * For example: there are objects like: + *

+ * If prefix is namespace1 and delimiter is /, then return value will be + * + *

+ * The function can filter and convert to the object that the user wants to use + *

+ * note: the common prefixes, such as namespace1/namespace2/, won't return by this method. + * + * @param prefix prefix key + * @param filterAndMapper map object key to specify item + * @param search item type + * @return all items with given prefix + */ + List listDelimiterAll(ObjectKey prefix, Function> filterAndMapper); +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java new file mode 100644 index 000000000000..20104df7ed8a --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Objects; + +/** + * An record class of object base key which allow not null bucket and key. + */ +public class ObjectBaseKey { + + private final String bucket; + public final String key; + + public ObjectBaseKey(String bucket, String key) { + this.bucket = bucket; + this.key = key; + } + + public ObjectKey asKey() { + if (bucket == null) { + throw new IllegalArgumentException(String.format( + "fail to cast base key %s as object key, bucket is unknown", + this)); + } + return new ObjectKey(bucket, key == null ? 
"" : key); + } + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + @Override + public String toString() { + return "ObjectBaseKey{" + + "bucket='" + bucket + '\'' + + ", key='" + key + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ObjectBaseKey that = (ObjectBaseKey) o; + return Objects.equals(bucket, that.bucket) && Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, key); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java new file mode 100644 index 000000000000..f4c79e99cc70 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Map; + +/** + * The object head info which can be fetched without the whole object content data. + */ +public interface ObjectHeadInfo { + + long getContentLength(); + + /** + * E-Tag is a hash string of object content. 
+ */ + String getETag(); + + Map getUserMetadata(); +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java new file mode 100644 index 000000000000..2837efa7fc96 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Objects; + +/** + * An immutable record class of object key. + */ +public class ObjectKey { + + private final String bucket; + private final String key; + + public ObjectKey(String bucket, String key) { + if (bucket == null || key == null) { + throw new IllegalArgumentException(String.format("bucket %s and key %s must be not null", bucket, key)); + } + this.bucket = bucket; + this.key = key; + } + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + /** + * The method is for debug. 
If you want to use string format, please use {@link ObjectKeys#toString(ObjectKey)} and + * {@link ObjectKeys#parse(String)} + */ + @Override + public String toString() { + return "ObjectKey{" + + "bucket='" + bucket + '\'' + + ", key='" + key + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ObjectKey objectKey = (ObjectKey) o; + return Objects.equals(bucket, objectKey.bucket) && Objects.equals(key, objectKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, key); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java new file mode 100644 index 000000000000..c5b30baa931e --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java @@ -0,0 +1,281 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; + +/** + * The operations of {@link ObjectKey} + */ +public interface ObjectKeys { + + /** + * The default and generally be-used delimiter. + */ + String DELIMITER = "/"; + /** + * The default suffix of table metadata object + */ + String TABLE_METADATA_SUFFIX = ".table"; + /** + * The default suffix of namespace metadata object + */ + String NAMESPACE_METADATA_SUFFIX = ".namespace"; + + /** + * The base key of catalog. + */ + ObjectBaseKey baseKey(); + + /** + * The base key parts for calculate sub key. + * + * @return the parts of the base key + * @see #subParts(ObjectKey) + */ + default List baseKeyParts() { + ObjectBaseKey baseKey = baseKey(); + if (baseKey.getBucket() == null) { + return Collections.emptyList(); + } else if (baseKey.getKey() == null) { + return Collections.singletonList(baseKey.getBucket()); + } else { + List parts = new ArrayList<>(); + parts.add(baseKey.getBucket()); + for (String result : Splitter.on(getDelimiter()).split(baseKey.getKey())) { + parts.add(result); + } + if (!checkParts(parts)) { + throw new IllegalArgumentException(String.format("invalid base key %s with delimiter %s", + baseKey, getDelimiter())); + } + return Collections.unmodifiableList(parts); + } + } + + /** + * Get current delimiter. + *

+ * Now, we only support "/". + */ + default String getDelimiter() { + return DELIMITER; + } + + /** + * Get current namespace object suffix. + *

+ * Now, we only support ".namespace". + */ + default String getNamespaceMetadataSuffix() { + return NAMESPACE_METADATA_SUFFIX; + } + + /** + * Get current table object suffix. + *

+ * Now, we only support ".table". + */ + default String getTableMetadataSuffix() { + return TABLE_METADATA_SUFFIX; + } + + /** + * Convert relative parts to object key + * + * @param parts that relative to base key + * @return object key + */ + default ObjectKey createObjectKey(List parts) { + ObjectBaseKey baseKey = baseKey(); + if (parts.isEmpty()) { + return baseKey.asKey(); + } + String delimiter = getDelimiter(); + if (parts.stream().anyMatch(it -> it.contains(delimiter))) { + throw new IllegalArgumentException(String.format("delimiter %s in key parts: %s", delimiter, parts)); + } + if (baseKey.getBucket() == null) { + return new ObjectKey(parts.get(0), String.join(delimiter, parts.subList(1, parts.size()))); + } else { + String prefix = baseKey.getKey() == null ? "" : (baseKey.getKey() + delimiter); + return new ObjectKey(baseKey.getBucket(), prefix + String.join(delimiter, parts)); + } + } + + /** + * Create an {@link ObjectKey} for namespace object + */ + default ObjectKey createMetadataKey(Namespace namespace) { + if (namespace.isEmpty()) { + return createObjectKey(Collections.singletonList(getNamespaceMetadataSuffix())); + } + // copy namespace levels + List keyParts = new ArrayList<>(Arrays.asList(namespace.levels())); + int lastIndex = keyParts.size() - 1; + keyParts.set(lastIndex, keyParts.get(lastIndex) + getNamespaceMetadataSuffix()); + return createObjectKey(keyParts); + } + + /** + * Create a prefix {@link ObjectKey} to list tables or namespaces in the specific namespace. + *

+ * The prefix key lack namespace metadata suffix. + */ + default ObjectKey createPrefixKey(Namespace namespace) { + return createObjectKey(Arrays.asList(namespace.levels())); + } + + /** + * Try to extract {@link Namespace} from specific key. If the key is not a namespace object key pattern, or parent + * namespace is not matched, the method will return {@link Optional#empty()} + */ + default Optional extractNamespace(ObjectKey key, Namespace parent) { + if (!key.getKey().endsWith(getNamespaceMetadataSuffix())) { + return Optional.empty(); + } + Optional lastPartOpt = extractLastPart(key, parent); + if (!lastPartOpt.isPresent()) { + return Optional.empty(); + } + String lastPart = lastPartOpt.get(); + String namespaceName = lastPart.substring(0, lastPart.length() - getNamespaceMetadataSuffix().length()); + String[] levels = Arrays.copyOf(parent.levels(), parent.levels().length + 1); + levels[levels.length - 1] = namespaceName; + return Optional.of(Namespace.of(levels)); + } + + /** + * Create an {@link ObjectKey} for table object. + */ + default ObjectKey createMetadataKey(TableIdentifier tableIdentifier) { + if (tableIdentifier.hasNamespace()) { + List parts = new ArrayList<>(tableIdentifier.namespace().levels().length + 1); + parts.addAll(Arrays.asList(tableIdentifier.namespace().levels())); + parts.add(tableIdentifier.name() + getTableMetadataSuffix()); + return createObjectKey(parts); + } else { + return createObjectKey(Collections.singletonList(tableIdentifier.name() + getTableMetadataSuffix())); + } + } + + /** + * Try to extract {@link TableIdentifier} from specific key. 
If the key is not a table object key pattern, or + * namespace is not matched, the method will return {@link Optional#empty()} + */ + default Optional extractTableIdentifier(ObjectKey key, Namespace namespace) { + if (!key.getKey().endsWith(getTableMetadataSuffix())) { + return Optional.empty(); + } + Optional lastPartOpt = extractLastPart(key, namespace); + if (!lastPartOpt.isPresent()) { + return Optional.empty(); + } + String lastPart = lastPartOpt.get(); + String tableName = lastPart.substring(0, lastPart.length() - getTableMetadataSuffix().length()); + return Optional.of(TableIdentifier.of(namespace, tableName)); + } + + /** + * Try to extract last part from specific key. If the parent key is not match the input namespace, the method + * will return {@link Optional#empty()} + */ + default Optional extractLastPart(ObjectKey key, Namespace expectNamespace) { + Optional> partsOpt = subParts(key); + if (!partsOpt.isPresent()) { + return Optional.empty(); + } + List parts = partsOpt.get(); + if (parts.isEmpty()) { + return Optional.empty(); + } + int lastIndex = parts.size() - 1; + Namespace namespace = Namespace.of(parts.subList(0, lastIndex).toArray(new String[] {})); + if (expectNamespace != null && !Objects.equals(expectNamespace, namespace)) { + throw new IllegalArgumentException(String.format("namespace not match: %s != %s", namespace, expectNamespace)); + } + return Optional.of(parts.get(lastIndex)); + } + + /** + * Get the relative parts of {@link #baseKey()}. 
The object key will be spilt by {@link #getDelimiter()} + */ + default Optional> subParts(ObjectKey key) { + List parts = new ArrayList<>(); + parts.add(key.getBucket()); + for (String result : Splitter.on(getDelimiter()).split(key.getKey())) { + parts.add(result); + } + if (!checkParts(parts)) { + return Optional.empty(); + } + List baseParts = baseKeyParts(); + if (parts.size() < baseParts.size() || !Objects.equals(parts.subList(0, baseParts.size()), baseParts)) { + return Optional.empty(); + } else { + return Optional.of(parts.subList(baseParts.size(), parts.size())); + } + } + + /** + * Check all parts are valid in Iceberg. + */ + default boolean checkParts(List parts) { + return parts.stream().noneMatch(String::isEmpty); + } + + /** + * Get default warehouse location of table id + * + * @param tableIdentifier is table id + * @return default warehouse location prefix key + */ + default ObjectKey warehouseLocation(TableIdentifier tableIdentifier) { + if (!tableIdentifier.hasNamespace()) { + return createObjectKey(Collections.singletonList(tableIdentifier.name())); + } else { + List parts = new ArrayList<>(tableIdentifier.namespace().levels().length + 1); + parts.addAll(Arrays.asList(tableIdentifier.namespace().levels())); + parts.add(tableIdentifier.name()); + return createObjectKey(parts); + } + } + + /** + * Convert {@link ObjectKey} to string + */ + default String toString(ObjectKey key) { + return key.getBucket() + getDelimiter() + key.getKey(); + } + + /** + * Convert string to {@link ObjectKey} + */ + default ObjectKey parse(String key) { + String[] r = key.split(getDelimiter(), 2); + if (r.length < 2) { + throw new IllegalArgumentException("failed to parse key " + key); + } + return new ObjectKey(r[0], r[1]); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java b/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java new file mode 100644 index 000000000000..6f6d5214be35 --- /dev/null +++ 
b/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * convert Map properties to bytes. + */ +public interface PropertiesSerDes { + + /** + * The current version of properties files. + *

+ * This property set in the object metadata. And now it is only a placeholder and may distinct in the future. + */ + String CURRENT_VERSION = "0"; + + /** + * E-Tag property name in results + */ + String E_TAG_KEY = "ecs-object-e-tag"; + + /** + * Version property name in results + */ + String PROPERTY_VERSION_KEY = "ecs-object-property-version"; + + Logger log = LoggerFactory.getLogger(PropertiesSerDes.class); + + /** + * read properties from stream + * + * @param input is stream + * @return properties + */ + Map read(InputStream input); + + /** + * Get the properties with the object content. + *

+ * This method help to put additional properties in object metadata. + */ + default Map readProperties(byte[] content, String eTag, String version) { + Map propertiesInObject = read(new ByteArrayInputStream(content)); + Map properties = new HashMap<>(propertiesInObject); + properties.put(E_TAG_KEY, eTag); + properties.put(PROPERTY_VERSION_KEY, version); + return properties; + } + + /** + * Write properties to bytes. + */ + byte[] toBytes(Map value); + + /** + * Create a {@link PropertiesSerDes} to serialize and deserialize properties with {@link Properties}. + */ + static PropertiesSerDes useJdk() { + return new PropertiesSerDes() { + @Override + public Map read(InputStream input) { + Properties jdkProperties = new Properties(); + try { + jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8)); + } catch (IOException e) { + log.error("fail to read properties", e); + throw new UncheckedIOException(e); + } + Set propertyNames = jdkProperties.stringPropertyNames(); + Map properties = new HashMap<>(); + for (String name : propertyNames) { + properties.put(name, jdkProperties.getProperty(name)); + } + return Collections.unmodifiableMap(properties); + } + + @Override + public byte[] toBytes(Map value) { + Properties jdkProperties = new Properties(); + for (Map.Entry entry : value.entrySet()) { + jdkProperties.setProperty(entry.getKey(), entry.getValue()); + } + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null); + return output.toByteArray(); + } catch (IOException e) { + log.error("fail to store properties {} to file", value, e); + throw new UncheckedIOException(e); + } + } + }; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndHeadInfoImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndHeadInfoImpl.java new file mode 100644 index 000000000000..6e06b979bffb --- /dev/null +++ 
b/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndHeadInfoImpl.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell.impl; + +import org.apache.iceberg.dell.EcsClient; +import org.apache.iceberg.dell.ObjectHeadInfo; + +/** + * A record class of {@link EcsClient.ContentAndHeadInfo} + */ +public class ContentAndHeadInfoImpl implements EcsClient.ContentAndHeadInfo { + + private final ObjectHeadInfo headInfo; + private final byte[] content; + + public ContentAndHeadInfoImpl(ObjectHeadInfo headInfo, byte[] content) { + this.headInfo = headInfo; + this.content = content; + } + + @Override + public ObjectHeadInfo getHeadInfo() { + return headInfo; + } + + @Override + public byte[] getContent() { + return content; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java new file mode 100644 index 000000000000..0baeac04b1d1 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell.impl; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.Headers; +import com.amazonaws.services.s3.model.ObjectMetadata; +import java.io.ByteArrayInputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.iceberg.dell.ObjectKey; + +/** + * An {@link OutputStream} implementation of ECS append API. + */ +public class EcsAppendOutputStream extends OutputStream { + + private final AmazonS3 s3; + private final ObjectKey key; + + /** + * local bytes cache that avoid too many requests + *

+ * use {@link ByteBuffer} to maintain offset + */ + private final ByteBuffer localCache; + + private boolean firstPart = true; + + public EcsAppendOutputStream(AmazonS3 s3, ObjectKey key, byte[] localCache) { + this.s3 = s3; + this.key = key; + this.localCache = ByteBuffer.wrap(localCache); + } + + @Override + public void write(int b) { + if (!checkBuffer(1)) { + flush(); + } + localCache.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) { + if (!checkBuffer(len)) { + flush(); + } + if (checkBuffer(len)) { + localCache.put(b, off, len); + } else { + // if content > cache, directly flush itself. + flushBuffer(b, off, len); + } + } + + private boolean checkBuffer(int nextWrite) { + return localCache.remaining() >= nextWrite; + } + + private void flushBuffer(byte[] buffer, int offset, int length) { + ObjectMetadata metadata = new ObjectMetadata(); + if (firstPart) { + firstPart = false; + } else { + // only following parts need append api + metadata.setHeader(Headers.RANGE, "bytes=-1-"); + } + metadata.setContentLength(length); + s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(buffer, offset, length), metadata); + } + + /** + * flush all cached bytes + */ + @Override + public void flush() { + if (localCache.remaining() < localCache.capacity()) { + localCache.flip(); + flushBuffer(localCache.array(), localCache.arrayOffset(), localCache.remaining()); + localCache.clear(); + } + } + + @Override + public void close() { + // call flush to guarantee all bytes are submitted + flush(); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java new file mode 100644 index 000000000000..327974eb032f --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java @@ -0,0 +1,250 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with 
the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell.impl; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import org.apache.iceberg.dell.EcsClient; +import org.apache.iceberg.dell.ObjectHeadInfo; +import org.apache.iceberg.dell.ObjectKey; +import org.apache.iceberg.dell.ObjectKeys; +import org.apache.iceberg.dell.PropertiesSerDes; + +/** + * An implementation of {@link EcsClient} + *

+ * ECS use aws sdk v1 to support private function. + */ +public class EcsClientImpl implements EcsClient { + + private final AmazonS3 s3; + private final ObjectKeys keys; + private final PropertiesSerDes propertiesSerDes; + + public EcsClientImpl( + AmazonS3 s3, + ObjectKeys keys, + PropertiesSerDes propertiesSerDes) { + this.s3 = s3; + this.keys = keys; + this.propertiesSerDes = propertiesSerDes; + } + + @Override + public ObjectKeys objectKeys() { + return keys; + } + + @Override + public PropertiesSerDes propertiesSerDes() { + return propertiesSerDes; + } + + /** + * Delegate to {@link AmazonS3#getObjectMetadata(java.lang.String, java.lang.String)} + */ + @Override + public Optional head(ObjectKey key) { + try { + ObjectMetadata metadata = s3.getObjectMetadata(key.getBucket(), key.getKey()); + return Optional.of(new ObjectHeadInfoImpl( + metadata.getContentLength(), + metadata.getETag(), + metadata.getUserMetadata())); + } catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) { + return Optional.empty(); + } else { + throw e; + } + } + } + + /** + * Delegate to {@link AmazonS3#getObject(com.amazonaws.services.s3.model.GetObjectRequest)} with a range option. + */ + @Override + public InputStream inputStream(ObjectKey key, long pos) { + S3Object object = s3.getObject(new GetObjectRequest(key.getBucket(), key.getKey()) + .withRange(pos)); + return object.getObjectContent(); + } + + @Override + public OutputStream outputStream(ObjectKey key) { + return new EcsAppendOutputStream(s3, key, new byte[1_000]); + } + + /** + * Delegate to {@link AmazonS3#getObject(com.amazonaws.services.s3.model.GetObjectRequest)}. 
+ */ + @Override + public ContentAndHeadInfo readAll(ObjectKey key) { + S3Object object = s3.getObject(new GetObjectRequest(key.getBucket(), key.getKey())); + ObjectMetadata metadata = object.getObjectMetadata(); + int size = (int) metadata.getContentLength(); + byte[] content = new byte[size]; + try (S3ObjectInputStream input = object.getObjectContent()) { + int offset = 0; + while (offset < size) { + offset += input.read(content, offset, size - offset); + } + if (offset != size) { + throw new IllegalStateException(String.format( + "size of %s is unmatched, current size %d != %d", + key, offset, size)); + } + } catch (IOException e) { + throw new UncheckedIOException("rethrow unchecked exception during read all bytes", e); + } + ObjectHeadInfoImpl headInfo = new ObjectHeadInfoImpl( + metadata.getContentLength(), + metadata.getETag(), + metadata.getUserMetadata()); + return new ContentAndHeadInfoImpl(headInfo, content); + } + + /** + * Delegate to {@link AmazonS3#putObject(java.lang.String, java.lang.String, java.io.InputStream, com.amazonaws.services.s3.model.ObjectMetadata)} + * with an ECS customer header "If-Match = ${E-Tag}". + */ + @Override + public boolean replace(ObjectKey key, String eTag, byte[] bytes, Map userMetadata) { + return cas(() -> { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setHeader("If-Match", eTag); + metadata.setContentLength(bytes.length); + metadata.setUserMetadata(userMetadata); + s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(bytes), metadata); + }); + } + + /** + * Delegate to {@link AmazonS3#putObject(java.lang.String, java.lang.String, java.io.InputStream, com.amazonaws.services.s3.model.ObjectMetadata)} + * with an ECS customer header "If-None-Match = *". 
+ */ + @Override + public boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map userMetadata) { + return cas(() -> { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setHeader("If-None-Match", "*"); + metadata.setContentLength(bytes.length); + metadata.setUserMetadata(userMetadata); + s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(bytes), metadata); + }); + } + + /** + * Delegate to {@link AmazonS3#copyObject(com.amazonaws.services.s3.model.CopyObjectRequest)} with + * AWS S3's customer header "x-amz-copy-source-if-match = ${E-Tag}" and an ECS customer header "If-None-Match = *". + */ + @Override + public boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey) { + return cas(() -> { + CopyObjectRequest request = new CopyObjectRequest( + fromKey.getBucket(), + fromKey.getKey(), + toKey.getBucket(), + toKey.getKey()); + request.setMatchingETagConstraints(Collections.singletonList(eTag)); + request.putCustomRequestHeader("If-None-Match", "*"); + s3.copyObject(request); + }); + } + + /** + * Process CAS error code. + */ + private boolean cas(Runnable fn) { + try { + fn.run(); + return true; + } catch (AmazonS3Exception e) { + if ("PreconditionFailed".equals(e.getErrorCode())) { + return false; + } else { + throw e; + } + } + } + + /** + * Delegate to {@link AmazonS3#deleteObject(java.lang.String, java.lang.String)} + */ + @Override + public void deleteObject(ObjectKey key) { + s3.deleteObject(key.getBucket(), key.getKey()); + } + + /** + * Delegate to {@link AmazonS3#listObjectsV2(com.amazonaws.services.s3.model.ListObjectsV2Request)}, and process + * the result list. 
+ */ + @Override + public List listDelimiterAll(ObjectKey prefix, Function> filterAndMapper) { + String delimiter = objectKeys().getDelimiter(); + List result = new ArrayList<>(); + String prefixKey; + if (prefix.getKey().isEmpty()) { + prefixKey = ""; + } else if (prefix.getKey().endsWith(delimiter)) { + prefixKey = prefix.getKey(); + } else { + prefixKey = prefix.getKey() + delimiter; + } + String continuationToken = null; + do { + ListObjectsV2Result response = s3.listObjectsV2( + new ListObjectsV2Request() + .withDelimiter(delimiter) + .withPrefix(prefixKey) + .withContinuationToken(continuationToken)); + continuationToken = response.getNextContinuationToken(); + for (S3ObjectSummary objectSummary : response.getObjectSummaries()) { + Optional itemOpt = filterAndMapper.apply(new ObjectKey( + objectSummary.getBucketName(), + objectSummary.getKey())); + if (!itemOpt.isPresent()) { + continue; + } + result.add(itemOpt.get()); + } + } while (continuationToken != null); + return result; + } + + @Override + public void close() { + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java new file mode 100644 index 000000000000..f8dc14658e50 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell.impl; + +import java.util.Collections; +import java.util.Map; +import org.apache.iceberg.dell.ObjectHeadInfo; + +/** + * A record class of {@link ObjectHeadInfo} + */ +public class ObjectHeadInfoImpl implements ObjectHeadInfo { + private final long contentLength; + private final String eTag; + private final Map userMetadata; + + public ObjectHeadInfoImpl(long contentLength, String eTag, Map userMetadata) { + this.contentLength = contentLength; + this.eTag = eTag; + this.userMetadata = Collections.unmodifiableMap(userMetadata); + } + + @Override + public long getContentLength() { + return contentLength; + } + + @Override + public String getETag() { + return eTag; + } + + @Override + public Map getUserMetadata() { + return userMetadata; + } + + @Override + public String toString() { + return "ObjectHeadInfoImpl{" + + "contentLength=" + contentLength + + ", eTag='" + eTag + '\'' + + ", userMetadata=" + userMetadata + + '}'; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java new file mode 100644 index 000000000000..b9eafc3fc43e --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell.impl; + +import java.util.List; +import org.apache.iceberg.dell.ObjectBaseKey; +import org.apache.iceberg.dell.ObjectKeys; + +/** + * The implementation of {@link ObjectKeys} + */ +public class ObjectKeysImpl implements ObjectKeys { + + private final ObjectBaseKey baseKey; + + /** + * A cached lazy results of {@link #baseKeyParts()} + */ + private volatile List lazyBaseKeyParts; + + public ObjectKeysImpl(ObjectBaseKey baseKey) { + this.baseKey = baseKey; + } + + @Override + public ObjectBaseKey baseKey() { + return baseKey; + } + + /** + * Use field to cache result parts. + */ + @Override + public List baseKeyParts() { + // code isn't full thread safe. but creating instances in multiple times is fine + if (lazyBaseKeyParts == null) { + lazyBaseKeyParts = ObjectKeys.super.baseKeyParts(); + } + return lazyBaseKeyParts; + } +} diff --git a/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java b/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java new file mode 100644 index 000000000000..2e473757b6e3 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java @@ -0,0 +1,193 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import com.amazonaws.util.Md5Utils; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.dell.impl.ContentAndHeadInfoImpl; +import org.apache.iceberg.dell.impl.ObjectHeadInfoImpl; +import org.apache.iceberg.dell.impl.ObjectKeysImpl; + +public class MemoryEcsClient implements EcsClient { + + private final ObjectBaseKey baseKey; + + private final ConcurrentMap data = new ConcurrentHashMap<>(); + + public MemoryEcsClient(ObjectBaseKey baseKey) { + this.baseKey = baseKey; + } + + @Override + public ObjectKeys objectKeys() { + return new ObjectKeysImpl(baseKey); + } + + @Override + public Optional head(ObjectKey key) { + return Optional.ofNullable(data.get(key)).map(EcsObject::getHeadInfo); + } + + @Override + public InputStream inputStream(ObjectKey key, long pos) { + EcsObject object = data.get(key); + byte[] content = object.getContent(); + return new ByteArrayInputStream(content, (int) pos, content.length); + } + + @Override + public OutputStream outputStream(ObjectKey key) { + return new WrappedOutputStream(key); + } + + @Override + public ContentAndHeadInfo readAll(ObjectKey key) { + return data.get(key).getContentAndETag(); + } + + @Override + public boolean replace(ObjectKey key, String eTag, byte[] bytes, Map userMetadata) { + EcsObject original = data.get(key); + if (original == null) { + return false; + } + if (!original.getHeadInfo().getETag().equals(eTag)) { + return false; + } + return data.replace(key, original, 
EcsObject.create(bytes, userMetadata)); + } + + @Override + public boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map userMetadata) { + return data.putIfAbsent(key, EcsObject.create(bytes, userMetadata)) == null; + } + + @Override + public boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey) { + EcsObject original = data.get(fromKey); + if (original == null) { + return false; + } + if (!original.getHeadInfo().getETag().equals(eTag)) { + return false; + } + return data.putIfAbsent(toKey, original) == null; + } + + @Override + public void deleteObject(ObjectKey key) { + data.remove(key); + } + + @Override + public List listDelimiterAll(ObjectKey prefix, Function> filterAndMapper) { + String delimiter = objectKeys().getDelimiter(); + String prefixKey; + if (prefix.getKey().isEmpty()) { + prefixKey = ""; + } else if (prefix.getKey().endsWith(delimiter)) { + prefixKey = prefix.getKey(); + } else { + prefixKey = prefix.getKey() + delimiter; + } + int prefixLength = prefixKey.length(); + return data.keySet().stream() + .filter(key -> { + if (!Objects.equals(key.getBucket(), prefix.getBucket())) { + return false; + } + if (!key.getKey().startsWith(prefixKey)) { + return false; + } + return key.getKey().indexOf(delimiter, prefixLength) < 0; + }) + .sorted(Comparator.comparing(ObjectKey::getBucket).thenComparing(ObjectKey::getKey)) + .flatMap(key -> filterAndMapper.apply(key).map(Stream::of).orElse(Stream.empty())) + .collect(Collectors.toList()); + } + + @Override + public void close() { + } + + public static class EcsObject { + + private final ObjectHeadInfo headInfo; + private final byte[] content; + + public EcsObject(ObjectHeadInfo headInfo, byte[] content) { + this.headInfo = headInfo; + this.content = content; + } + + public ObjectHeadInfo getHeadInfo() { + return headInfo; + } + + public byte[] getContent() { + return Arrays.copyOf(content, content.length); + } + + public ContentAndHeadInfo getContentAndETag() { + return new 
ContentAndHeadInfoImpl(getHeadInfo(), getContent()); + } + + public static EcsObject create(byte[] content, Map userMetadata) { + return new EcsObject( + new ObjectHeadInfoImpl(content.length, Md5Utils.md5AsBase64(content), userMetadata), + content); + } + } + + public class WrappedOutputStream extends OutputStream { + + private final ObjectKey key; + private final ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream(); + + public WrappedOutputStream(ObjectKey key) { + this.key = key; + } + + @Override + public void write(int b) { + byteArrayOutput.write(b); + } + + @Override + public void write(byte[] b, int off, int len) { + byteArrayOutput.write(b, off, len); + } + + @Override + public void close() { + data.put(key, EcsObject.create(byteArrayOutput.toByteArray(), Collections.emptyMap())); + } + } +} diff --git a/settings.gradle b/settings.gradle index ba00916e8239..ebb5fbcdb222 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,6 +38,7 @@ include 'spark3-runtime' include 'pig' include 'hive-metastore' include 'nessie' +include 'dell' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -59,6 +60,7 @@ project(':spark3-runtime').name = 'iceberg-spark3-runtime' project(':pig').name = 'iceberg-pig' project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' +project(':dell').name = 'iceberg-dell' if (JavaVersion.current() == JavaVersion.VERSION_1_8) { include 'spark2' diff --git a/versions.props b/versions.props index d2bafd8489dc..285d1fb284ed 100644 --- a/versions.props +++ b/versions.props @@ -17,6 +17,7 @@ org.apache.arrow:arrow-vector = 2.0.0 org.apache.arrow:arrow-memory-netty = 2.0.0 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 software.amazon.awssdk:* = 2.15.7 +com.amazonaws:* = 1.11.974 org.scala-lang:scala-library = 2.12.10 org.projectnessie:* = 0.5.1 javax.ws.rs:javax.ws.rs-api = 2.1.1