16 changes: 16 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -265,6 +265,22 @@ Property Name Description
Otherwise, it will be ignored.
======================================================= ============================================================= ============

Configure the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
properties to specify an S3 location as the warehouse directory for the Hadoop catalog, so that both the
metadata and the data of Iceberg tables are stored in S3. An example configuration:

.. code-block:: none

connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://iceberg_bucket/warehouse

hive.s3.use-instance-credentials=false
hive.s3.aws-access-key=accesskey
hive.s3.aws-secret-key=secretkey
hive.s3.endpoint=http://192.168.0.103:9878
hive.s3.path-style-access=true

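With this configuration, a table created through the connector keeps both its data
files and its Iceberg metadata files under the warehouse location. As an
illustration (the schema and table names here are hypothetical), the files for the
table below would typically land under ``s3://iceberg_bucket/warehouse/tpch/orders/``:

.. code-block:: sql

    CREATE SCHEMA iceberg.tpch;

    CREATE TABLE iceberg.tpch.orders (
        orderkey BIGINT,
        orderstatus VARCHAR
    );
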
Configuration Properties
------------------------

@@ -68,7 +68,6 @@
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
@@ -287,9 +286,19 @@ else if (table.getTableType().equals(EXTERNAL_TABLE)) {
}

if (!table.getTableType().equals(VIRTUAL_VIEW)) {
-File location = new File(new Path(table.getStorage().getLocation()).toUri());
-checkArgument(location.isDirectory(), "Table location is not a directory: %s", location);
-checkArgument(location.exists(), "Table directory does not exist: %s", location);
+try {
+    // validate through the Hadoop FileSystem API so that remote locations
+    // (for example s3://) resolve correctly; java.io.File only handles local paths
+    Path tableLocation = new Path(table.getStorage().getLocation());
+    FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
+    if (!fileSystem.exists(tableLocation)) {
+        throw new PrestoException(HIVE_METASTORE_ERROR, "Table directory does not exist: " + tableLocation);
+    }
+    if (!fileSystem.isDirectory(tableLocation)) {
+        throw new PrestoException(HIVE_METASTORE_ERROR, "Table location is not a directory: " + tableLocation);
+    }
+}
+catch (IOException e) {
+    throw new PrestoException(HIVE_METASTORE_ERROR, "Could not validate table location", e);
+}
}

writeSchemaFile("table", tableMetadataDirectory, tableCodec, new TableMetadata(table), false);
@@ -1168,25 +1177,13 @@ private synchronized void setTablePrivileges(
requireNonNull(tableName, "tableName is null");
requireNonNull(privileges, "privileges is null");

-try {
-    Table table = getRequiredTable(metastoreContext, databaseName, tableName);
-
-    Path permissionsDirectory = getPermissionsDirectory(table);
-
-    metadataFileSystem.mkdirs(permissionsDirectory);
-    if (!metadataFileSystem.isDirectory(permissionsDirectory)) {
-        throw new PrestoException(HIVE_METASTORE_ERROR, "Could not create permissions directory");
-    }
+Table table = getRequiredTable(metastoreContext, databaseName, tableName);
+
-    Path permissionFilePath = getPermissionsPath(permissionsDirectory, grantee);
-    List<PermissionMetadata> permissions = privileges.stream()
-            .map(PermissionMetadata::new)
-            .collect(toList());
-    writeFile("permissions", permissionFilePath, permissionsCodec, permissions, true);
-}
-catch (IOException e) {
-    throw new PrestoException(HIVE_METASTORE_ERROR, e);
-}
+Path permissionFilePath = getPermissionsPath(getPermissionsDirectory(table), grantee);
+List<PermissionMetadata> permissions = privileges.stream()
+        .map(PermissionMetadata::new)
+        .collect(toList());
+writeFile("permissions", permissionFilePath, permissionsCodec, permissions, true);
}

private Set<TableConstraint> readConstraintsFile(String databaseName, String tableName)
@@ -39,7 +39,7 @@ public class HiveS3Module
{
private static final String EMR_FS_CLASS_NAME = "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";

-private final String connectorId;
+protected final String connectorId;

public HiveS3Module(String connectorId)
{
@@ -80,7 +80,7 @@ public AWSSecurityMappingsSupplier provideAWSSecurityMappingsSupplier(AWSSecurit
return new AWSSecurityMappingsSupplier(config.getConfigFile(), config.getRefreshPeriod());
}

-private void bindSecurityMapping(Binder binder)
+protected void bindSecurityMapping(Binder binder)
{
if (buildConfigObject(AWSSecurityMappingConfig.class).getConfigFile().isPresent() &&
buildConfigObject(AWSSecurityMappingConfig.class).getMappingType().equals(S3)) {
@@ -89,7 +89,7 @@ private void bindSecurityMapping(Binder binder)
}
}

-private static void validateEmrFsClass()
+protected static void validateEmrFsClass()
{
// verify that the class exists
try {
@@ -168,16 +168,16 @@ public class PrestoS3FileSystem
private static final String DIRECTORY_SUFFIX = "_$folder$";
private static final DataSize BLOCK_SIZE = new DataSize(32, MEGABYTE);
private static final DataSize MAX_SKIP_SIZE = new DataSize(1, MEGABYTE);
-private static final String PATH_SEPARATOR = "/";
+protected static final String PATH_SEPARATOR = "/";
private static final Duration BACKOFF_MIN_SLEEP = new Duration(1, SECONDS);
private static final int HTTP_RANGE_NOT_SATISFIABLE = 416;
private static final MediaType X_DIRECTORY_MEDIA_TYPE = MediaType.create("application", "x-directory");
private static final MediaType OCTET_STREAM_MEDIA_TYPE = MediaType.create("application", "octet-stream");
private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString());

-private URI uri;
+protected URI uri;
private Path workingDirectory;
-private AmazonS3 s3;
+protected AmazonS3 s3;
private AWSCredentialsProvider credentialsProvider;
private File stagingDirectory;
private int maxAttempts;
@@ -396,8 +396,7 @@ private static boolean isDirectory(PrestoS3ObjectMetadata metadata)
}

return mediaType.is(X_DIRECTORY_MEDIA_TYPE) ||
-(mediaType.is(OCTET_STREAM_MEDIA_TYPE)
-&& metadata.isKeyNeedsPathSeparator()
+(metadata.isKeyNeedsPathSeparator()
&& objectMetadata.getContentLength() == 0);
}

@@ -27,6 +27,7 @@
import java.util.List;

import static com.facebook.airlift.testing.Closeables.closeAllRuntimeException;
import static com.facebook.presto.hive.containers.HiveMinIODataLake.EMPTY_DIR;
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -134,6 +135,21 @@ public void testInsertOverwritePartitionedAndBucketedExternalTable()
assertOverwritePartition(externalTableName);
}

@Test
public void testCreateExternalTableOnEmptyS3Directory()
{
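// creating an external table over a pre-existing empty S3 directory should succeed and yield an empty table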
String testTable = getTestTableName();
String tableName = testTable.substring(testTable.lastIndexOf('.') + 1);
computeActual(getCreateTableStatement(
tableName,
"partitioned_by=ARRAY['regionkey']",
"bucketed_by = ARRAY['nationkey']",
"bucket_count = 3",
format("external_location = 's3a://%s/%s'", this.bucketName, EMPTY_DIR)));
MaterializedResult materializedRows = computeActual("select * from " + tableName);
assertEquals(materializedRows.getRowCount(), 0);
}

protected void assertOverwritePartition(String testTable)
{
computeActual(format(
@@ -29,12 +29,14 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.hive.containers.HiveHadoopContainer.HIVE3_IMAGE;
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static java.util.Objects.requireNonNull;
import static org.testcontainers.containers.Network.newNetwork;

public class HiveMinIODataLake
implements Closeable
{
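// key of an empty "directory" object created in the bucket at startup; S3 represents
// empty directories as zero-byte objects whose keys end in "/"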
public static final String EMPTY_DIR = "test-empty-dir-" + randomTableSuffix() + "/";
public static final String ACCESS_KEY = "accesskey";
public static final String SECRET_KEY = "secretkey";

@@ -99,6 +101,7 @@ public void start()
new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY)))
.build();
s3Client.createBucket(this.bucketName);
s3Client.putObject(this.bucketName, EMPTY_DIR, "");
}
finally {
isStarted.set(true);
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
@@ -598,6 +598,28 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

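<!-- not test-scoped: PrestoIcebergNativeS3FileSystem references AmazonClientException
     (aws-java-sdk-core) and the AmazonS3 client (aws-java-sdk-s3) from main code -->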
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
@@ -26,7 +26,7 @@
import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
import com.facebook.presto.hive.gcs.HiveGcsModule;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
-import com.facebook.presto.hive.s3.HiveS3Module;
+import com.facebook.presto.iceberg.s3.IcebergS3Module;
import com.facebook.presto.plugin.base.security.AllowAllAccessControl;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PageIndexerFactory;
@@ -81,7 +81,7 @@ public static Connector createConnector(
new JsonModule(),
new IcebergCommonModule(catalogName),
new IcebergCatalogModule(catalogName, metastore),
-new HiveS3Module(catalogName),
+new IcebergS3Module(catalogName),
new HiveGcsModule(),
new HiveAuthenticationModule(),
new CachingModule(),
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.HiveS3Module;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import com.facebook.presto.hive.s3.PrestoS3FileSystemStats;
import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.hive.s3.S3FileSystemType;
import com.google.inject.Binder;
import com.google.inject.Scopes;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergS3Module
extends HiveS3Module
{
public IcebergS3Module(String connectorId)
{
super(connectorId);
}

@Override
protected void setup(Binder binder)
{
S3FileSystemType type = buildConfigObject(HiveClientConfig.class).getS3FileSystemType();
if (type == S3FileSystemType.PRESTO) {
bindSecurityMapping(binder);

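// mirrors HiveS3Module.setup(), except that the Iceberg-aware configuration updater is bound below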
binder.bind(S3ConfigurationUpdater.class).to(PrestoIcebergS3ConfigurationUpdater.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(HiveS3Config.class);

binder.bind(PrestoS3FileSystemStats.class).toInstance(PrestoS3FileSystem.getFileSystemStats());
newExporter(binder).export(PrestoS3FileSystemStats.class).as(generatedNameOf(PrestoS3FileSystem.class, connectorId));
}
else if (type == S3FileSystemType.EMRFS) {
validateEmrFsClass();
binder.bind(S3ConfigurationUpdater.class).to(EmrFsS3ConfigurationUpdater.class).in(Scopes.SINGLETON);
}
else if (type == S3FileSystemType.HADOOP_DEFAULT) {
// configuration is done using Hadoop configuration files
binder.bind(S3ConfigurationUpdater.class).to(HadoopDefaultConfigurationUpdater.class).in(Scopes.SINGLETON);
}
else {
throw new RuntimeException("Unknown file system type: " + type);
}
}
}
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.amazonaws.AmazonClientException;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class PrestoIcebergNativeS3FileSystem
extends PrestoS3FileSystem
{
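// the base PrestoS3FileSystem implements mkdirs as a no-op; the Hadoop catalog
// requires its warehouse directories to actually exist before it writes metadata,
// so create a directory marker object explicitly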
@Override
public boolean mkdirs(Path f, FsPermission permission)
{
try {
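// S3 has no real directories; a zero-byte object whose key ends with the path separator acts as one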
s3.putObject(getBucketName(uri), keyFromPath(f) + PATH_SEPARATOR, "");
return true;
}
catch (AmazonClientException e) {
return false;
}
}
}
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
import com.facebook.presto.iceberg.IcebergConfig;
import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;

public class PrestoIcebergS3ConfigurationUpdater
extends PrestoS3ConfigurationUpdater
{
private final IcebergConfig icebergConfig;

@Inject
public PrestoIcebergS3ConfigurationUpdater(HiveS3Config config, IcebergConfig icebergConfig)
{
super(config);
this.icebergConfig = icebergConfig;
}

@Override
public void updateConfiguration(Configuration config)
{
super.updateConfiguration(config);

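// only the Hadoop catalog creates warehouse directories through FileSystem.mkdirs,
// so only then is the mkdirs-capable filesystem implementation swapped in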
if (this.icebergConfig.getCatalogType().equals(HADOOP)) {
// re-map filesystem schemes to match Amazon Elastic MapReduce
config.set("fs.s3.impl", PrestoIcebergNativeS3FileSystem.class.getName());
config.set("fs.s3a.impl", PrestoIcebergNativeS3FileSystem.class.getName());
config.set("fs.s3n.impl", PrestoIcebergNativeS3FileSystem.class.getName());
}
}
}