diff --git a/build.gradle b/build.gradle index 00ad33463f7a..f2f8d44fcdd6 100644 --- a/build.gradle +++ b/build.gradle @@ -399,6 +399,7 @@ project(':iceberg-flink-runtime') { compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + compile(project(':iceberg-dell')) } shadowJar { @@ -869,6 +870,7 @@ if (jdkVersion == '8') { compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + compile(project(':iceberg-dell')) } shadowJar { @@ -1027,6 +1029,7 @@ project(':iceberg-spark3-runtime') { compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + compile(project(':iceberg-dell')) integrationImplementation 'org.apache.spark:spark-hive_2.12' integrationImplementation 'junit:junit' @@ -1132,6 +1135,14 @@ project(':iceberg-nessie') { } } +project(':iceberg-dell') { + + dependencies { + compile project(':iceberg-core') + compileOnly 'com.amazonaws:aws-java-sdk-s3' + } +} + @Memoized boolean isVersionFileExists() { return file('version.txt').exists() diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsCatalog.java b/dell/src/main/java/org/apache/iceberg/dell/EcsCatalog.java new file mode 100644 index 000000000000..73243aeb339f --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsCatalog.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ECS catalog implementation + */ +public class EcsCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Cloneable { + + private static final Logger log = LoggerFactory.getLogger(EcsCatalog.class); + + private String catalogName; + private EcsClient ecs; + + /** + * @param name a custom name for the catalog + * @param properties catalog properties + */ + @Override + public void initialize(String name, Map properties) { + catalogName = name; + ecs = EcsClient.create(properties); + } + + @Override + public String name() { + return catalogName; + } + + @Override + public List listTables(Namespace namespace) { + return ecs.listTables(namespace); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + ObjectKey tableMetadataKey = ecs.getKeys().getMetadataKey(identifier); + if (purge) { + // if re-use the same instance, current() will throw exception. + TableOperations ops = newTableOps(identifier); + TableMetadata current = ops.current(); + if (current == null) { + return false; + } + CatalogUtil.dropTableData(ops.io(), current); + } + ecs.deleteObject(tableMetadataKey); + return true; + } + + /** + * rename table only move table object, the data and metadata will still be in-place. 
+ * + * @param from identifier of the table to rename + * @param to new table name + */ + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + ObjectKey fromKey = ecs.getKeys().getMetadataKey(from); + ObjectKey toKey = ecs.getKeys().getMetadataKey(to); + Optional fromHeadOpt = ecs.head(fromKey); + if (!fromHeadOpt.isPresent()) { + throw new NoSuchTableException("table %s(%s) is absent", from, fromKey); + } + String eTag = fromHeadOpt.get().getETag(); + if (!ecs.copyObjectIfAbsent(fromKey, eTag, toKey)) { + throw new AlreadyExistsException("table %s is present", to); + } + log.info("rename table {} to {}", from, to); + ecs.deleteObject(fromKey); + } + + @Override + public void createNamespace(Namespace namespace, Map metadata) { + if (namespace.isEmpty()) { + throw new AlreadyExistsException("namespace %s has already existed", namespace); + } + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(namespace); + if (ecs.head(metadataKey).isPresent()) { + throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, metadataKey); + } + if (!ecs.writePropertiesIfAbsent(metadataKey, metadata)) { + throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, metadataKey); + } + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return ecs.listNamespaces(namespace); + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + ecs.assertNamespaceExist(namespace); + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(namespace); + if (namespace.isEmpty()) { + if (!ecs.head(metadataKey).isPresent()) { + return Collections.emptyMap(); + } + } + return ecs.readProperties(metadataKey); + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + if (namespace.isEmpty()) { + // empty namespace can't be dropped + return false; + } + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(namespace); + if (!listNamespaces(namespace).isEmpty() || !listTables(namespace).isEmpty()) { + throw new NamespaceNotEmptyException("namespace %s is not empty", namespace); + } + ecs.deleteObject(metadataKey); + return true; + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) throws NoSuchNamespaceException { + Map namespaceMetadata = loadNamespaceMetadata(namespace); + String eTag = namespaceMetadata.get(EcsClient.E_TAG_KEY); + if (eTag == null) { + throw new UnsupportedOperationException("eTag isn't in properties"); + } + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(namespace); + Map newProperties = new HashMap<>(namespaceMetadata); + newProperties.putAll(properties); + newProperties.remove(EcsClient.E_TAG_KEY); + return ecs.replaceProperties(metadataKey, eTag, newProperties); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) throws NoSuchNamespaceException { + Map namespaceMetadata = loadNamespaceMetadata(namespace); + String eTag = namespaceMetadata.get(EcsClient.E_TAG_KEY); + if (eTag == null) { + throw new UnsupportedOperationException("eTag isn't in properties"); + } + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(namespace); + Map newProperties = new HashMap<>(namespaceMetadata); + newProperties.keySet().removeAll(properties); + newProperties.remove(EcsClient.E_TAG_KEY); + return ecs.replaceProperties(metadataKey, eTag, newProperties); + } + + @Override + public boolean namespaceExists(Namespace namespace) { + 
if (namespace.isEmpty()) { + return true; + } + return ecs.head(ecs.getKeys().getMetadataKey(namespace)).isPresent(); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations( + catalogName + "." + tableIdentifier, + tableIdentifier, + ecs, + new EcsFileIO(ecs.copy())); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + return ecs.getKeys().toString(ecs.getKeys().warehouseLocation(tableIdentifier)); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsCatalogProperties.java b/dell/src/main/java/org/apache/iceberg/dell/EcsCatalogProperties.java new file mode 100644 index 000000000000..22ad750d6729 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsCatalogProperties.java @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.dell.impl.EcsClientImpl; +import org.apache.iceberg.dell.impl.ObjectKeysImpl; +import org.apache.iceberg.util.PropertyUtil; + +/** + * property constants of catalog + */ +public interface EcsCatalogProperties { + + /** + * static access key id + */ + String ACCESS_KEY_ID = "s3.access.key.id"; + + /** + * static secret access key + */ + String SECRET_ACCESS_KEY = "s3.secret.access.key"; + + /** + * s3 endpoint + */ + String ENDPOINT = "s3.endpoint"; + + /** + * s3 region + */ + String REGION = "s3.region"; + + /** + * base key which is like "bucket/key" + *

+ * In the current version, the bucket of the base key must be present. + *
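As a rough illustration of how the catalog properties above fit together (the endpoint, credentials, and bucket values are placeholders, and the usual java.util imports are assumed), a catalog could be wired up like this:

  Map<String, String> props = new HashMap<>();
  props.put(EcsCatalogProperties.ENDPOINT, "http://ecs.example.com:9020");
  props.put(EcsCatalogProperties.ACCESS_KEY_ID, "my-access-key");
  props.put(EcsCatalogProperties.SECRET_ACCESS_KEY, "my-secret-key");
  props.put(EcsCatalogProperties.BASE_KEY, "my-bucket/iceberg");  // "bucket/key" form; the bucket part is required
  EcsCatalog catalog = new EcsCatalog();
  catalog.initialize("ecs", props);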

+ * In the future, listing the buckets of a catalog will be supported. + */ + String BASE_KEY = "s3.base.key"; + + /** + * factory method of {@link EcsClient}. + *

+ * The method must be static and is referenced in a format like "org.apache.iceberg.dell.EcsClient#create" + *
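For illustration only, a user-supplied factory satisfying this contract might look like the sketch below (the class name com.example.CustomEcsClientFactory is hypothetical, not part of this patch); it would be configured as ecs.client.factory=com.example.CustomEcsClientFactory#create:

  package com.example;

  import java.util.Map;
  import org.apache.iceberg.dell.EcsCatalogProperties;
  import org.apache.iceberg.dell.EcsClient;

  public class CustomEcsClientFactory {

    private CustomEcsClientFactory() {
    }

    // static, a single Map parameter, and an exact EcsClient return type, as required above
    public static EcsClient create(Map<String, String> properties) {
      // a real factory could customize credentials or endpoints; this sketch simply delegates
      return EcsCatalogProperties.getBuiltInEcsClient(properties);
    }
  }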

+ * The method must have only one parameter. And return exact {@link EcsClient} type. + */ + String ECS_CLIENT_FACTORY = "ecs.client.factory"; + + /** + * get object base key from properties + * + * @param properties is property + * @return instance of ObjectBaseKey + */ + static ObjectBaseKey getObjectBaseKey(Map properties) { + String baseKey = properties.get(BASE_KEY); + if (baseKey == null) { + throw new IllegalArgumentException(String.format("missing property %s", BASE_KEY)); + } + String[] baseKeySplits = baseKey.split(ObjectKeys.DELIMITER, 2); + if (baseKeySplits.length == 1) { + return new ObjectBaseKey(baseKeySplits[0], null); + } else { + return new ObjectBaseKey(baseKeySplits[0], baseKeySplits[1]); + } + } + + /** + * get ecs client from factory + * + * @return ecs client that created by factory + */ + static Optional getEcsClientFromFactory(Map properties) { + String factory = properties.get(ECS_CLIENT_FACTORY); + if (factory == null || factory.isEmpty()) { + return Optional.empty(); + } + String[] classAndMethod = factory.split("#", 2); + if (classAndMethod.length != 2) { + throw new IllegalArgumentException(String.format("invalid property %s", ECS_CLIENT_FACTORY)); + } + Class clazz; + try { + clazz = Class.forName(classAndMethod[0], true, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("invalid property %s", ECS_CLIENT_FACTORY), e); + } + EcsClient client; + try { + client = (EcsClient) MethodHandles.lookup() + .findStatic(clazz, classAndMethod[1], MethodType.methodType(EcsClient.class, Map.class)) + .invoke(properties); + } catch (Throwable e) { + throw new IllegalArgumentException( + String.format("invalid property %s that throw exception", ECS_CLIENT_FACTORY), + e); + } + if (client == null) { + throw new IllegalArgumentException(String.format( + "invalid property %s that return null client", + ECS_CLIENT_FACTORY)); + } + return Optional.of(client); + } + + /** + * get built-in ecs client + */ + static EcsClient getBuiltInEcsClient(Map properties) { + AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard() + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials( + properties.get(EcsCatalogProperties.ACCESS_KEY_ID), + properties.get(EcsCatalogProperties.SECRET_ACCESS_KEY) + ) + ) + ); + String endpoint = properties.get(EcsCatalogProperties.ENDPOINT); + if (endpoint != null) { + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + endpoint, + PropertyUtil.propertyAsString(properties, EcsCatalogProperties.REGION, "-") + ) + ); + } else { + builder.withRegion(properties.get(EcsCatalogProperties.REGION)); + } + + return new EcsClientImpl( + builder.build(), + Collections.unmodifiableMap(new LinkedHashMap<>(properties)), + new ObjectKeysImpl(EcsCatalogProperties.getObjectBaseKey(properties)), + PropertiesSerDes.useJdk()); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java b/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java new file mode 100644 index 000000000000..0c90ec29e22c --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsClient.java @@ -0,0 +1,255 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; + +/** + * ECS client and its util-methods. + */ +public interface EcsClient extends AutoCloseable { + + /** + * utils of object key that provide methods to convert {@link ObjectKey} to other objects declared in Iceberg. + * + * @return utils class + */ + ObjectKeys getKeys(); + + /** + * utils of properties that provide methods to convert object to properties. + * + * @return utils class + */ + default PropertiesSerDes getPropertiesSerDes() { + return PropertiesSerDes.useJdk(); + } + + /** + * get original properties. + *

+ * when try to implement this interface, make this properties same as input of {@link EcsClient#create(Map)} + * + * @return properties of client + */ + Map getProperties(); + + /** + * get object info of specific key. If object is absent, return {@link Optional#empty()} + * + * @param key object key + * @return head info if present + */ + Optional head(ObjectKey key); + + /** + * get object input stream of specific key. + * + * @param key object key + * @return input stream + */ + InputStream inputStream(ObjectKey key, long pos); + + /** + * get object output stream of specific key. + * + * @param key object key + * @return output stream + */ + OutputStream outputStream(ObjectKey key); + + /** + * return tuple of {@link #readAll(ObjectKey)} + */ + interface ContentAndETag { + ObjectHeadInfo getHeadInfo(); + + byte[] getContent(); + } + + /** + * read all bytes of object + * + * @return bytes and e-tag + */ + ContentAndETag readAll(ObjectKey key); + + /** + * a util method of {@link #readAll(ObjectKey)} + */ + default Map readProperties(ObjectKey key) { + ContentAndETag contentAndETag = readAll(key); + return getPropertiesSerDes().readProperties( + contentAndETag.getContent(), + contentAndETag.getHeadInfo().getETag(), + contentAndETag.getHeadInfo().getUserMetadata().getOrDefault( + PROPERTY_VERSION_KEY, + PropertiesSerDes.CURRENT_VERSION)); + } + + /** + * CAS operation + *

+ * we assume that the E-Tag uniquely identifies the content of an object. + *

+ * if the current object's eTag does not match, the method returns false and the object is not changed + */ + boolean replace(ObjectKey key, String eTag, byte[] bytes, Map userMetadata); + + /** + * a utility method of {@link #replace(ObjectKey, String, byte[], Map)} + */ + default boolean replaceProperties(ObjectKey key, String eTag, Map properties) { + return replace(key, eTag, getPropertiesSerDes().toBytes(properties), + Collections.singletonMap(PROPERTY_VERSION_KEY, PropertiesSerDes.CURRENT_VERSION)); + } + + /** + * compare-and-swap operation + *

+ * if the object already exists, the method returns false and the existing object is not changed. + */ + boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map userMetadata); + + /** + * a utility method of {@link #writeIfAbsent(ObjectKey, byte[], Map)} + */ + default boolean writePropertiesIfAbsent(ObjectKey key, Map properties) { + return writeIfAbsent(key, getPropertiesSerDes().toBytes(properties), + Collections.singletonMap(PROPERTY_VERSION_KEY, PropertiesSerDes.CURRENT_VERSION)); + } + + /** + * compare-and-swap operation + *
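To make the compare-and-swap contract above concrete, here is a sketch of the intended calling pattern, assuming an EcsClient ecs and an ObjectKey key are in scope; it mirrors how the catalog updates namespace properties, and the "owner" property is purely illustrative:

  Map<String, String> current = ecs.readProperties(key);  // the returned map carries the E-Tag under E_TAG_KEY
  String eTag = current.get(EcsClient.E_TAG_KEY);

  Map<String, String> updated = new HashMap<>(current);
  updated.put("owner", "someone");                         // illustrative change
  updated.remove(EcsClient.E_TAG_KEY);                     // the E-Tag is object metadata, not a stored property

  if (!ecs.replaceProperties(key, eTag, updated)) {
    // another writer changed the object since it was read: re-read and retry, or surface a failure
  }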

+ * we assume that the E-Tag uniquely identifies the content of an object. + */ + boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey); + + /** + * delete an object + * + * @param key is object key + */ + void deleteObject(ObjectKey key); + + /** + * list all objects with the delimiter. + *

+ * for example: there are objects like:
+ * <ul>
+ *   <li>namespace1/namespace2.namespace</li>
+ *   <li>namespace1/namespace2/table1.table</li>
+ *   <li>namespace1/table1.table</li>
+ *   <li>namespace1/table2.table</li>
+ * </ul>
+ * if prefix is namespace1 and delimiter is /, then the return value will be:
+ * <ul>
+ *   <li>namespace1/table1.table</li>
+ *   <li>namespace1/table2.table</li>
+ * </ul>
+ *

+ * The given function filters the keys and converts them to the objects that the caller wants to use + *

+ * note: the common prefixes, such as namespace1/namespace2/, won't return by this method. + * + * @param prefix prefix key + * @param filterAndMapper map object key to specify item + * @param search item type + * @return all items with given prefix + */ + List listDelimiterAll(ObjectKey prefix, Function> filterAndMapper); + + /** + * list all tables under this namespace. + * + * @param namespace a namespace + * @return a list of identifiers for tables + */ + default List listTables(Namespace namespace) throws NoSuchNamespaceException { + assertNamespaceExist(namespace); + return listDelimiterAll(getKeys().getPrefix(namespace), key -> getKeys().extractTableIdentifier(key, namespace)); + } + + /** + * list all namespaces under this namespace. + * + * @param namespace a namespace + * @return a list of namespace + */ + default List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + assertNamespaceExist(namespace); + return listDelimiterAll(getKeys().getPrefix(namespace), key -> getKeys().extractNamespace(key, namespace)); + } + + /** + * check namespace object existence. If not exist, throw exception. + * + * @param namespace is a non-null namespace + * @throws NoSuchNamespaceException if namespace object is absent + */ + default void assertNamespaceExist(Namespace namespace) throws NoSuchNamespaceException { + if (namespace.isEmpty()) { + return; + } + ObjectKey key = getKeys().getMetadataKey(namespace); + if (!head(key).isPresent()) { + throw new NoSuchNamespaceException("namespace %s(%s) is not found", namespace, key); + } + } + + /** + * copy a client + * + * @return a new ecs client + */ + default EcsClient copy() { + return EcsClient.create(getProperties()); + } + + /** + * ETag property name in results + * + * @see #readProperties(ObjectKey) + */ + String E_TAG_KEY = "ecs-object-e-tag"; + + /** + * version property name in results + * + * @see #readProperties(ObjectKey) + */ + String PROPERTY_VERSION_KEY = "ecs-object-property-version"; + + /** + * static factory method of {@link EcsClient} + * + * @param properties is properties + * @return ecs client + */ + static EcsClient create(Map properties) { + return EcsCatalogProperties.getEcsClientFromFactory(properties) + .orElseGet(() -> EcsCatalogProperties.getBuiltInEcsClient(properties)); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsFile.java b/dell/src/main/java/org/apache/iceberg/dell/EcsFile.java new file mode 100644 index 000000000000..68444839b15c --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsFile.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import org.apache.iceberg.dell.utils.PositionOutputStreamAdapter; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.SeekableInputStream; + +/** + * The file impl of {@link InputFile} and {@link OutputFile} + */ +public class EcsFile implements InputFile, OutputFile { + + private final EcsClient ecs; + private final String location; + private final ObjectKey key; + + public EcsFile(EcsClient ecs, String location, ObjectKey key) { + this.ecs = ecs; + this.location = location; + this.key = key; + } + + /** + * eager-get object length + * + * @return length if object exists + */ + @Override + public long getLength() { + return ecs.head(key) + .orElseThrow(() -> new IllegalStateException(String.format("object not found %s", key))) + .getContentLength(); + } + + @Override + public SeekableInputStream newStream() { + return new EcsSeekableInputStream(ecs, key); + } + + /** + * here are some confused things: + *

+ * 1. Should existence be checked when flushing? + *

+ * 2. Should use a placeholder object? + * + * @return output stream of object + */ + @Override + public PositionOutputStream create() { + if (!exists()) { + return createOrOverwrite(); + } else { + throw new AlreadyExistsException("Invalid key"); + } + } + + @Override + public PositionOutputStream createOrOverwrite() { + return new PositionOutputStreamAdapter(ecs.outputStream(key)); + } + + @Override + public String location() { + return location; + } + + @Override + public InputFile toInputFile() { + return this; + } + + /** + * eager-get object existence + * + * @return true if object exists + */ + @Override + public boolean exists() { + return ecs.head(key).isPresent(); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java b/dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java new file mode 100644 index 000000000000..b4d2ad000cc1 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link java.io.Externalizable} FileIO that manage the life cycle of {@link EcsClient} + */ +public class EcsFileIO implements FileIO, Externalizable, AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class); + + private EcsClient ecs; + private final AtomicBoolean closed = new AtomicBoolean(false); + + /** + * constructor for {@link Externalizable} + */ + public EcsFileIO() { + } + + /** + * constructor + * + * @param ecs is an ecs client + */ + public EcsFileIO(EcsClient ecs) { + this.ecs = ecs; + } + + @Override + public void close() throws Exception { + if (closed.compareAndSet(false, true)) { + log.info("close s3 file io"); + ecs.close(); + } + } + + @Override + public InputFile newInputFile(String path) { + checkOpen(); + return new EcsFile(ecs, path, ecs.getKeys().parse(path)); + } + + @Override + public OutputFile newOutputFile(String path) { + checkOpen(); + return new EcsFile(ecs, path, ecs.getKeys().parse(path)); + } + + @Override + public void deleteFile(String path) { + checkOpen(); + ecs.deleteObject(ecs.getKeys().parse(path)); + } + + /** + * manual serialize object + * + * @param out is object output stream + * @throws IOException if exceptional + */ + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ecs.getProperties()); + } + + /** + * manual deserialize object + * + * @param in is object input stream + * @throws IOException if exceptional + * @throws ClassNotFoundException if exceptional + */ + @Override + public void readExternal(ObjectInput in) throws 
IOException, ClassNotFoundException { + @SuppressWarnings("unchecked") + Map properties = (Map) in.readObject(); + ecs = EcsClient.create(properties); + } + + private void checkOpen() { + if (closed.get()) { + throw new IllegalStateException("file io is closed"); + } + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java b/dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java new file mode 100644 index 000000000000..addad700ea43 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.iceberg.io.SeekableInputStream; + +/** + * {@link SeekableInputStream} impl that warp {@link EcsClient#inputStream(ObjectKey, long)} + *

+ * 1. the stream is only loaded when reading starts. + *

+ * 2. this class does not cache any content bytes. It only maintains the pos of the {@link SeekableInputStream} + *

+ * 3. this class is not thread-safe. + */ +public class EcsSeekableInputStream extends SeekableInputStream { + + private final EcsClient ecs; + private final ObjectKey key; + + /** + * mutable pos set by {@link #seek(long)} + */ + private long newPos = 0; + /** + * current pos of object content + */ + private long pos = -1; + private InputStream internal; + + public EcsSeekableInputStream(EcsClient ecs, ObjectKey key) { + this.ecs = ecs; + this.key = key; + } + + @Override + public long getPos() { + return newPos >= 0 ? newPos : pos; + } + + @Override + public void seek(long inputNewPos) { + if (pos == inputNewPos) { + return; + } + newPos = inputNewPos; + } + + @Override + public int read() throws IOException { + syncNewPosToPos(); + pos += 1; + return internal.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + syncNewPosToPos(); + int delta = internal.read(b, off, len); + pos += delta; + return delta; + } + + private void syncNewPosToPos() throws IOException { + if (newPos < 0) { + return; + } + if (newPos == pos) { + newPos = -1; + return; + } + if (internal != null) { + internal.close(); + } + pos = newPos; + internal = ecs.inputStream(key, pos); + newPos = -1; + } + + @Override + public void close() throws IOException { + if (internal != null) { + internal.close(); + } + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/EcsTableOperations.java b/dell/src/main/java/org/apache/iceberg/dell/EcsTableOperations.java new file mode 100644 index 000000000000..5816136c9a25 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/EcsTableOperations.java @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import java.util.Collections; +import java.util.Map; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; + +/** + * use {@link EcsClient} to implement {@link BaseMetastoreTableOperations} + */ +public class EcsTableOperations extends BaseMetastoreTableOperations { + + public static final String ICEBERG_METADATA_LOCATION = "icebergMetadataLocation"; + + /** + * table full name + */ + private final String tableName; + /** + * table id + */ + private final TableIdentifier id; + /** + * ecs client for table metadata + */ + private final EcsClient ecs; + private final FileIO io; + + /** + * cached properties for CAS commit + * + * @see #doRefresh() when reset this field + * @see #doCommit(TableMetadata, TableMetadata) when use this field + */ + private Map cachedProperties; + + public EcsTableOperations(String tableName, TableIdentifier id, EcsClient ecs, FileIO io) { + this.tableName = tableName; + this.id = id; + this.ecs = ecs; + this.io = io; + } + + @Override + protected String tableName() { + return tableName; + } + + @Override + public FileIO io() { + return io; + } + + @Override + protected void doRefresh() { + String metadataLocation; + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(id); + if (!ecs.head(metadataKey).isPresent()) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("metadata is null"); + } else { + metadataLocation = null; + } + } else { + Map metadata = queryProperties(); + this.cachedProperties = metadata; + metadataLocation = metadata.get(ICEBERG_METADATA_LOCATION); + if (metadataLocation == null) { + throw new IllegalStateException("can't find location from table metadata"); + } + } + refreshFromMetadataLocation(metadataLocation); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + ObjectKey metadataKey = ecs.getKeys().getMetadataKey(id); + int currentVersion = currentVersion(); + int nextVersion = currentVersion + 1; + String newMetadataLocation = writeNewMetadata(metadata, nextVersion); + if (base == null) { + // create a new table, the metadataKey should be absent + boolean r = ecs.writePropertiesIfAbsent(metadataKey, buildProperties(newMetadataLocation)); + if (!r) { + throw new CommitFailedException("table exist when commit %s(%s)", debug(metadata), newMetadataLocation); + } + } else { + Map properties = cachedProperties; + if (properties == null) { + throw new CommitFailedException("when commit, local metadata is null, %s(%s) -> %s(%s)", + debug(base), currentMetadataLocation(), + debug(metadata), newMetadataLocation); + } + // replace to a new version, the E-Tag should be present and matched + boolean r = ecs.replaceProperties( + metadataKey, + properties.get(EcsClient.E_TAG_KEY), + buildProperties(newMetadataLocation) + ); + if (!r) { + throw new CommitFailedException("replace failed, properties %s, %s(%s) -> %s(%s)", properties, + debug(base), currentMetadataLocation(), + debug(metadata), newMetadataLocation); + } + } + } + + private Map queryProperties() { + return ecs.readProperties(ecs.getKeys().getMetadataKey(id)); + } + + /** + * debug string for exception + * + * @param metadata is table metadata + * @return debug string of metadata + */ + private String debug(TableMetadata metadata) { 
+ if (metadata.currentSnapshot() == null) { + return "EmptyTable"; + } else { + return "Table(currentSnapshotId = " + metadata.currentSnapshot().snapshotId() + ")"; + } + } + + /** + * build a new properties for table + * + * @param metadataLocation is metadata json file location + * @return properties + */ + private Map buildProperties(String metadataLocation) { + return Collections.singletonMap(ICEBERG_METADATA_LOCATION, metadataLocation); + } +} \ No newline at end of file diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java new file mode 100644 index 000000000000..79ead85dd8af --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectBaseKey.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Objects; + +/** + * a immutable record class of object base key which allow not null bucket and key. + */ +public class ObjectBaseKey { + + private final String bucket; + public final String key; + + public ObjectBaseKey(String bucket, String key) { + this.bucket = bucket; + this.key = key; + } + + public ObjectKey asKey() { + if (bucket == null) { + throw new IllegalArgumentException(String.format( + "fail to cast base key %s as object key, bucket is unknown", + this)); + } + return new ObjectKey(bucket, key == null ? "" : key); + } + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + @Override + public String toString() { + return "ObjectBaseKey{" + + "bucket='" + bucket + '\'' + + ", key='" + key + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ObjectBaseKey that = (ObjectBaseKey) o; + return Objects.equals(bucket, that.bucket) && Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, key); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java new file mode 100644 index 000000000000..ea9f66457574 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectHeadInfo.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Map; + +/** + * object head info + *

+ * it's the basic info of an object in the object storage. + */ +public interface ObjectHeadInfo { + + /** + * content length of object + * + * @return length in bytes + */ + long getContentLength(); + + /** + * eTag of object + * + * @return eTag + */ + String getETag(); + + /** + * user metadata of object + * + * @return user metadata + */ + Map getUserMetadata(); +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java new file mode 100644 index 000000000000..2fd91bea3c47 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectKey.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.Objects; + +/** + * a immutable record class of object key + */ +public class ObjectKey { + /** + * bucket + */ + private final String bucket; + + /** + * key + */ + private final String key; + + public ObjectKey(String bucket, String key) { + if (bucket == null || key == null) { + throw new IllegalArgumentException(String.format("bucket %s and key %s must be not null", bucket, key)); + } + this.bucket = bucket; + this.key = key; + } + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + @Override + public String toString() { + return "ObjectKey{" + + "bucket='" + bucket + '\'' + + ", key='" + key + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ObjectKey objectKey = (ObjectKey) o; + return Objects.equals(bucket, objectKey.bucket) && Objects.equals(key, objectKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, key); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java b/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java new file mode 100644 index 000000000000..11d638c4fd9b --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/ObjectKeys.java @@ -0,0 +1,297 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; + +/** + * operations of {@link ObjectKey} + */ +public interface ObjectKeys { + + /** + * default delimiter + */ + String DELIMITER = "/"; + /** + * default suffix of table metadata object + */ + String TABLE_METADATA_SUFFIX = ".table"; + /** + * default suffix of namespace metadata object + */ + String NAMESPACE_METADATA_SUFFIX = ".namespace"; + + /** + * base key of catalog + * + * @return the base key + */ + ObjectBaseKey getBaseKey(); + + /** + * base key parts for calculate sub key + * + * @return the parts of the base key + */ + default List getBaseKeyParts() { + ObjectBaseKey baseKey = getBaseKey(); + if (baseKey.getBucket() == null) { + return Collections.emptyList(); + } else if (baseKey.getKey() == null) { + return Collections.singletonList(baseKey.getBucket()); + } else { + List parts = new ArrayList<>(); + parts.add(baseKey.getBucket()); + for (String result : Splitter.on(getDelimiter()).split(baseKey.getKey())) { + parts.add(result); + } + if (!checkParts(parts)) { + throw new IllegalArgumentException(String.format("invalid base key %s with delimiter %s", + baseKey, getDelimiter())); + } + return Collections.unmodifiableList(parts); + } + } + + default String getDelimiter() { + return DELIMITER; + } + + default String getNamespaceMetadataSuffix() { + return NAMESPACE_METADATA_SUFFIX; + } + + default String getTableMetadataSuffix() { + return TABLE_METADATA_SUFFIX; + } + + /** + * convert relative parts to object key + * + * @param parts that relative to base key + * @return object key + */ + default ObjectKey getObjectKey(List parts) { + ObjectBaseKey baseKey = getBaseKey(); + if (parts.isEmpty()) { + return baseKey.asKey(); + } + String delimiter = getDelimiter(); + if (parts.stream().anyMatch(it -> it.contains(delimiter))) { + throw new IllegalArgumentException(String.format("delimiter %s in key parts: %s", delimiter, parts)); + } + if (baseKey.getBucket() == null) { + return new ObjectKey(parts.get(0), String.join(delimiter, parts.subList(1, parts.size()))); + } else { + String prefix = baseKey.getKey() == null ? "" : (baseKey.getKey() + delimiter); + return new ObjectKey(baseKey.getBucket(), prefix + String.join(delimiter, parts)); + } + } + + /** + * get metadata key of namespace + * + * @param namespace is a namespace + * @return object key + */ + default ObjectKey getMetadataKey(Namespace namespace) { + if (namespace.isEmpty()) { + return getObjectKey(Collections.singletonList(getNamespaceMetadataSuffix())); + } + // copy namespace levels + List keyParts = new ArrayList<>(Arrays.asList(namespace.levels())); + int lastIndex = keyParts.size() - 1; + keyParts.set(lastIndex, keyParts.get(lastIndex) + getNamespaceMetadataSuffix()); + return getObjectKey(keyParts); + } + + /** + * get prefix key of namespace. + *

+ * The prefix key lack namespace metadata suffix. + * + * @param namespace is a namespace + * @return object prefix + */ + default ObjectKey getPrefix(Namespace namespace) { + return getObjectKey(Arrays.asList(namespace.levels())); + } + + /** + * try to extract namespace from specific key + * + * @param key key + * @param parent is parent namespace + * @return namespace if present + */ + default Optional extractNamespace(ObjectKey key, Namespace parent) { + if (!key.getKey().endsWith(getNamespaceMetadataSuffix())) { + return Optional.empty(); + } + Optional lastPartOpt = extractLastPart(key, parent); + if (!lastPartOpt.isPresent()) { + return Optional.empty(); + } + String lastPart = lastPartOpt.get(); + String namespaceName = lastPart.substring(0, lastPart.length() - getNamespaceMetadataSuffix().length()); + String[] levels = Arrays.copyOf(parent.levels(), parent.levels().length + 1); + levels[levels.length - 1] = namespaceName; + return Optional.of(Namespace.of(levels)); + } + + /** + * get metadata key of namespace + * + * @param tableIdentifier is a table id + * @return object key + */ + default ObjectKey getMetadataKey(TableIdentifier tableIdentifier) { + if (tableIdentifier.hasNamespace()) { + List parts = new ArrayList<>(tableIdentifier.namespace().levels().length + 1); + parts.addAll(Arrays.asList(tableIdentifier.namespace().levels())); + parts.add(tableIdentifier.name() + getTableMetadataSuffix()); + return getObjectKey(parts); + } else { + return getObjectKey(Collections.singletonList(tableIdentifier.name() + getTableMetadataSuffix())); + } + } + + /** + * try to extract table id from specific key + * + * @param key key + * @param namespace is parent namespace + * @return table id if present + */ + default Optional extractTableIdentifier(ObjectKey key, Namespace namespace) { + if (!key.getKey().endsWith(getTableMetadataSuffix())) { + return Optional.empty(); + } + Optional lastPartOpt = extractLastPart(key, namespace); + if (!lastPartOpt.isPresent()) { + return Optional.empty(); + } + String lastPart = lastPartOpt.get(); + String tableName = lastPart.substring(0, lastPart.length() - getTableMetadataSuffix().length()); + return Optional.of(TableIdentifier.of(namespace, tableName)); + } + + /** + * check namespace is expected and extract last part + * + * @param key is object key + * @param expectNamespace is parent namespace + * @return the last part if namespace is matched + */ + default Optional extractLastPart(ObjectKey key, Namespace expectNamespace) { + Optional> partsOpt = subParts(key); + if (!partsOpt.isPresent()) { + return Optional.empty(); + } + List parts = partsOpt.get(); + if (parts.isEmpty()) { + return Optional.empty(); + } + int lastIndex = parts.size() - 1; + Namespace namespace = Namespace.of(parts.subList(0, lastIndex).toArray(new String[] {})); + if (expectNamespace != null && !Objects.equals(expectNamespace, namespace)) { + throw new IllegalArgumentException(String.format("namespace not match: %s != %s", namespace, expectNamespace)); + } + return Optional.of(parts.get(lastIndex)); + } + + /** + * get the relative parts of {@link #getBaseKey()} + * + * @param key is object key + * @return relative parts if this key start with base key + */ + default Optional> subParts(ObjectKey key) { + List parts = new ArrayList<>(); + parts.add(key.getBucket()); + for (String result : Splitter.on(getDelimiter()).split(key.getKey())) { + parts.add(result); + } + if (!checkParts(parts)) { + return Optional.empty(); + } + List baseParts = getBaseKeyParts(); + if (parts.size() 
< baseParts.size() || !Objects.equals(parts.subList(0, baseParts.size()), baseParts)) { + return Optional.empty(); + } else { + return Optional.of(parts.subList(baseParts.size(), parts.size())); + } + } + + /** + * check all parts are valid. + * + * @param parts is key parts + * @return true if all parts are valid + */ + default boolean checkParts(List parts) { + return parts.stream().noneMatch(String::isEmpty); + } + + /** + * get default warehouse location of table id + * + * @param tableIdentifier is table id + * @return default warehouse location prefix key + */ + default ObjectKey warehouseLocation(TableIdentifier tableIdentifier) { + if (!tableIdentifier.hasNamespace()) { + return getObjectKey(Collections.singletonList(tableIdentifier.name())); + } else { + List parts = new ArrayList<>(tableIdentifier.namespace().levels().length + 1); + parts.addAll(Arrays.asList(tableIdentifier.namespace().levels())); + parts.add(tableIdentifier.name()); + return getObjectKey(parts); + } + } + + /** + * convert key to string + * + * @param key is object key + * @return string key + */ + default String toString(ObjectKey key) { + return key.getBucket() + getDelimiter() + key.getKey(); + } + + /** + * convert string to key + * + * @param key is string key + * @return object key + */ + default ObjectKey parse(String key) { + String[] r = key.split(getDelimiter(), 2); + if (r.length < 2) { + throw new IllegalArgumentException("failed to parse key " + key); + } + return new ObjectKey(r[0], r[1]); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java b/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java new file mode 100644 index 000000000000..18df080aba0d --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/PropertiesSerDes.java @@ -0,0 +1,119 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * convert Map properties to bytes. + */ +public interface PropertiesSerDes { + + /** + * version of properties. + *

+ * this properties is only a placeholder and may use in future. + */ + String CURRENT_VERSION = "0"; + + Logger log = LoggerFactory.getLogger(PropertiesSerDes.class); + + /** + * read properties from stream + * + * @param input is stream + * @return properties + */ + Map read(InputStream input); + + /** + * a utils to read properties from object content. + * + * @param content is content of object + * @param eTag is eTag of object + * @param version is property version (current field is a placeholder for future changes) + * @return properties which loaded by content + */ + default Map readProperties(byte[] content, String eTag, String version) { + Map propertiesInObject = read(new ByteArrayInputStream(content)); + Map properties = new HashMap<>(propertiesInObject); + properties.put(EcsClient.E_TAG_KEY, eTag); + properties.put(EcsClient.PROPERTY_VERSION_KEY, version); + return properties; + } + + /** + * write properties to bytes. + * + * @param value is properties + * @return bytes content + */ + byte[] toBytes(Map value); + + /** + * use {@link Properties} to serialize and deserialize properties. + * + * @return interface + */ + static PropertiesSerDes useJdk() { + return new PropertiesSerDes() { + @Override + public Map read(InputStream input) { + Properties jdkProperties = new Properties(); + try { + jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8)); + } catch (IOException e) { + log.error("fail to read properties", e); + throw new UncheckedIOException(e); + } + Set propertyNames = jdkProperties.stringPropertyNames(); + Map properties = new HashMap<>(); + for (String name : propertyNames) { + properties.put(name, jdkProperties.getProperty(name)); + } + return Collections.unmodifiableMap(properties); + } + + @Override + public byte[] toBytes(Map value) { + Properties jdkProperties = new Properties(); + for (Map.Entry entry : value.entrySet()) { + jdkProperties.setProperty(entry.getKey(), entry.getValue()); + } + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null); + return output.toByteArray(); + } catch (IOException e) { + log.error("fail to store properties {} to file", value, e); + throw new UncheckedIOException(e); + } + } + }; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndETagImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndETagImpl.java new file mode 100644 index 000000000000..e2e6dc266de8 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/ContentAndETagImpl.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.dell.impl; + +import org.apache.iceberg.dell.EcsClient; +import org.apache.iceberg.dell.ObjectHeadInfo; + +/** + * a record class of {@link EcsClient.ContentAndETag} + */ +public class ContentAndETagImpl implements EcsClient.ContentAndETag { + + private final ObjectHeadInfo headInfo; + private final byte[] content; + + public ContentAndETagImpl(ObjectHeadInfo headInfo, byte[] content) { + this.headInfo = headInfo; + this.content = content; + } + + @Override + public ObjectHeadInfo getHeadInfo() { + return headInfo; + } + + @Override + public byte[] getContent() { + return content; + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java new file mode 100644 index 000000000000..1911484bae57 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsAppendOutputStream.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell.impl; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.Headers; +import com.amazonaws.services.s3.model.ObjectMetadata; +import java.io.ByteArrayInputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.iceberg.dell.ObjectKey; + +/** + * use ECS append api to write data + */ +public class EcsAppendOutputStream extends OutputStream { + + private final AmazonS3 s3; + + private final ObjectKey key; + + /** + * local bytes cache that avoid too many requests + *

+ * use {@link ByteBuffer} to maintain offset + */ + private final ByteBuffer localCache; + + private boolean firstPart = true; + + public EcsAppendOutputStream(AmazonS3 s3, ObjectKey key, byte[] localCache) { + this.s3 = s3; + this.key = key; + this.localCache = ByteBuffer.wrap(localCache); + } + + @Override + public void write(int b) { + if (!checkBuffer(1)) { + flush(); + } + localCache.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) { + if (!checkBuffer(len)) { + flush(); + } + if (checkBuffer(len)) { + localCache.put(b, off, len); + } else { + // if content > cache, directly flush itself. + flushBuffer(b, off, len); + } + } + + private boolean checkBuffer(int nextWrite) { + return localCache.remaining() >= nextWrite; + } + + private void flushBuffer(byte[] buffer, int offset, int length) { + ObjectMetadata metadata = new ObjectMetadata(); + if (firstPart) { + firstPart = false; + } else { + // only following parts need append api + metadata.setHeader(Headers.RANGE, "bytes=-1-"); + } + metadata.setContentLength(length); + s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(buffer, offset, length), metadata); + } + + /** + * flush all cached bytes + */ + @Override + public void flush() { + if (localCache.remaining() < localCache.capacity()) { + localCache.flip(); + flushBuffer(localCache.array(), localCache.arrayOffset(), localCache.remaining()); + localCache.clear(); + } + } + + @Override + public void close() { + // call flush to guarantee all bytes are submitted + flush(); + } +} diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java new file mode 100644 index 000000000000..dc22eb897bc3 --- /dev/null +++ b/dell/src/main/java/org/apache/iceberg/dell/impl/EcsClientImpl.java @@ -0,0 +1,259 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.iceberg.dell.impl;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.iceberg.dell.EcsClient;
+import org.apache.iceberg.dell.ObjectHeadInfo;
+import org.apache.iceberg.dell.ObjectKey;
+import org.apache.iceberg.dell.ObjectKeys;
+import org.apache.iceberg.dell.PropertiesSerDes;
+
+/**
+ * The implementation of {@link EcsClient}.
+ * <p>
+ * It uses the AWS SDK v1 for Java because the ECS-specific extensions, such as the append API, require it.
+ */
+public class EcsClientImpl implements EcsClient {
+
+  private final AmazonS3 s3;
+  private final Map<String, String> properties;
+  private final ObjectKeys keys;
+  private final PropertiesSerDes propertiesSerDes;
+
+  public EcsClientImpl(
+      AmazonS3 s3,
+      Map<String, String> properties,
+      ObjectKeys keys,
+      PropertiesSerDes propertiesSerDes) {
+    this.s3 = s3;
+    this.properties = Collections.unmodifiableMap(new LinkedHashMap<>(properties));
+    this.keys = keys;
+    this.propertiesSerDes = propertiesSerDes;
+  }
+
+  @Override
+  public ObjectKeys getKeys() {
+    return keys;
+  }
+
+  @Override
+  public PropertiesSerDes getPropertiesSerDes() {
+    return propertiesSerDes;
+  }
+
+  @Override
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public Optional<ObjectHeadInfo> head(ObjectKey key) {
+    try {
+      ObjectMetadata metadata = s3.getObjectMetadata(key.getBucket(), key.getKey());
+      return Optional.of(new ObjectHeadInfoImpl(
+          metadata.getContentLength(),
+          metadata.getETag(),
+          metadata.getUserMetadata()));
+    } catch (AmazonS3Exception e) {
+      if (e.getStatusCode() == 404) {
+        return Optional.empty();
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public InputStream inputStream(ObjectKey key, long pos) {
+    S3Object object = s3.getObject(new GetObjectRequest(key.getBucket(), key.getKey())
+        .withRange(pos));
+    return object.getObjectContent();
+  }
+
+  @Override
+  public OutputStream outputStream(ObjectKey key) {
+    return new EcsAppendOutputStream(s3, key, new byte[1_000]);
+  }
+
+  @Override
+  public ContentAndETag readAll(ObjectKey key) {
+    S3Object object = s3.getObject(new GetObjectRequest(key.getBucket(), key.getKey()));
+    ObjectMetadata metadata = object.getObjectMetadata();
+    int size = (int) metadata.getContentLength();
+    byte[] content = new byte[size];
+    try (S3ObjectInputStream input = object.getObjectContent()) {
+      int offset = 0;
+      while (offset < size) {
+        int read = input.read(content, offset, size - offset);
+        if (read < 0) {
+          throw new IllegalStateException(String.format(
+              "size of %s is mismatched, read %d bytes but expected %d",
+              key, offset, size));
+        }
+        offset += read;
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("failed to read all bytes of " + key, e);
+    }
+    ObjectHeadInfoImpl headInfo = new ObjectHeadInfoImpl(
+        metadata.getContentLength(),
+        metadata.getETag(),
+        metadata.getUserMetadata());
+    return new ContentAndETagImpl(headInfo, content);
+  }
+
+  /**
+   * Replace an object using an If-Match condition on its E-Tag.
+   *
+   * @param key is the object key
+   * @param eTag is the expected E-Tag
+   * @param bytes is the content
+   * @param userMetadata is the user metadata stored in the object metadata
+   * @return true if the replacement succeeded
+   */
+  @Override
+  public boolean replace(ObjectKey key, String eTag, byte[] bytes, Map<String, String> userMetadata) {
+    return cas(() -> {
+      ObjectMetadata metadata = new ObjectMetadata();
+      metadata.setHeader("If-Match", eTag);
+      metadata.setContentLength(bytes.length);
+      metadata.setUserMetadata(userMetadata);
+      s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(bytes), metadata);
+    });
+  }
+
+  /**
+   * Write an object using an If-None-Match condition.
+   *
+   * @param key is the object key
+   * @param bytes is the content
+   * @param userMetadata is the user metadata stored in the object metadata
+   * @return true if the object was absent when it was written
+   */
+  @Override
+  public boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map<String, String> userMetadata) {
+    return cas(() -> {
+      ObjectMetadata metadata = new ObjectMetadata();
+      metadata.setHeader("If-None-Match", "*");
+      metadata.setContentLength(bytes.length);
+      metadata.setUserMetadata(userMetadata);
+      s3.putObject(key.getBucket(), key.getKey(), new ByteArrayInputStream(bytes), metadata);
+    });
+  }
+
+  /**
+   * Copy an object using x-amz-copy-source-if-match and If-None-Match conditions.
+   *
+   * @param fromKey is the source key
+   * @param eTag is the E-Tag of the source object
+   * @param toKey is the destination key
+   * @return true if the object was copied to the destination
+   */
+  @Override
+  public boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey) {
+    return cas(() -> {
+      CopyObjectRequest request = new CopyObjectRequest(
+          fromKey.getBucket(),
+          fromKey.getKey(),
+          toKey.getBucket(),
+          toKey.getKey());
+      request.setMatchingETagConstraints(Collections.singletonList(eTag));
+      request.putCustomRequestHeader("If-None-Match", "*");
+      s3.copyObject(request);
+    });
+  }
+
+  /**
+   * Run a conditional request and translate the precondition-failure error code into a boolean.
+   *
+   * @param fn is the function that sends the request
+   * @return true if the compare-and-swap operation succeeded
+   */
+  private boolean cas(Runnable fn) {
+    try {
+      fn.run();
+      return true;
+    } catch (AmazonS3Exception e) {
+      if ("PreconditionFailed".equals(e.getErrorCode())) {
+        return false;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public void deleteObject(ObjectKey key) {
+    s3.deleteObject(key.getBucket(), key.getKey());
+  }
+
+  @Override
+  public <T> List<T> listDelimiterAll(ObjectKey prefix, Function<ObjectKey, Optional<T>> filterAndMapper) {
+    String delimiter = getKeys().getDelimiter();
+    List<T> result = new ArrayList<>();
+    String prefixKey;
+    if (prefix.getKey().isEmpty()) {
+      prefixKey = "";
+    } else if (prefix.getKey().endsWith(delimiter)) {
+      prefixKey = prefix.getKey();
+    } else {
+      prefixKey = prefix.getKey() + delimiter;
+    }
+    String continuationToken = null;
+    do {
+      ListObjectsV2Result response = s3.listObjectsV2(
+          new ListObjectsV2Request()
+              .withBucketName(prefix.getBucket())
+              .withDelimiter(delimiter)
+              .withPrefix(prefixKey)
+              .withContinuationToken(continuationToken));
+      continuationToken = response.getNextContinuationToken();
+      for (S3ObjectSummary objectSummary : response.getObjectSummaries()) {
+        Optional<T> itemOpt = filterAndMapper.apply(new ObjectKey(
+            objectSummary.getBucketName(),
+            objectSummary.getKey()));
+        if (!itemOpt.isPresent()) {
+          continue;
+        }
+        result.add(itemOpt.get());
+      }
+    } while (continuationToken != null);
+    return result;
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java
new file mode 100644
index 000000000000..6361d21fd826
--- /dev/null
+++ b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectHeadInfoImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.dell.impl;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.iceberg.dell.ObjectHeadInfo;
+
+/**
+ * A record class implementing {@link ObjectHeadInfo}.
+ */
+public class ObjectHeadInfoImpl implements ObjectHeadInfo {
+  private final long contentLength;
+  private final String eTag;
+  private final Map<String, String> userMetadata;
+
+  public ObjectHeadInfoImpl(long contentLength, String eTag, Map<String, String> userMetadata) {
+    this.contentLength = contentLength;
+    this.eTag = eTag;
+    this.userMetadata = Collections.unmodifiableMap(userMetadata);
+  }
+
+  @Override
+  public long getContentLength() {
+    return contentLength;
+  }
+
+  @Override
+  public String getETag() {
+    return eTag;
+  }
+
+  @Override
+  public Map<String, String> getUserMetadata() {
+    return userMetadata;
+  }
+
+  @Override
+  public String toString() {
+    return "ObjectHeadInfoImpl{" +
+        "contentLength=" + contentLength +
+        ", eTag='" + eTag + '\'' +
+        ", userMetadata=" + userMetadata +
+        '}';
+  }
+}
diff --git a/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java
new file mode 100644
index 000000000000..977dbdeb1bd1
--- /dev/null
+++ b/dell/src/main/java/org/apache/iceberg/dell/impl/ObjectKeysImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.dell.impl;
+
+import java.util.List;
+import org.apache.iceberg.dell.ObjectBaseKey;
+import org.apache.iceberg.dell.ObjectKeys;
+
+/**
+ * Implementation of {@link ObjectKeys}.
+ */
+public class ObjectKeysImpl implements ObjectKeys {
+
+  private final ObjectBaseKey baseKey;
+
+  /**
+   * Lazily computed result of {@link #getBaseKeyParts()}.
+   */
+  private volatile List<String> lazyBaseKeyParts;
+
+  public ObjectKeysImpl(ObjectBaseKey baseKey) {
+    this.baseKey = baseKey;
+  }
+
+  @Override
+  public ObjectBaseKey getBaseKey() {
+    return baseKey;
+  }
+
+  /**
+   * Cache the computed parts in a field.
+   *
+   * @return the cached base key parts
+   */
+  @Override
+  public List<String> getBaseKeyParts() {
+    // not fully thread-safe, but computing the parts more than once is harmless
+    if (lazyBaseKeyParts == null) {
+      lazyBaseKeyParts = ObjectKeys.super.getBaseKeyParts();
+    }
+    return lazyBaseKeyParts;
+  }
+}
diff --git a/dell/src/main/java/org/apache/iceberg/dell/utils/PositionOutputStreamAdapter.java b/dell/src/main/java/org/apache/iceberg/dell/utils/PositionOutputStreamAdapter.java
new file mode 100644
index 000000000000..81ae5f67b18e
--- /dev/null
+++ b/dell/src/main/java/org/apache/iceberg/dell/utils/PositionOutputStreamAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.dell.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.iceberg.io.PositionOutputStream;
+
+/**
+ * Adapter that exposes an {@link OutputStream} as a {@link PositionOutputStream}.
+ */
+public class PositionOutputStreamAdapter extends PositionOutputStream {
+
+  private final OutputStream internal;
+  private long pos = 0;
+
+  public PositionOutputStreamAdapter(OutputStream internal) {
+    this.internal = internal;
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    internal.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    internal.write(b, off, len);
+    pos += len;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    internal.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    internal.close();
+  }
+}
diff --git a/dell/src/test/java/org/apache/iceberg/dell/EcsCatalogTest.java b/dell/src/test/java/org/apache/iceberg/dell/EcsCatalogTest.java
new file mode 100644
index 000000000000..d4b9cb24a49b
--- /dev/null
+++ b/dell/src/test/java/org/apache/iceberg/dell/EcsCatalogTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.iceberg.dell; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.types.Types; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class EcsCatalogTest { + /** + * ecs catalog + */ + private EcsCatalog ecsCatalog; + + // namespaces + private Namespace namespaceL1 = Namespace.of("namespace"); + private Namespace namespaceL2 = Namespace.of("namespace", "namespace"); + private Namespace namespaceL2N2 = Namespace.of("namespace", "namespace2"); + + // tables + private TableIdentifier tableIdL0 = TableIdentifier.of("table"); + private TableIdentifier tableIdL1 = TableIdentifier.of(namespaceL1, "table"); + private TableIdentifier tableIdL2 = TableIdentifier.of(namespaceL2, "table"); + private TableIdentifier tableIdL2T2 = TableIdentifier.of(namespaceL2, "table2"); + + @Before + public void init() { + ecsCatalog = new EcsCatalog(); + + Map properties = new HashMap<>(); + properties.put(EcsCatalogProperties.ECS_CLIENT_FACTORY, "org.apache.iceberg.dell.MemoryEcsClient#create"); + properties.put(EcsCatalogProperties.BASE_KEY, "test"); + ecsCatalog.initialize("test", properties); + } + + public void prepareData() { + ecsCatalog.createNamespace(namespaceL1); + ecsCatalog.createNamespace(namespaceL2); + ecsCatalog.createNamespace(namespaceL2N2); + + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + ecsCatalog.createTable(tableIdL0, schema); + ecsCatalog.createTable(tableIdL1, schema); + ecsCatalog.createTable(tableIdL2, schema); + ecsCatalog.createTable(tableIdL2T2, schema); + } + + @Test + public void listTables() { + assertTrue("no table in empty namespace", ecsCatalog.listTables(Namespace.empty()).isEmpty()); + + prepareData(); + + assertEquals( + "tables in empty namespace", + Collections.singletonList(tableIdL0), + ecsCatalog.listTables(Namespace.empty())); + + assertEquals( + "tables in l1 namespace", + Collections.singletonList(tableIdL1), + ecsCatalog.listTables(namespaceL1)); + + assertEquals( + "tables in l2 namespace", + Arrays.asList(tableIdL2, tableIdL2T2), + ecsCatalog.listTables(namespaceL2)); + } + + @Test(expected = NoSuchNamespaceException.class) + public void listTables_invalidInput() { + ecsCatalog.listTables(Namespace.of("l1")); + } + + @Test + public void dropTable() { + prepareData(); + + assertTrue("drop table success", ecsCatalog.dropTable(tableIdL0)); + assertFalse("re-drop table failed", ecsCatalog.dropTable(tableIdL0)); + + assertTrue("no table in empty namespace", ecsCatalog.listTables(Namespace.empty()).isEmpty()); + } + + @Test + public void renameTable() { + prepareData(); + TableIdentifier toTable = TableIdentifier.of(namespaceL2, "to"); + + ecsCatalog.renameTable(tableIdL0, toTable); + + assertTrue("no table in empty namespace", 
ecsCatalog.listTables(Namespace.empty()).isEmpty()); + + assertEquals( + "tables in l2 namespace", + Arrays.asList(tableIdL2, tableIdL2T2, toTable), + ecsCatalog.listTables(namespaceL2)); + } + + @Test(expected = NoSuchTableException.class) + public void renameTable_invalidFrom() { + ecsCatalog.renameTable(tableIdL0, tableIdL1); + } + + @Test(expected = AlreadyExistsException.class) + public void renameTable_invalidTo() { + prepareData(); + + ecsCatalog.renameTable(tableIdL0, tableIdL1); + } + + @Test(expected = AlreadyExistsException.class) + public void createNamespace_empty() { + ecsCatalog.createNamespace(Namespace.empty()); + } + + @Test(expected = AlreadyExistsException.class) + public void createNamespace_invalidInput() { + prepareData(); + + ecsCatalog.createNamespace(namespaceL1); + } + + @Test + public void listNamespaces() { + assertTrue("no namespace in empty namespace", ecsCatalog.listNamespaces().isEmpty()); + assertTrue("no namespace in empty namespace", ecsCatalog.listNamespaces(Namespace.empty()).isEmpty()); + + prepareData(); + + assertEquals( + "namespaces in empty namespace", + Collections.singletonList(namespaceL1), + ecsCatalog.listNamespaces(Namespace.empty())); + + assertEquals( + "namespace in l1 namespace", + Arrays.asList(namespaceL2, namespaceL2N2), + ecsCatalog.listNamespaces(namespaceL1)); + } + + @Test(expected = NoSuchNamespaceException.class) + public void listNamespaces_invalidInput() { + ecsCatalog.listNamespaces(Namespace.of("l1")); + } + + @Test + public void loadNamespaceMetadata() { + assertEquals("empty properties of empty namespace", Collections.emptyMap(), + ecsCatalog.loadNamespaceMetadata(Namespace.empty())); + + ecsCatalog.createNamespace(namespaceL1, Collections.singletonMap("k1", "v1")); + + Map resultMetadata = ecsCatalog.loadNamespaceMetadata(namespaceL1); + assertEquals("metadata contains k1", "v1", resultMetadata.get("k1")); + assertTrue("metadata contains eTag", resultMetadata.containsKey(EcsClient.E_TAG_KEY)); + } + + @Test(expected = NoSuchNamespaceException.class) + public void loadNamespaceMetadata_invalidInput() { + ecsCatalog.loadNamespaceMetadata(namespaceL1); + } + + @Test + public void dropNamespace() { + assertFalse("can't drop empty namespace", ecsCatalog.dropNamespace(Namespace.empty())); + + ecsCatalog.createNamespace(namespaceL1); + + assertEquals( + "namespaces in empty namespace", + Collections.singletonList(namespaceL1), + ecsCatalog.listNamespaces(Namespace.empty())); + + assertTrue("drop namespace", ecsCatalog.dropNamespace(namespaceL1)); + + assertTrue( + "no namespace in empty namespace", + ecsCatalog.listNamespaces(Namespace.empty()).isEmpty()); + } + + @Test(expected = NamespaceNotEmptyException.class) + public void dropNamespace_invalidInput() { + prepareData(); + + ecsCatalog.dropNamespace(namespaceL1); + } + + @Test + public void setProperties() { + ecsCatalog.createNamespace(namespaceL1, Collections.singletonMap("k1", "v1")); + + assertTrue( + "set properties", + ecsCatalog.setProperties(namespaceL1, Collections.singletonMap("k2", "v2"))); + + Map resultMap = ecsCatalog.loadNamespaceMetadata(namespaceL1); + assertEquals("metadata contains k1", "v1", resultMap.get("k1")); + assertEquals("metadata contains k2", "v2", resultMap.get("k2")); + } + + @Test + public void removeProperties() { + Map inputMetadata = new HashMap<>(); + inputMetadata.put("k1", "v1"); + inputMetadata.put("k2", "v2"); + ecsCatalog.createNamespace(namespaceL1, inputMetadata); + + assertTrue("remove properties", 
ecsCatalog.removeProperties(namespaceL1, Collections.singleton("k1"))); + + Map resultMap = ecsCatalog.loadNamespaceMetadata(namespaceL1); + assertFalse("k1 is removed", resultMap.containsKey("k1")); + assertEquals("metadata contains k2", "v2", resultMap.get("k2")); + } +} \ No newline at end of file diff --git a/dell/src/test/java/org/apache/iceberg/dell/EcsTableOperationsTest.java b/dell/src/test/java/org/apache/iceberg/dell/EcsTableOperationsTest.java new file mode 100644 index 000000000000..7cf636b79a21 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/EcsTableOperationsTest.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class EcsTableOperationsTest { + + /** + * table id + */ + private final TableIdentifier id = TableIdentifier.of("test"); + /** + * table schema + */ + private final Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + /** + * data file + */ + private final DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/test/data-a.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + /** + * ecs catalog + */ + private EcsCatalog ecsCatalog; + /** + * ecs table operations + */ + private EcsTableOperations ecsTableOperations; + + @Before + public void init() { + ecsCatalog = new EcsCatalog(); + + Map properties = new HashMap<>(); + properties.put(EcsCatalogProperties.ECS_CLIENT_FACTORY, "org.apache.iceberg.dell.MemoryEcsClient#create"); + properties.put(EcsCatalogProperties.BASE_KEY, "test"); + ecsCatalog.initialize("test", properties); + + ecsTableOperations = (EcsTableOperations) ecsCatalog.newTableOps(id); + } + + @Test + public void testTableOperations() { + assertNull("start up, null table", ecsTableOperations.refresh()); + + // create table -> version 0 + Table table = ecsCatalog.createTable(id, schema); + + // verify + assertEquals("current table version is -1", -1, + ((EcsTableOperations) ((HasTableOperations) table).operations()).currentVersion()); + + // append data file -> version 1 + table.newFastAppend().appendFile(dataFile).commit(); + + // verify + assertEquals("current table version is 1", 1, + ((EcsTableOperations) ((HasTableOperations) table).operations()).currentVersion()); + + // delete data file -> version 2 + table.newDelete().deleteFile(dataFile).commit(); + + assertEquals("current table has 2 history entries", 2, table.history().size()); + assertEquals("current table 
version is 2", 2, + ((EcsTableOperations) ((HasTableOperations) table).operations()).currentVersion()); + } +} \ No newline at end of file diff --git a/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java b/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java new file mode 100644 index 000000000000..d5da5a7265e6 --- /dev/null +++ b/dell/src/test/java/org/apache/iceberg/dell/MemoryEcsClient.java @@ -0,0 +1,207 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.dell; + +import com.amazonaws.util.Md5Utils; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.dell.impl.ContentAndETagImpl; +import org.apache.iceberg.dell.impl.ObjectHeadInfoImpl; +import org.apache.iceberg.dell.impl.ObjectKeysImpl; + +public class MemoryEcsClient implements EcsClient { + + private final ObjectBaseKey baseKey; + + private final ConcurrentMap data = new ConcurrentHashMap<>(); + + public static EcsClient create(Map properties) { + return new MemoryEcsClient(EcsCatalogProperties.getObjectBaseKey(properties)); + } + + public MemoryEcsClient(ObjectBaseKey baseKey) { + this.baseKey = baseKey; + } + + @Override + public ObjectKeys getKeys() { + return new ObjectKeysImpl(baseKey); + } + + @Override + public Map getProperties() { + return null; + } + + @Override + public Optional head(ObjectKey key) { + return Optional.ofNullable(data.get(key)).map(EcsObject::getHeadInfo); + } + + @Override + public InputStream inputStream(ObjectKey key, long pos) { + EcsObject object = data.get(key); + byte[] content = object.getContent(); + return new ByteArrayInputStream(content, (int) pos, content.length); + } + + @Override + public OutputStream outputStream(ObjectKey key) { + return new WrappedOutputStream(key); + } + + @Override + public ContentAndETag readAll(ObjectKey key) { + return data.get(key).getContentAndETag(); + } + + @Override + public boolean replace(ObjectKey key, String eTag, byte[] bytes, Map userMetadata) { + EcsObject original = data.get(key); + if (original == null) { + return false; + } + if (!original.getHeadInfo().getETag().equals(eTag)) { + return false; + } + return data.replace(key, original, EcsObject.create(bytes, userMetadata)); + } + + @Override + public boolean writeIfAbsent(ObjectKey key, byte[] bytes, Map userMetadata) { + return data.putIfAbsent(key, EcsObject.create(bytes, userMetadata)) == null; + } + + @Override + public boolean copyObjectIfAbsent(ObjectKey fromKey, String eTag, ObjectKey toKey) { + EcsObject original = data.get(fromKey); + if 
(original == null) { + return false; + } + if (!original.getHeadInfo().getETag().equals(eTag)) { + return false; + } + return data.putIfAbsent(toKey, original) == null; + } + + @Override + public void deleteObject(ObjectKey key) { + data.remove(key); + } + + @Override + public List listDelimiterAll(ObjectKey prefix, Function> filterAndMapper) { + String delimiter = getKeys().getDelimiter(); + String prefixKey; + if (prefix.getKey().isEmpty()) { + prefixKey = ""; + } else if (prefix.getKey().endsWith(delimiter)) { + prefixKey = prefix.getKey(); + } else { + prefixKey = prefix.getKey() + delimiter; + } + int prefixLength = prefixKey.length(); + return data.keySet().stream() + .filter(key -> { + if (!Objects.equals(key.getBucket(), prefix.getBucket())) { + return false; + } + if (!key.getKey().startsWith(prefixKey)) { + return false; + } + return key.getKey().indexOf(delimiter, prefixLength) < 0; + }) + .sorted(Comparator.comparing(ObjectKey::getBucket).thenComparing(ObjectKey::getKey)) + .flatMap(key -> filterAndMapper.apply(key).map(Stream::of).orElse(Stream.empty())) + .collect(Collectors.toList()); + } + + @Override + public EcsClient copy() { + return this; + } + + @Override + public void close() { + } + + public static class EcsObject { + + private final ObjectHeadInfo headInfo; + private final byte[] content; + + public EcsObject(ObjectHeadInfo headInfo, byte[] content) { + this.headInfo = headInfo; + this.content = content; + } + + public ObjectHeadInfo getHeadInfo() { + return headInfo; + } + + public byte[] getContent() { + return Arrays.copyOf(content, content.length); + } + + public ContentAndETag getContentAndETag() { + return new ContentAndETagImpl(getHeadInfo(), getContent()); + } + + public static EcsObject create(byte[] content, Map userMetadata) { + return new EcsObject( + new ObjectHeadInfoImpl(content.length, Md5Utils.md5AsBase64(content), userMetadata), + content); + } + } + + public class WrappedOutputStream extends OutputStream { + + private final ObjectKey key; + private final ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream(); + + public WrappedOutputStream(ObjectKey key) { + this.key = key; + } + + @Override + public void write(int b) { + byteArrayOutput.write(b); + } + + @Override + public void write(byte[] b, int off, int len) { + byteArrayOutput.write(b, off, len); + } + + @Override + public void close() { + data.put(key, EcsObject.create(byteArrayOutput.toByteArray(), Collections.emptyMap())); + } + } +} diff --git a/settings.gradle b/settings.gradle index 037bdf802385..fb4373e423c3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,6 +38,7 @@ include 'spark3-runtime' include 'pig' include 'hive-metastore' include 'nessie' +include 'dell' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -59,6 +60,7 @@ project(':spark3-runtime').name = 'iceberg-spark3-runtime' project(':pig').name = 'iceberg-pig' project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' +project(':dell').name = 'iceberg-dell' if (JavaVersion.current() == JavaVersion.VERSION_1_8) { include 'spark2' diff --git a/versions.props b/versions.props index ceee38ac3f15..00a9d441f62c 100644 --- a/versions.props +++ b/versions.props @@ -18,6 +18,7 @@ org.apache.arrow:arrow-vector = 2.0.0 org.apache.arrow:arrow-memory-netty = 2.0.0 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 software.amazon.awssdk:* = 2.15.7 +com.amazonaws:* = 1.11.974 org.scala-lang:scala-library = 2.12.10 org.projectnessie:* 
= 0.5.1 javax.ws.rs:javax.ws.rs-api = 2.1.1
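
Usage note on EcsAppendOutputStream (added in this patch): small writes land in the local byte buffer and are sent as one PUT per flush; the first flush creates the object, and every later flush carries Range: bytes=-1- so ECS appends to it. The snippet below is a minimal sketch of that call pattern, not part of the patch; the bucket and object names are placeholders, and the AmazonS3 client is assumed to be already configured for an ECS endpoint.

    import com.amazonaws.services.s3.AmazonS3;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.iceberg.dell.ObjectKey;
    import org.apache.iceberg.dell.impl.EcsAppendOutputStream;

    public class AppendWriteExample {

      // "sample-bucket" and the object key are placeholder names for illustration.
      static void writeSample(AmazonS3 s3) throws IOException {
        ObjectKey key = new ObjectKey("sample-bucket", "warehouse/demo/metadata.json");
        // 1_000-byte local buffer, matching the default used by EcsClientImpl#outputStream
        try (OutputStream out = new EcsAppendOutputStream(s3, key, new byte[1_000])) {
          out.write("the first flush creates the object;".getBytes(StandardCharsets.UTF_8));
          out.write(" later flushes append via Range: bytes=-1-".getBytes(StandardCharsets.UTF_8));
        } // close() flushes any remaining buffered bytes
      }
    }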
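
Note on the conditional requests in EcsClientImpl: replace() sends If-Match with the caller's last observed E-Tag and writeIfAbsent() sends If-None-Match: *, which is what lets the catalog swap its small pointer objects atomically. The sketch below shows the resulting optimistic commit step, assuming an initialized EcsClient; tryCommit and updatedBytes are illustrative names, not APIs in this patch.

    import java.util.Collections;
    import org.apache.iceberg.dell.EcsClient;
    import org.apache.iceberg.dell.ObjectKey;

    public class PointerCasExample {

      // Commit a new pointer value only if the object still has the E-Tag we observed when reading it.
      static boolean tryCommit(EcsClient ecs, ObjectKey pointerKey, byte[] updatedBytes) {
        EcsClient.ContentAndETag current = ecs.readAll(pointerKey);   // read content plus E-Tag
        String expectedETag = current.getHeadInfo().getETag();
        // replace() sends If-Match, so a concurrent writer triggers PreconditionFailed and returns false
        return ecs.replace(pointerKey, expectedETag, updatedBytes, Collections.emptyMap());
      }
    }

A caller that gets false would re-read the pointer, reapply its change, and retry, which is the usual optimistic-concurrency loop.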
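
Note on wiring up the catalog: the tests initialize EcsCatalog purely through catalog properties, using EcsCatalogProperties.ECS_CLIENT_FACTORY to pick the client factory and EcsCatalogProperties.BASE_KEY to pick the key prefix. Below is a minimal sketch of that flow reusing the in-memory test factory; a production setup would instead point the factory at the S3-backed EcsClientImpl and supply endpoint and credential properties.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.dell.EcsCatalog;
    import org.apache.iceberg.dell.EcsCatalogProperties;
    import org.apache.iceberg.types.Types;

    public class EcsCatalogUsageExample {

      public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // In-memory client factory from the tests; replace with the S3-backed factory in production.
        properties.put(EcsCatalogProperties.ECS_CLIENT_FACTORY, "org.apache.iceberg.dell.MemoryEcsClient#create");
        properties.put(EcsCatalogProperties.BASE_KEY, "demo");

        EcsCatalog catalog = new EcsCatalog();
        catalog.initialize("demo", properties);

        Namespace db = Namespace.of("db");
        catalog.createNamespace(db);

        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
        catalog.createTable(TableIdentifier.of(db, "events"), schema);
      }
    }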