Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions hudi-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

<properties>
<dynamodb-local.version>1.15.0</dynamodb-local.version>
<moto.version>latest</moto.version><!-- NOTE(review): pin to a specific moto release for reproducible CI builds -->
</properties>

<dependencies>
Expand Down Expand Up @@ -261,6 +262,21 @@
</wait>
</run>
</image>
<image>
<!-- Moto server container: a local AWS mock the Glue sync integration tests run against. -->
<name>motoserver/moto:${moto.version}</name>
<alias>it-aws</alias>
<run>
<ports>
<!-- NOTE(review): ${moto.port} / ${moto.endpoint} are not defined in the visible
     <properties> block (only moto.version is) — confirm they are declared elsewhere. -->
<port>${moto.port}:${moto.port}</port>
</ports>
<wait>
<!-- Block the build until the moto API answers over HTTP, up to 10 seconds. -->
<http>
<url>${moto.endpoint}/moto-api/</url>
</http>
<time>10000</time>
</wait>
</run>
</image>
</images>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
Expand All @@ -28,7 +29,9 @@
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.GlueAsyncClientBuilder;
import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
Expand Down Expand Up @@ -66,6 +69,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -84,6 +89,8 @@
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_ENDPOINT;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_REGION;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
Expand All @@ -104,7 +111,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private static final Logger LOG = LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25;
private final GlueAsyncClient awsGlue;
protected final GlueAsyncClient awsGlue;
private static final String GLUE_PARTITION_INDEX_ENABLE = "partition_filtering.enabled";
private static final int PARTITION_INDEX_MAX_NUMBER = 3;
/**
Expand All @@ -119,9 +126,17 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = GlueAsyncClient.builder()
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()))
.build();
try {
GlueAsyncClientBuilder awsGlueBuilder = GlueAsyncClient.builder()
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()));
awsGlueBuilder = config.getString(AWS_GLUE_ENDPOINT) == null ? awsGlueBuilder :
awsGlueBuilder.endpointOverride(new URI(config.getString(AWS_GLUE_ENDPOINT)));
awsGlueBuilder = config.getString(AWS_GLUE_REGION) == null ? awsGlueBuilder :
awsGlueBuilder.region(Region.of(config.getString(AWS_GLUE_REGION)));
this.awsGlue = awsGlueBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive = config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
this.enableMetadataTable = Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
Expand All @@ -130,25 +145,42 @@ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
@Override
public List<Partition> getAllPartitions(String tableName) {
try {
List<Partition> partitions = new ArrayList<>();
String nextToken = null;
do {
GetPartitionsResponse result = awsGlue.getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.nextToken(nextToken)
.build()).get();
partitions.addAll(result.partitions().stream()
.map(p -> new Partition(p.values(), p.storageDescriptor().location()))
.collect(Collectors.toList()));
nextToken = result.nextToken();
} while (nextToken != null);
return partitions;
return getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName));
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
}
}

@Override
public List<Partition> getPartitionsByFilter(String tableName, String filter) {
try {
return getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.expression(filter));
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get partitions for table " + tableId(databaseName, tableName) + " from expression: " + filter, e);
}
}

private List<Partition> getPartitions(GetPartitionsRequest.Builder partitionRequestBuilder) throws InterruptedException, ExecutionException {
List<Partition> partitions = new ArrayList<>();
String nextToken = null;
do {
GetPartitionsResponse result = awsGlue.getPartitions(partitionRequestBuilder
.excludeColumnSchema(true)
.nextToken(nextToken)
.build()).get();
partitions.addAll(result.partitions().stream()
.map(p -> new Partition(p.values(), p.storageDescriptor().location()))
.collect(Collectors.toList()));
nextToken = result.nextToken();
} while (nextToken != null);
return partitions;
}

@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
Expand Down Expand Up @@ -700,6 +732,11 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}

@Override
public String generatePushDownFilter(List<String> writtenPartitions, List<FieldSchema> partitionFields) {
return new GluePartitionFilterGenerator().generatePushDownFilter(writtenPartitions, partitionFields, (HiveSyncConfig) config);
}

private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync.util;

import org.apache.hudi.hive.util.FilterGenVisitor;

public class GlueFilterGenVisitor extends FilterGenVisitor {

  /**
   * Quotes a string literal for a Glue partition filter expression. Glue parses
   * filters with JSqlParser, which expects embedded single quotes to be escaped
   * SQL-style by doubling them.
   * https://jsqlparser.github.io/JSqlParser/usage.html#define-the-parser-features
   */
  @Override
  protected String quoteStringLiteral(String value) {
    // String.replace does a literal (non-regex) replacement, so no pattern
    // compilation and no escaping pitfalls; a no-op when there are no quotes.
    return "'" + value.replace("'", "''") + "'";
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync.util;

import org.apache.hudi.expression.Expression;
import org.apache.hudi.hive.util.PartitionFilterGenerator;

public class GluePartitionFilterGenerator extends PartitionFilterGenerator {

protected String generateFilterString(Expression filter) {
return filter.accept(new GlueFilterGenVisitor());
}
}
14 changes: 14 additions & 0 deletions hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ public class HoodieAWSConfig extends HoodieConfig {
.sinceVersion("0.13.2")
.withDocumentation("AWS Role ARN to assume");

  // Optional override for the AWS Glue service endpoint (e.g. a local moto mock
  // in integration tests). When unset, the SDK's default endpoint is used.
  public static final ConfigProperty<String> AWS_GLUE_ENDPOINT = ConfigProperty
      .key("hoodie.aws.glue.endpoint")
      .noDefaultValue()
      .markAdvanced()
      .sinceVersion("0.14.2")
      .withDocumentation("Aws glue endpoint");

public static final ConfigProperty<String> AWS_GLUE_REGION = ConfigProperty
.key("hoodie.aws.glue.region")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("Aws glue endpoint");

private HoodieAWSConfig() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.PartitionInput;
import software.amazon.awssdk.services.glue.model.SerDeInfo;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.TableInput;

import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;

/**
 * Integration test for Glue partition-filter pushdown. Runs against a local moto
 * server (AWS mock) — see the motoserver/moto image configured in hudi-aws/pom.xml.
 */
public class ITTestGluePartitionPushdown {

  // Endpoint of the local moto server the Glue client is pointed at.
  private static final String MOTO_ENDPOINT = "http://localhost:5000";
  private static final String DB_NAME = "db_name";
  private static final String TABLE_NAME = "tbl_name";
  // Fresh temp dir per instance; the initializer can throw IOException, hence the
  // throwing no-arg constructor below.
  private String basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
  private String tablePath = basePath + "/" + TABLE_NAME;
  private TypedProperties hiveSyncProps;
  private AWSGlueCatalogSyncClient glueSync;
  private FileSystem fileSystem;
  // Partition columns shared by the Glue table definition and the Hudi field schemas.
  private Column[] partitionsColumn = {Column.builder().name("part1").type("int").build(), Column.builder().name("part2").type("string").build()};
  List<FieldSchema> partitionsFieldSchema = Arrays.asList(new FieldSchema("part1", "int"), new FieldSchema("part2", "string"));

  public ITTestGluePartitionPushdown() throws IOException {}

  @BeforeEach
  public void setUp() throws Exception {
    // Dummy credentials suffice: the client talks to moto, not real AWS.
    hiveSyncProps = new TypedProperties();
    hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy");
    hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy");
    hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy");
    hiveSyncProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT);
    hiveSyncProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), "eu-west-1");
    hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath);
    hiveSyncProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);

    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, new Configuration());
    fileSystem = hiveSyncConfig.getHadoopFileSystem();
    fileSystem.mkdirs(new Path(tablePath));
    Configuration configuration = new Configuration();
    // Initialize a minimal COW Hudi table on disk so the sync client has metadata to read.
    HoodieTableMetaClient.withPropertyBuilder()
      .setTableType(HoodieTableType.COPY_ON_WRITE)
      .setTableName(TABLE_NAME)
      .setPayloadClass(HoodieAvroPayload.class)
      .initTable(configuration, tablePath);

    // Create the Glue database and a partitioned table directly through the
    // (protected) Glue client so the tests can exercise pushdown against them.
    glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps));
    glueSync.awsGlue.createDatabase(CreateDatabaseRequest.builder().databaseInput(DatabaseInput.builder().name(DB_NAME).build()).build()).get();

    glueSync.awsGlue.createTable(CreateTableRequest.builder().databaseName(DB_NAME)
            .tableInput(TableInput.builder().name(TABLE_NAME).partitionKeys(
                    partitionsColumn)
                .storageDescriptor(
                    StorageDescriptor.builder()
                        .serdeInfo(SerDeInfo.builder().serializationLibrary("").build())
                        .location(tablePath)
                        .columns(
                            Column.builder().name("col1").type("string").build()
                        )
                        .build())
                .build()).build()).get();
  }

  @AfterEach
  public void teardown() throws Exception {
    // Clean up Glue metadata and the local table directory between tests.
    glueSync.awsGlue.deleteTable(DeleteTableRequest.builder().databaseName(DB_NAME).name(TABLE_NAME).build()).get();
    glueSync.awsGlue.deleteDatabase(DeleteDatabaseRequest.builder().name(DB_NAME).build()).get();
    fileSystem.delete(new Path(tablePath), true);
  }

  @Test
  public void testEmptyPartitionShouldReturnEmpty() {
    // No partitions registered yet, so a pushdown filter for "1/bar" matches nothing.
    Assertions.assertEquals(0, glueSync.getPartitionsByFilter(TABLE_NAME,
        glueSync.generatePushDownFilter(Arrays.asList("1/bar"), partitionsFieldSchema)).size());
  }

  @Test
  public void testPresentPartitionShouldReturnIt() throws ExecutionException, InterruptedException {
    // Register one partition whose string value contains a single quote, to
    // exercise the quote-escaping in the generated filter expression.
    glueSync.awsGlue.createPartition(CreatePartitionRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME)
        .partitionInput(PartitionInput.builder()
            .storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
            .values("1", "b'ar").build()).build()).get();

    // Only "1/b'ar" exists; the other two candidate partitions must not match.
    Assertions.assertEquals(1, glueSync.getPartitionsByFilter(TABLE_NAME,
        glueSync.generatePushDownFilter(Arrays.asList("1/b'ar", "2/foo", "1/b''ar"), partitionsFieldSchema)).size());
  }
}
Loading