From a6624428696d198d31bb68fb541134b3bce5f7dd Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sat, 3 Feb 2024 00:00:48 +0100 Subject: [PATCH 01/14] Setup maven docker moto for IT tests --- .../org/apache/hudi/aws/sync/ITTestDummy.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java new file mode 100644 index 0000000000000..3be315f2c8ce3 --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.sync; + +import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieAWSConfig; +import org.apache.hudi.hive.HiveSyncConfig; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.glue.GlueAsyncClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * docker run --rm -p 5000:5000 --name moto motoserver/moto:latest + * see http://localhost:5000/moto-api/# to check aws resources + */ +public class ITTestDummy { + @Test + public void testAwsCreateDatabase() throws URISyntaxException, ExecutionException, InterruptedException { + HoodieConfig cfg = new HoodieConfig(); + cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key"); + cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key"); + cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token"); + + GlueAsyncClient awsGlue = GlueAsyncClient.builder() + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps())) + .endpointOverride(new URI("http://localhost:5000")) + .build(); + + String dummyDb = "dummy_db"; + awsGlue.deleteDatabase(DeleteDatabaseRequest.builder().name(dummyDb).build()).get(); + CreateDatabaseResponse resp = awsGlue.createDatabase(CreateDatabaseRequest.builder() + .databaseInput(DatabaseInput.builder().name(dummyDb).build()).build()).get(); + + 
assertTrue(resp.sdkHttpResponse().isSuccessful()); + } + + @Test + public void testClientCreateDatabase() { + // see more details here org/apache/hudi/hive/TestHiveSyncTool.java + TypedProperties hiveSyncProps = new TypedProperties(); + hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); + hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); + hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); + hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ENDPOINT.key(), "http://localhost:5000"); + hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), "/tmp/bar"); + AWSGlueCatalogSyncClient glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps)); + glueSync.createDatabase("foo_db"); + } +} From 26def33604480ab30cb5247ddb84ff8a42eba767 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 6 Feb 2024 22:27:31 +0100 Subject: [PATCH 02/14] Temporary hide hudi-aws deps from hudi-client-common --- hudi-client/hudi-client-common/pom.xml | 9 +- .../hudi/metrics/MetricsReporterFactory.java | 8 +- .../cloudwatch/CloudWatchMetricsReporter.java | 122 +++++++++--------- .../TestCloudWatchMetricsReporter.java | 88 ++++++------- 4 files changed, 116 insertions(+), 111 deletions(-) diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 67705edb316f2..2575972f133c8 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -43,11 +43,16 @@ hudi-common ${project.version} + + + + + + org.apache.hudi - hudi-aws + hudi-hive-sync ${project.version} - provided org.apache.hudi diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index 27034735a040c..8847643f4cb2b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -24,7 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter; +//import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter; import org.apache.hudi.metrics.custom.CustomizableMetricsReporter; import org.apache.hudi.metrics.datadog.DatadogMetricsReporter; import org.apache.hudi.metrics.prometheus.PrometheusReporter; @@ -86,9 +86,9 @@ public static Option createReporter(HoodieWriteConfig config, M case CONSOLE: reporter = new ConsoleMetricsReporter(registry); break; - case CLOUDWATCH: - reporter = new CloudWatchMetricsReporter(config, registry); - break; +// case CLOUDWATCH: +// reporter = new CloudWatchMetricsReporter(config, registry); +// break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java index d05632b9bbf85..9be20cbbdd642 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java @@ -16,64 +16,64 @@ * 
limitations under the License. */ -package org.apache.hudi.metrics.cloudwatch; - -import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.metrics.MetricsReporter; - -import com.codahale.metrics.MetricRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with - * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}. - */ -public class CloudWatchMetricsReporter extends MetricsReporter { - - private static final Logger LOG = LoggerFactory.getLogger(CloudWatchMetricsReporter.class); - - private final MetricRegistry registry; - private final HoodieWriteConfig config; - private final CloudWatchReporter reporter; - - public CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { - this.config = config; - this.registry = registry; - this.reporter = createCloudWatchReporter(); - } - - CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry, CloudWatchReporter reporter) { - this.config = config; - this.registry = registry; - this.reporter = reporter; - } - - private CloudWatchReporter createCloudWatchReporter() { - return CloudWatchReporter.forRegistry(registry) - .prefixedWith(config.getCloudWatchMetricPrefix()) - .namespace(config.getCloudWatchMetricNamespace()) - .maxDatumsPerRequest(config.getCloudWatchMaxDatumsPerRequest()) - .build(config.getProps()); - } - - @Override - public void start() { - LOG.info("Starting CloudWatch Metrics Reporter."); - reporter.start(config.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS); - } - - @Override - public void report() { - reporter.report(); - } - - @Override - public void stop() { - LOG.info("Stopping CloudWatch Metrics Reporter."); - reporter.stop(); - } -} +//package org.apache.hudi.metrics.cloudwatch; +// +//import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; +//import org.apache.hudi.config.HoodieWriteConfig; +//import org.apache.hudi.metrics.MetricsReporter; +// +//import com.codahale.metrics.MetricRegistry; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.util.concurrent.TimeUnit; +// +///** +// * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with +// * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}. 
+// */ +//public class CloudWatchMetricsReporter extends MetricsReporter { +// +// private static final Logger LOG = LoggerFactory.getLogger(CloudWatchMetricsReporter.class); +// +// private final MetricRegistry registry; +// private final HoodieWriteConfig config; +// private final CloudWatchReporter reporter; +// +// public CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { +// this.config = config; +// this.registry = registry; +// this.reporter = createCloudWatchReporter(); +// } +// +// CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry, CloudWatchReporter reporter) { +// this.config = config; +// this.registry = registry; +// this.reporter = reporter; +// } +// +// private CloudWatchReporter createCloudWatchReporter() { +// return CloudWatchReporter.forRegistry(registry) +// .prefixedWith(config.getCloudWatchMetricPrefix()) +// .namespace(config.getCloudWatchMetricNamespace()) +// .maxDatumsPerRequest(config.getCloudWatchMaxDatumsPerRequest()) +// .build(config.getProps()); +// } +// +// @Override +// public void start() { +// LOG.info("Starting CloudWatch Metrics Reporter."); +// reporter.start(config.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS); +// } +// +// @Override +// public void report() { +// reporter.report(); +// } +// +// @Override +// public void stop() { +// LOG.info("Stopping CloudWatch Metrics Reporter."); +// reporter.stop(); +// } +//} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java index 7901d80246513..9d8228ccbbb80 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java @@ -16,47 +16,47 @@ * limitations under the License. 
*/ -package org.apache.hudi.metrics.cloudwatch; - -import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; -import org.apache.hudi.config.HoodieWriteConfig; - -import com.codahale.metrics.MetricRegistry; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class TestCloudWatchMetricsReporter { - - @Mock - private HoodieWriteConfig config; - - @Mock - private MetricRegistry registry; - - @Mock - private CloudWatchReporter reporter; - - @Test - public void testReporter() { - when(config.getCloudWatchReportPeriodSeconds()).thenReturn(30); - CloudWatchMetricsReporter metricsReporter = new CloudWatchMetricsReporter(config, registry, reporter); - - metricsReporter.start(); - verify(reporter, times(1)).start(30, TimeUnit.SECONDS); - - metricsReporter.report(); - verify(reporter, times(1)).report(); - - metricsReporter.stop(); - verify(reporter, times(1)).stop(); - } -} +//package org.apache.hudi.metrics.cloudwatch; +// +//import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; +//import org.apache.hudi.config.HoodieWriteConfig; +// +//import com.codahale.metrics.MetricRegistry; +//import org.junit.jupiter.api.Test; +//import org.junit.jupiter.api.extension.ExtendWith; +//import org.mockito.Mock; +//import org.mockito.junit.jupiter.MockitoExtension; +// +//import java.util.concurrent.TimeUnit; +// +//import static org.mockito.Mockito.times; +//import static org.mockito.Mockito.verify; +//import static org.mockito.Mockito.when; +// +//@ExtendWith(MockitoExtension.class) +//public class TestCloudWatchMetricsReporter { +// +// @Mock +// private HoodieWriteConfig config; +// +// @Mock +// private MetricRegistry registry; +// +// @Mock +// private CloudWatchReporter reporter; +// +// @Test +// public void testReporter() { +// when(config.getCloudWatchReportPeriodSeconds()).thenReturn(30); +// CloudWatchMetricsReporter metricsReporter = new CloudWatchMetricsReporter(config, registry, reporter); +// +// metricsReporter.start(); +// verify(reporter, times(1)).start(30, TimeUnit.SECONDS); +// +// metricsReporter.report(); +// verify(reporter, times(1)).report(); +// +// metricsReporter.stop(); +// verify(reporter, times(1)).stop(); +// } +//} From 9c20da84677856b3de2984ed38c5fc1d9edf1bdd Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 6 Feb 2024 22:28:53 +0100 Subject: [PATCH 03/14] Use hudi-java-client from hudi-aws many dependency conflicts (hive) solved --- hudi-aws/pom.xml | 107 +++++++++++------- .../org/apache/hudi/aws/sync/ITTestDummy.java | 77 ------------- .../org/apache/hudi/aws/sync/TestDummy.java | 81 +++++++++++++ 3 files changed, 149 insertions(+), 116 deletions(-) delete mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index 9da22ff32c2ae..f8e35850461c7 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -54,26 +54,26 @@ - - org.apache.hadoop - hadoop-common - tests - test - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - + + + + + + + + + + + + + + + + + + + + com.amazonaws @@ -82,26 +82,26 @@ - - ${hive.groupid} - hive-service - ${hive.version} - - - org.slf4j 
- slf4j-api - - - org.slf4j - slf4j-log4j12 - - - + + + + + + + + + + + + + + + + + + + - - org.apache.parquet - parquet-avro - @@ -194,12 +194,41 @@ + + org.apache.parquet + parquet-avro + ${parquet.version} + provided + + + com.esotericsoftware + kryo-shaded + test + org.apache.hudi hudi-tests-common ${project.version} test + + org.apache.hudi + hudi-java-client + ${project.version} + test + + + org.apache.hudi + hudi-client-common + ${project.version} + test + + + org.apache.hudi + hudi-examples-common + ${project.version} + test + diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java deleted file mode 100644 index 3be315f2c8ce3..0000000000000 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestDummy.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.aws.sync; - -import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; -import org.apache.hudi.common.config.HoodieConfig; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.config.HoodieAWSConfig; -import org.apache.hudi.hive.HiveSyncConfig; -import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.glue.GlueAsyncClient; -import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; -import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; -import software.amazon.awssdk.services.glue.model.DatabaseInput; -import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.ExecutionException; - -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * docker run --rm -p 5000:5000 --name moto motoserver/moto:latest - * see http://localhost:5000/moto-api/# to check aws resources - */ -public class ITTestDummy { - @Test - public void testAwsCreateDatabase() throws URISyntaxException, ExecutionException, InterruptedException { - HoodieConfig cfg = new HoodieConfig(); - cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key"); - cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key"); - cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token"); - - GlueAsyncClient awsGlue = GlueAsyncClient.builder() - .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps())) - .endpointOverride(new URI("http://localhost:5000")) - .build(); - - String dummyDb = "dummy_db"; - awsGlue.deleteDatabase(DeleteDatabaseRequest.builder().name(dummyDb).build()).get(); - CreateDatabaseResponse resp = 
awsGlue.createDatabase(CreateDatabaseRequest.builder() - .databaseInput(DatabaseInput.builder().name(dummyDb).build()).build()).get(); - - assertTrue(resp.sdkHttpResponse().isSuccessful()); - } - - @Test - public void testClientCreateDatabase() { - // see more details here org/apache/hudi/hive/TestHiveSyncTool.java - TypedProperties hiveSyncProps = new TypedProperties(); - hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); - hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); - hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); - hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ENDPOINT.key(), "http://localhost:5000"); - hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), "/tmp/bar"); - AWSGlueCatalogSyncClient glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps)); - glueSync.createDatabase("foo_db"); - } -} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java new file mode 100644 index 0000000000000..fbbe3ded92029 --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
+import org.apache.hudi.index.HoodieIndex;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+
+/**
+ * docker run --rm -p 5000:5000 --name moto motoserver/moto:latest
+ * see http://localhost:5000/moto-api/# to check aws resources
+ */
+public class TestDummy {
+
+  @Test
+  public void testJavaClient() throws IOException {
+    String tablePath = "file:///tmp/hoodie/sample-table";
+    Configuration hadoopConf = new Configuration();
+    String tableType = "COPY_ON_WRITE";
+    String tableName = "hoodie_rt";
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(tableType)
+        .setTableName(tableName)
+        .setPayloadClassName(HoodieAvroPayload.class.getName())
+        .initTable(hadoopConf, tablePath);
+
+    String schema = HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA;
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(schema).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+    HoodieJavaWriteClient<HoodieAvroPayload> client =
+        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
+//
+    String newCommitTime = client.startCommit();
+    HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+        recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList());
+    client.insert(writeRecords, newCommitTime);
+    client.close();
+  }
+}
From d29fd4e583a76828e6cbaffe24e8618db868e6ce Mon Sep 17 00:00:00 2001
From: Nicolas Paris
Date: Fri, 9 Feb 2024 22:54:00 +0100
Subject: [PATCH 04/14] Move CloudWatch metrics to hudi-aws and use reflection
 to instantiate it

---
 hudi-aws/pom.xml                              |   1 -
 .../cloudwatch/CloudWatchMetricsReporter.java |  78 ++++++++++++++++++
 .../TestCloudWatchMetricsReporter.java        |  61 ++++++++++++++
 .../hudi/metrics/MetricsReporterFactory.java  |   7 +-
 .../cloudwatch/CloudWatchMetricsReporter.java |  79 -------------------
 .../TestCloudWatchMetricsReporter.java        |  62 ---------------
 6 files changed, 142 insertions(+), 146 deletions(-)
 create mode 100644 hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java
 create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java
 delete mode 100644
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java delete mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index f8e35850461c7..6d78e60c0609b 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -221,7 +221,6 @@ org.apache.hudi hudi-client-common ${project.version} - test org.apache.hudi diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java new file mode 100644 index 0000000000000..8a9c8baf5c15f --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.cloudwatch; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.MetricsReporter; + +import com.codahale.metrics.MetricRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with + * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}. 
+ */ +public class CloudWatchMetricsReporter extends MetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(CloudWatchMetricsReporter.class); + + private final MetricRegistry registry; + private final HoodieWriteConfig config; + private final CloudWatchReporter reporter; + + public CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { + this.config = config; + this.registry = registry; + this.reporter = createCloudWatchReporter(); + } + + CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry, CloudWatchReporter reporter) { + this.config = config; + this.registry = registry; + this.reporter = reporter; + } + + private CloudWatchReporter createCloudWatchReporter() { + return CloudWatchReporter.forRegistry(registry) + .prefixedWith(config.getCloudWatchMetricPrefix()) + .namespace(config.getCloudWatchMetricNamespace()) + .maxDatumsPerRequest(config.getCloudWatchMaxDatumsPerRequest()) + .build(config.getProps()); + } + + @Override + public void start() { + LOG.info("Starting CloudWatch Metrics Reporter."); + reporter.start(config.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS); + } + + @Override + public void report() { + reporter.report(); + } + + @Override + public void stop() { + LOG.info("Stopping CloudWatch Metrics Reporter."); + reporter.stop(); + } +} \ No newline at end of file diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java new file mode 100644 index 0000000000000..ea7603e974daa --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.aws.cloudwatch; + +import org.apache.hudi.config.HoodieWriteConfig; + +import com.codahale.metrics.MetricRegistry; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestCloudWatchMetricsReporter { + + @Mock + private HoodieWriteConfig config; + + @Mock + private MetricRegistry registry; + + @Mock + private CloudWatchReporter reporter; + + @Test + public void testReporter() { + when(config.getCloudWatchReportPeriodSeconds()).thenReturn(30); + CloudWatchMetricsReporter metricsReporter = new CloudWatchMetricsReporter(config, registry, reporter); + + metricsReporter.start(); + verify(reporter, times(1)).start(30, TimeUnit.SECONDS); + + metricsReporter.report(); + verify(reporter, times(1)).report(); + + metricsReporter.stop(); + verify(reporter, times(1)).stop(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index 8847643f4cb2b..418983d0508ce 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; -//import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter; import org.apache.hudi.metrics.custom.CustomizableMetricsReporter; import org.apache.hudi.metrics.datadog.DatadogMetricsReporter; import org.apache.hudi.metrics.prometheus.PrometheusReporter; @@ -86,9 +85,9 @@ public static Option createReporter(HoodieWriteConfig config, M case CONSOLE: reporter = new ConsoleMetricsReporter(registry); break; -// case CLOUDWATCH: -// reporter = new CloudWatchMetricsReporter(config, registry); -// break; + case CLOUDWATCH: + reporter = (MetricsReporter) ReflectionUtils.loadClass("org.apache.hudi.aws.cloudwatch.CloudWatchMetricsReporter", config, registry); + break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java deleted file mode 100644 index 9be20cbbdd642..0000000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//package org.apache.hudi.metrics.cloudwatch; -// -//import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; -//import org.apache.hudi.config.HoodieWriteConfig; -//import org.apache.hudi.metrics.MetricsReporter; -// -//import com.codahale.metrics.MetricRegistry; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.util.concurrent.TimeUnit; -// -///** -// * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with -// * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}. -// */ -//public class CloudWatchMetricsReporter extends MetricsReporter { -// -// private static final Logger LOG = LoggerFactory.getLogger(CloudWatchMetricsReporter.class); -// -// private final MetricRegistry registry; -// private final HoodieWriteConfig config; -// private final CloudWatchReporter reporter; -// -// public CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { -// this.config = config; -// this.registry = registry; -// this.reporter = createCloudWatchReporter(); -// } -// -// CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry, CloudWatchReporter reporter) { -// this.config = config; -// this.registry = registry; -// this.reporter = reporter; -// } -// -// private CloudWatchReporter createCloudWatchReporter() { -// return CloudWatchReporter.forRegistry(registry) -// .prefixedWith(config.getCloudWatchMetricPrefix()) -// .namespace(config.getCloudWatchMetricNamespace()) -// .maxDatumsPerRequest(config.getCloudWatchMaxDatumsPerRequest()) -// .build(config.getProps()); -// } -// -// @Override -// public void start() { -// LOG.info("Starting CloudWatch Metrics Reporter."); -// reporter.start(config.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS); -// } -// -// @Override -// public void report() { -// reporter.report(); -// } -// -// @Override -// public void stop() { -// LOG.info("Stopping CloudWatch Metrics Reporter."); -// reporter.stop(); -// } -//} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java deleted file mode 100644 index 9d8228ccbbb80..0000000000000 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//package org.apache.hudi.metrics.cloudwatch; -// -//import org.apache.hudi.aws.cloudwatch.CloudWatchReporter; -//import org.apache.hudi.config.HoodieWriteConfig; -// -//import com.codahale.metrics.MetricRegistry; -//import org.junit.jupiter.api.Test; -//import org.junit.jupiter.api.extension.ExtendWith; -//import org.mockito.Mock; -//import org.mockito.junit.jupiter.MockitoExtension; -// -//import java.util.concurrent.TimeUnit; -// -//import static org.mockito.Mockito.times; -//import static org.mockito.Mockito.verify; -//import static org.mockito.Mockito.when; -// -//@ExtendWith(MockitoExtension.class) -//public class TestCloudWatchMetricsReporter { -// -// @Mock -// private HoodieWriteConfig config; -// -// @Mock -// private MetricRegistry registry; -// -// @Mock -// private CloudWatchReporter reporter; -// -// @Test -// public void testReporter() { -// when(config.getCloudWatchReportPeriodSeconds()).thenReturn(30); -// CloudWatchMetricsReporter metricsReporter = new CloudWatchMetricsReporter(config, registry, reporter); -// -// metricsReporter.start(); -// verify(reporter, times(1)).start(30, TimeUnit.SECONDS); -// -// metricsReporter.report(); -// verify(reporter, times(1)).report(); -// -// metricsReporter.stop(); -// verify(reporter, times(1)).stop(); -// } -//} From 769e6a84a214f607f5f0525506bfc01dfaacebd4 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sat, 10 Feb 2024 00:40:49 +0100 Subject: [PATCH 05/14] First IT test for glue sync tool --- hudi-aws/pom.xml | 17 ++++++ ...java => ITTestAwsGlueCatalogSyncTool.java} | 61 ++++++++++++++++--- 2 files changed, 70 insertions(+), 8 deletions(-) rename hudi-aws/src/test/java/org/apache/hudi/aws/sync/{TestDummy.java => ITTestAwsGlueCatalogSyncTool.java} (51%) diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index 6d78e60c0609b..9d017985e9c30 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -193,6 +193,23 @@ ${aws.sdk.version} + + ${hive.groupid} + hive-exec + ${hive.exec.classifier} + test + + + org.pentaho + * + + + org.apache.parquet + * + + + + org.apache.parquet diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java similarity index 51% rename from hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java rename to hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index fbbe3ded92029..321dc0e16dc7e 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestDummy.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -19,6 +19,7 @@ package org.apache.hudi.aws.sync; import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.config.TypedProperties; @@ -31,44 +32,59 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.examples.common.HoodieExampleDataGenerator; +import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.index.HoodieIndex; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueAsyncClient; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; -/** - * docker run --rm -p 5000:5000 --name moto motoserver/moto:latest - * see http://localhost:5000/moto-api/# to check aws resources - */ -public class TestDummy { +public class ITTestAwsGlueCatalogSyncTool { + private static final String MOTO_ENDPOINT = "http://localhost:5000"; + private static final String DB_NAME = "db_name"; + private static final String TABLE_NAME = "tbl_name"; @Test - public void testJavaClient() throws IOException { + public void testJavaClient() throws IOException, ExecutionException, InterruptedException, URISyntaxException { String tablePath = "file:///tmp/hoodie/sample-table"; Configuration hadoopConf = new Configuration(); String tableType = "COPY_ON_WRITE"; String tableName = "hoodie_rt"; + String parts = "driver"; HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) .setTableName(tableName) + .setPartitionFields(parts) .setPayloadClassName(HoodieAvroPayload.class.getName()) .initTable(hadoopConf, tablePath); String schema = HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA; HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) .withSchema(schema).withParallelism(2, 2) - .withDeleteParallelism(2).forTable(tableName) .withEmbeddedTimelineServerEnabled(false) + .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); HoodieJavaWriteClient client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); -// String newCommitTime = client.startCommit(); HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); List> records = dataGen.generateInserts(newCommitTime, 10); @@ -77,5 +93,34 @@ public void testJavaClient() throws IOException { recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); client.insert(writeRecords, newCommitTime); client.close(); + Properties hiveProps = new TypedProperties(); + hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT); + 
hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), "eu-west-1"); + hiveProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath); + hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), tablePath); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + AwsGlueCatalogSyncTool awsGlueCatalogSyncTool = new AwsGlueCatalogSyncTool(hiveProps, hadoopConf); + awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps)); + awsGlueCatalogSyncTool.syncHoodieTable(); + awsGlueCatalogSyncTool.close(); + GlueAsyncClient testclient = GlueAsyncClient.builder() + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(hiveProps)) + .endpointOverride(new URI(MOTO_ENDPOINT)) + .region(Region.of("eu-west-1")) + .build(); + + GetDatabaseResponse db = testclient.getDatabase(GetDatabaseRequest.builder().name(DB_NAME).build()).get(); + Assertions.assertTrue(db.database().name().equals(DB_NAME)); + GetTableResponse tbl = testclient.getTable(GetTableRequest.builder().databaseName(DB_NAME).name(TABLE_NAME).build()).get(); + Assertions.assertTrue(tbl.table().name().equals(TABLE_NAME)); + + GetPartitionsResponse partitions = testclient.getPartitions(GetPartitionsRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME).build()).get(); + Assertions.assertEquals(3, partitions.partitions().size()); } } From b3713e198af9dc3f3cffa66094a420bb7748cb02 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sat, 10 Feb 2024 01:04:02 +0100 Subject: [PATCH 06/14] cleanup pom --- hudi-aws/pom.xml | 65 ++++++-------------------- hudi-client/hudi-client-common/pom.xml | 6 --- 2 files changed, 14 insertions(+), 57 deletions(-) diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index 9d017985e9c30..9fb0b5f1d88d5 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -53,56 +53,12 @@ ${project.version} - - - - - - - - - - - - - - - - - - - - - - com.amazonaws dynamodb-lock-client ${dynamodb.lockclient.version} - - - - - - - - - - - - - - - - - - - - - - software.amazon.awssdk @@ -181,6 +137,7 @@ httpclient ${aws.sdk.httpclient.version} + org.apache.httpcomponents httpcore @@ -193,6 +150,14 @@ ${aws.sdk.version} + + org.apache.parquet + parquet-avro + ${parquet.version} + provided + + + ${hive.groupid} hive-exec @@ -210,41 +175,39 @@ - - - org.apache.parquet - parquet-avro - ${parquet.version} - provided - com.esotericsoftware kryo-shaded test + org.apache.hudi hudi-tests-common ${project.version} test + org.apache.hudi hudi-java-client ${project.version} test + org.apache.hudi hudi-client-common ${project.version} + org.apache.hudi hudi-examples-common ${project.version} test + diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 2575972f133c8..91599a7312560 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -43,12 +43,6 @@ hudi-common ${project.version} - - - - - - org.apache.hudi hudi-hive-sync From 3dd6bd9877080268790a0de974850054f5925990 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sat, 10 Feb 2024 15:27:43 +0100 Subject: [PATCH 07/14] Attempt to refactor --- .../sync/ITTestAwsGlueCatalogSyncTool.java | 74 +++-------------- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 41 ++++++++++ .../apache/hudi/aws/sync/ITTestSyncUtil.java | 79 
+++++++++++++++++++ 3 files changed, 131 insertions(+), 63 deletions(-) create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 321dc0e16dc7e..e1c855a60094a 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -18,25 +18,13 @@ package org.apache.hudi.aws.sync; -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; import org.apache.hudi.client.HoodieJavaWriteClient; -import org.apache.hudi.client.common.HoodieJavaEngineContext; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieAvroPayload; -import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.config.HoodieAWSConfig; -import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.examples.common.HoodieExampleDataGenerator; import org.apache.hudi.hive.HiveSyncConfig; -import org.apache.hudi.index.HoodieIndex; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.GlueAsyncClient; import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; @@ -46,74 +34,33 @@ import software.amazon.awssdk.services.glue.model.GetTableResponse; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; +public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { -public class ITTestAwsGlueCatalogSyncTool { - - private static final String MOTO_ENDPOINT = "http://localhost:5000"; - private static final String DB_NAME = "db_name"; - private static final String TABLE_NAME = "tbl_name"; @Test public void testJavaClient() throws IOException, ExecutionException, InterruptedException, URISyntaxException { - String tablePath = "file:///tmp/hoodie/sample-table"; - Configuration hadoopConf = new Configuration(); - String tableType = "COPY_ON_WRITE"; - String tableName = "hoodie_rt"; + String parts = "driver"; - HoodieTableMetaClient.withPropertyBuilder() - .setTableType(tableType) - .setTableName(tableName) - .setPartitionFields(parts) - .setPayloadClassName(HoodieAvroPayload.class.getName()) - .initTable(hadoopConf, tablePath); + HoodieJavaWriteClient client = clientCOW(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA, Optional.of(parts)); - String schema = HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA; - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) - 
.withSchema(schema).withParallelism(2, 2) - .withEmbeddedTimelineServerEnabled(false) - .withDeleteParallelism(2).forTable(tableName) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); - HoodieJavaWriteClient client = - new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); String newCommitTime = client.startCommit(); - HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); - List> records = dataGen.generateInserts(newCommitTime, 10); - List> recordsSoFar = new ArrayList<>(records); - List> writeRecords = - recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); + List> writeRecords = getHoodieRecords(newCommitTime, 10); client.insert(writeRecords, newCommitTime); client.close(); - Properties hiveProps = new TypedProperties(); - hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); - hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); - hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); - hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT); - hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), "eu-west-1"); - hiveProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath); - hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), tablePath); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + + Properties hiveProps = getAwsProperties(); + addMetaSyncProps(hiveProps, parts); + AwsGlueCatalogSyncTool awsGlueCatalogSyncTool = new AwsGlueCatalogSyncTool(hiveProps, hadoopConf); awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps)); awsGlueCatalogSyncTool.syncHoodieTable(); awsGlueCatalogSyncTool.close(); - GlueAsyncClient testclient = GlueAsyncClient.builder() - .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(hiveProps)) - .endpointOverride(new URI(MOTO_ENDPOINT)) - .region(Region.of("eu-west-1")) - .build(); + GlueAsyncClient testclient = getGlueAsyncClient(hiveProps); GetDatabaseResponse db = testclient.getDatabase(GetDatabaseRequest.builder().name(DB_NAME).build()).get(); Assertions.assertTrue(db.database().name().equals(DB_NAME)); @@ -123,4 +70,5 @@ public void testJavaClient() throws IOException, ExecutionException, Interrupted GetPartitionsResponse partitions = testclient.getPartitions(GetPartitionsRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME).build()).get(); Assertions.assertEquals(3, partitions.partitions().size()); } + } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java new file mode 100644 index 0000000000000..87dc474f1d03c --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java @@ -0,0 +1,41 @@ +package org.apache.hudi.aws.sync; + +import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieAWSConfig; +import org.junit.jupiter.api.AfterEach; +import software.amazon.awssdk.regions.Region; +import 
software.amazon.awssdk.services.glue.GlueAsyncClient; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +public class ITTestGlueUtil extends ITTestSyncUtil { + + protected static final String MOTO_ENDPOINT = "http://localhost:5000"; + public static final String AWS_REGION = "eu-west-1"; + + protected static Properties getAwsProperties() { + Properties hiveProps = new TypedProperties(); + hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT); + hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), AWS_REGION); + return hiveProps; + } + + protected static GlueAsyncClient getGlueAsyncClient(Properties hiveProps) throws URISyntaxException { + GlueAsyncClient testclient = GlueAsyncClient.builder() + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(hiveProps)) + .endpointOverride(new URI(MOTO_ENDPOINT)) + .region(Region.of(AWS_REGION)) + .build(); + return testclient; + } + @AfterEach + public void cleanUp() { + // drop database and table + } +} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java new file mode 100644 index 0000000000000..7e7efb96ef578 --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -0,0 +1,79 @@ +package org.apache.hudi.aws.sync; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.examples.common.HoodieExampleDataGenerator; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.index.HoodieIndex; +import org.junit.jupiter.api.AfterEach; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; + +public class ITTestSyncUtil { + protected static final String tablePath = "file:///tmp/hoodie/sample-table"; + protected static final String tableType = "COPY_ON_WRITE"; + protected static final String tableName = "hoodie_rt"; + protected static final String DB_NAME = "db_name"; + protected static final String TABLE_NAME = "tbl_name"; + protected static final Configuration hadoopConf = new Configuration(); + + protected static void addMetaSyncProps(Properties hiveProps, String parts) { + hiveProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath); + hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); + 
hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), tablePath); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + } + + + protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, Optional<String> hudiPartitions) throws IOException { + HoodieTableMetaClient.PropertyBuilder propertyBuilder = HoodieTableMetaClient.withPropertyBuilder(); + if(hudiPartitions.isPresent()) { + propertyBuilder=propertyBuilder.setPartitionFields(hudiPartitions.get()); + } + propertyBuilder + .setTableType(tableType) + .setTableName(tableName) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hadoopConf, tablePath); + + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(avroSchema).withParallelism(1, 1) + .withDeleteParallelism(1).forTable(tableName) + .withEmbeddedTimelineServerEnabled(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + + return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); + } + + protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords) { + HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); + List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); + List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); + List<HoodieRecord<HoodieAvroPayload>> writeRecords = + recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); + return writeRecords; + } + @AfterEach + public void cleanUp() { + // rm hoodie files + } +} From 9c2af1e3123583758505952770447edad1dbb5ad Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 15:04:29 +0100 Subject: [PATCH 08/14] Rm hudi-examples dep, keep a copy of data generator, to be adapted --- hudi-aws/pom.xml | 7 - .../hudi/aws/sync/HoodieDataGenerator.java | 226 ++++++++++++++++++ .../sync/ITTestAwsGlueCatalogSyncTool.java | 3 +- .../apache/hudi/aws/sync/ITTestSyncUtil.java | 3 +- 4 files changed, 228 insertions(+), 11 deletions(-) create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index 9fb0b5f1d88d5..4c27b2a14964f 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -201,13 +201,6 @@ ${project.version} - - org.apache.hudi - hudi-examples-common - ${project.version} - test - - diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java new file mode 100644 index 0000000000000..608970a91dfdd --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.sync; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Class to be used to generate test data.
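+ * A local copy of the data generator from hudi-examples-common, kept here so the hudi-aws tests can drop that dependency.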
+ */ +public class HoodieDataGenerator<T extends HoodieRecordPayload<T>> { + + public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01"; + public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02"; + public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03"; + + public static final String[] DEFAULT_PARTITION_PATHS = + {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; + public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " + + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," + + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); + + private static final Random RAND = new Random(46474747); + + private final Map<Integer, KeyPartition> existingKeys; + private final String[] partitionPaths; + private int numExistingKeys; + + public HoodieDataGenerator(String[] partitionPaths) { + this(partitionPaths, new HashMap<>()); + } + + public HoodieDataGenerator() { + this(DEFAULT_PARTITION_PATHS); + } + + public HoodieDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) { + this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + this.existingKeys = keyPartitionMap; + } + + /** + * Generates a new avro record of the above schema format, retaining the key if optionally provided.
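+ * The value is a random trip record wrapped in a HoodieAvroPayload.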
+ */ + @SuppressWarnings("unchecked") + public T generateRandomValue(HoodieKey key, String commitTime) { + GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0); + return (T) new HoodieAvroPayload(Option.of(rec)); + } + + public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, + long timestamp) { + GenericRecord rec = new GenericData.Record(avroSchema); + rec.put("uuid", rowKey); + rec.put("ts", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + rec.put("begin_lat", RAND.nextDouble()); + rec.put("begin_lon", RAND.nextDouble()); + rec.put("end_lat", RAND.nextDouble()); + rec.put("end_lon", RAND.nextDouble()); + rec.put("fare", RAND.nextDouble() * 100); + return rec; + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public List<HoodieRecord<T>> generateInserts(String commitTime, Integer n) { + return generateInsertsStream(commitTime, n).collect(Collectors.toList()); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer n) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { + String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)]; + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + KeyPartition kp = new KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + existingKeys.put(currSize + i, kp); + numExistingKeys++; + return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); + }); + } + + /** + * Generates new inserts, across a single partition path. It also updates the list of existing keys. + */ + public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) { + return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList()); + } + + /** + * Generates new inserts, across a single partition path. It also updates the list of existing keys. + */ + public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + KeyPartition kp = new KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + existingKeys.put(currSize + i, kp); + numExistingKeys++; + return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); + }); + } + + /** + * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned + * list + * + * @param commitTime Commit Timestamp + * @param n Number of updates (including dups) + * @return list of hoodie record updates + */ + public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
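+ // keys are sampled with replacement, so the same key may be updated more than once in a batch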
+ List<HoodieRecord<T>> updates = new ArrayList<>(); + for (int i = 0; i < n; i++) { + KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1)); + HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime); + updates.add(record); + } + return updates; + } + + /** + * Generates new updates, one for each of the existing keys. + * + * @param commitTime Commit Timestamp + * @return list of hoodie record updates + */ + public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) { + List<HoodieRecord<T>> updates = new ArrayList<>(); + for (int i = 0; i < numExistingKeys; i++) { + KeyPartition kp = existingKeys.get(i); + HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime); + updates.add(record); + } + return updates; + } + + public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) { + return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); + } + + private Option<String> convertToString(HoodieRecord<T> record) { + try { + String str = HoodieAvroUtils + .bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema) + .toString(); + str = "{" + str.substring(str.indexOf("\"ts\":")); + return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); + } catch (IOException e) { + return Option.empty(); + } + } + + public List<String> convertToStringList(List<HoodieRecord<T>> records) { + return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get) + .collect(Collectors.toList()); + } + + public int getNumExistingKeys() { + return numExistingKeys; + } + + public static class KeyPartition implements Serializable { + + HoodieKey key; + String partitionPath; + } + + public void close() { + existingKeys.clear(); + } + +} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index e1c855a60094a..5353ac142cfa2 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -21,7 +21,6 @@ import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.examples.common.HoodieExampleDataGenerator; import org.apache.hudi.hive.HiveSyncConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -46,7 +45,7 @@ public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { public void testJavaClient() throws IOException, ExecutionException, InterruptedException, URISyntaxException { String parts = "driver"; - HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA, Optional.of(parts)); + HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA, Optional.of(parts)); String newCommitTime = client.startCommit(); List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 10); diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index 7e7efb96ef578..f929d16becd0f 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -10,7 +10,6 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.examples.common.HoodieExampleDataGenerator; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.index.HoodieIndex; import org.junit.jupiter.api.AfterEach; @@ -65,7 +64,7 @@ protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, } protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords) { - HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); + HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(); List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); List<HoodieRecord<HoodieAvroPayload>> writeRecords = From 191f4dcf54e00405d18177e471bfdf3d78e2437c Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 15:53:38 +0100 Subject: [PATCH 09/14] Fix partitions --- .../apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java | 2 +- .../test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 5353ac142cfa2..3ea61c20add9c 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -48,7 +48,7 @@ public void testJavaClient() throws IOException, ExecutionException, Interrupted HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA, Optional.of(parts)); String newCommitTime = client.startCommit(); - List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 10); + List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 1, "driver1"); client.insert(writeRecords, newCommitTime); client.close(); diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index f929d16becd0f..285b8eace576c 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -63,8 +63,8 @@ protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); } - protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords) { - HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(); + protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String...partitionPath) {
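+ // route generated records to the supplied partition paths so tests control the partition layout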
+ HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(partitionPath); List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); List<HoodieRecord<HoodieAvroPayload>> writeRecords = From d7ec2e657446125700d28ccb9ac8e4ff0e420b9a Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 16:28:09 +0100 Subject: [PATCH 10/14] TestCleanup --- .../sync/ITTestAwsGlueCatalogSyncTool.java | 13 ++- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 14 ++- .../apache/hudi/aws/sync/ITTestSyncUtil.java | 97 ++++++++++--------- 3 files changed, 74 insertions(+), 50 deletions(-) diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 3ea61c20add9c..1d548d05c7ac1 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -18,11 +18,15 @@ package org.apache.hudi.aws.sync; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.glue.GlueAsyncClient; import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; @@ -39,6 +43,8 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; + public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { @Test @@ -50,6 +56,8 @@ public void testJavaClient() throws IOException, ExecutionException, Interrupted String newCommitTime = client.startCommit(); List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 1, "driver1"); client.insert(writeRecords, newCommitTime);
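+ // a second insert into another driver partition, so the sync must register two partitions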
+ writeRecords = getHoodieRecords(newCommitTime, 1, "driver2"); + client.insert(writeRecords, newCommitTime); client.close(); Properties hiveProps = getAwsProperties(); @@ -59,7 +67,7 @@ public void testJavaClient() throws IOException, ExecutionException, Interrupted awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps)); awsGlueCatalogSyncTool.syncHoodieTable(); awsGlueCatalogSyncTool.close(); - GlueAsyncClient testclient = getGlueAsyncClient(hiveProps); + GlueAsyncClient testclient = getGlueAsyncClient(); GetDatabaseResponse db = testclient.getDatabase(GetDatabaseRequest.builder().name(DB_NAME).build()).get(); Assertions.assertTrue(db.database().name().equals(DB_NAME)); @@ -67,7 +75,6 @@ public void testJavaClient() throws IOException, ExecutionException, Interrupted Assertions.assertTrue(tbl.table().name().equals(TABLE_NAME)); GetPartitionsResponse partitions = testclient.getPartitions(GetPartitionsRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME).build()).get(); - Assertions.assertEquals(3, partitions.partitions().size()); + Assertions.assertEquals(2, partitions.partitions().size()); } - } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java index 87dc474f1d03c..ec62196d8a537 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java @@ -16,7 +16,7 @@ public class ITTestGlueUtil extends ITTestSyncUtil { protected static final String MOTO_ENDPOINT = "http://localhost:5000"; public static final String AWS_REGION = "eu-west-1"; - protected static Properties getAwsProperties() { + protected Properties getAwsProperties() { Properties hiveProps = new TypedProperties(); hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); @@ -26,16 +26,24 @@ protected static Properties getAwsProperties() { return hiveProps; } - protected static GlueAsyncClient getGlueAsyncClient(Properties
hiveProps) throws URISyntaxException { + protected GlueAsyncClient getGlueAsyncClient() throws URISyntaxException { GlueAsyncClient testclient = GlueAsyncClient.builder() - .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(hiveProps)) + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(getAwsProperties())) .endpointOverride(new URI(MOTO_ENDPOINT)) .region(Region.of(AWS_REGION)) .build(); return testclient; } @AfterEach + @Override public void cleanUp() { + super.cleanUp(); // drop database and table + try { + getGlueAsyncClient().deleteDatabase(r -> r.name(DB_NAME)); + getGlueAsyncClient().deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } } } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index 285b8eace576c..c473b0808425c 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -1,6 +1,8 @@ package org.apache.hudi.aws.sync; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; @@ -10,9 +12,10 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.index.HoodieIndex; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import java.io.IOException; import java.util.ArrayList; @@ -25,54 +28,60 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; public class ITTestSyncUtil { - protected static final String tablePath = "file:///tmp/hoodie/sample-table"; - protected static final String tableType = "COPY_ON_WRITE"; - protected static final String tableName = "hoodie_rt"; - protected static final String DB_NAME = "db_name"; - protected static final String TABLE_NAME = "tbl_name"; - protected static final Configuration hadoopConf = new Configuration(); + protected static final String TABLE_PATH = "file:///tmp/hoodie/sample-table"; + protected static final String TABLE_TYPE = "COPY_ON_WRITE"; + protected static final String DB_NAME = "db_name"; + protected static final String TABLE_NAME = "tbl_name"; + protected final Configuration hadoopConf = new Configuration(); - protected static void addMetaSyncProps(Properties hiveProps, String parts) { - hiveProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath); - hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), tablePath); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); - } + protected static void addMetaSyncProps(Properties hiveProps, String parts) { + hiveProps.setProperty(META_SYNC_BASE_PATH.key(), TABLE_PATH); + hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); + 
hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), TABLE_PATH); + hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + } - - protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, Optional<String> hudiPartitions) throws IOException { + protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, Optional<String> hudiPartitions) throws IOException { HoodieTableMetaClient.PropertyBuilder propertyBuilder = HoodieTableMetaClient.withPropertyBuilder(); if(hudiPartitions.isPresent()) { propertyBuilder=propertyBuilder.setPartitionFields(hudiPartitions.get()); } propertyBuilder - .setTableType(tableType) - .setTableName(tableName) - .setPayloadClassName(HoodieAvroPayload.class.getName()) - .initTable(hadoopConf, tablePath); + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hadoopConf, TABLE_PATH); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(TABLE_PATH) - .withSchema(avroSchema).withParallelism(1, 1) - .withDeleteParallelism(1).forTable(tableName) - .withEmbeddedTimelineServerEnabled(false) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + .withSchema(avroSchema).withParallelism(1, 1) + .withDeleteParallelism(1).forTable(TABLE_NAME) + .withEmbeddedTimelineServerEnabled(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); } - protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String...partitionPath) { - HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(partitionPath); - List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); - List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); - List<HoodieRecord<HoodieAvroPayload>> writeRecords = - recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); - return writeRecords; - } - @AfterEach - public void cleanUp() { - // rm hoodie files + protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String...partitionPath) { + HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(partitionPath); + List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); + List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); + List<HoodieRecord<HoodieAvroPayload>> writeRecords = + recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); + return writeRecords; + } + protected FileSystem getFs() { + return HadoopFSUtils.getFs(TABLE_PATH, hadoopConf); + } + @BeforeEach + public void cleanUp() {
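+ // wipe the table directory so each test starts from a fresh Hudi table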
+ try { + getFs().delete(new Path(TABLE_PATH), true); + } catch (IOException e) { + throw new RuntimeException("Failed to delete table path " + TABLE_PATH); } + } } From fb0c3abcb071cc96a5686c076f6d385ed60bf398 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 16:50:47 +0100 Subject: [PATCH 11/14] Fix style --- .../sync/ITTestAwsGlueCatalogSyncTool.java | 11 +--- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 64 ++++++++++--------- .../apache/hudi/aws/sync/ITTestSyncUtil.java | 35 +++++----- 3 files changed, 55 insertions(+), 55 deletions(-) diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 1d548d05c7ac1..31f4239809b1a 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -18,15 +18,13 @@ package org.apache.hudi.aws.sync; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; -import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.glue.GlueAsyncClient; import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; @@ -39,19 +37,16 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; -import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; - public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { @Test public void testJavaClient() throws IOException, ExecutionException, InterruptedException, URISyntaxException { String parts = "driver"; - HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA, Optional.of(parts)); + HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA, Option.of(parts)); String newCommitTime = client.startCommit(); List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 1, "driver1"); diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java index ec62196d8a537..9cd71f78f29b7 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java @@ -3,6 +3,7 @@ import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.config.HoodieAWSConfig; + import org.junit.jupiter.api.AfterEach; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.GlueAsyncClient; @@ -13,37 +14,38 @@ public class ITTestGlueUtil extends ITTestSyncUtil { - protected static final String MOTO_ENDPOINT = "http://localhost:5000"; - public static final String AWS_REGION = "eu-west-1"; - - protected Properties getAwsProperties() { - Properties hiveProps = new TypedProperties(); - hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); - hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); - hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); - 
hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT); - hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), AWS_REGION); - return hiveProps; - } + protected static final String MOTO_ENDPOINT = "http://localhost:5000"; + public static final String AWS_REGION = "eu-west-1"; - protected GlueAsyncClient getGlueAsyncClient() throws URISyntaxException { - GlueAsyncClient testclient = GlueAsyncClient.builder() - .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(getAwsProperties())) - .endpointOverride(new URI(MOTO_ENDPOINT)) - .region(Region.of(AWS_REGION)) - .build(); - return testclient; - } - @AfterEach - @Override - public void cleanUp() { - super.cleanUp(); - // drop database and table - try { - getGlueAsyncClient().deleteDatabase(r -> r.name(DB_NAME)); - getGlueAsyncClient().deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME)); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + protected Properties getAwsProperties() { + Properties hiveProps = new TypedProperties(); + hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy"); + hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT); + hiveProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), AWS_REGION); + return hiveProps; + } + + protected GlueAsyncClient getGlueAsyncClient() throws URISyntaxException { + GlueAsyncClient testclient = GlueAsyncClient.builder() + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(getAwsProperties())) + .endpointOverride(new URI(MOTO_ENDPOINT)) + .region(Region.of(AWS_REGION)) + .build(); + return testclient; + } + + @AfterEach + @Override + public void cleanUp() { + super.cleanUp(); + // drop database and table + try { + getGlueAsyncClient().deleteDatabase(r -> r.name(DB_NAME)); + getGlueAsyncClient().deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); } + } } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index c473b0808425c..198f15565beef 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -3,11 +3,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -15,12 +17,12 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.index.HoodieIndex; + import org.junit.jupiter.api.BeforeEach; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -43,39 +45,40 @@ protected static void 
addMetaSyncProps(Properties hiveProps, String parts) { hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); } - - protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, Optional<String> hudiPartitions) throws IOException { + protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema, Option<String> hudiPartitions) throws IOException { HoodieTableMetaClient.PropertyBuilder propertyBuilder = HoodieTableMetaClient.withPropertyBuilder(); - if(hudiPartitions.isPresent()) { - propertyBuilder=propertyBuilder.setPartitionFields(hudiPartitions.get()); + if (hudiPartitions.isPresent()) { + propertyBuilder = propertyBuilder.setPartitionFields(hudiPartitions.get()); } propertyBuilder - .setTableType(TABLE_TYPE) - .setTableName(TABLE_NAME) - .setPayloadClassName(HoodieAvroPayload.class.getName()) - .initTable(hadoopConf, TABLE_PATH); + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hadoopConf, TABLE_PATH); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(TABLE_PATH) - .withSchema(avroSchema).withParallelism(1, 1) - .withDeleteParallelism(1).forTable(TABLE_NAME) - .withEmbeddedTimelineServerEnabled(false) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + .withSchema(avroSchema).withParallelism(1, 1) + .withDeleteParallelism(1).forTable(TABLE_NAME) + .withEmbeddedTimelineServerEnabled(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); } - protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String...partitionPath) { + protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String... partitionPath) { HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(partitionPath); List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords); List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); List<HoodieRecord<HoodieAvroPayload>> writeRecords = recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList()); return writeRecords; } + protected FileSystem getFs() { return HadoopFSUtils.getFs(TABLE_PATH, hadoopConf); } + @BeforeEach public void cleanUp() { try { From 43d691ee54a9a9e5dc9b60cd297f6385d8e89bd4 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 19:05:53 +0100 Subject: [PATCH 12/14] Fix rats --- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 18 ++++++++++++++++++ .../apache/hudi/aws/sync/ITTestSyncUtil.java | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java index 9cd71f78f29b7..e6a3dfe0a3385 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hudi.aws.sync; import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index 198f15565beef..6563b3d77d14f 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.aws.sync; import org.apache.hadoop.conf.Configuration; From f1d46fb9831136f1899698f8366bba87ab2c166b Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 21:25:17 +0100 Subject: [PATCH 13/14] Refactor --- .../sync/ITTestAwsGlueCatalogSyncTool.java | 53 +++++-------------- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 48 ++++++++++++----- .../apache/hudi/aws/sync/ITTestSyncUtil.java | 53 +++++++++++-------- 3 files changed, 78 insertions(+), 76 deletions(-) diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 31f4239809b1a..0aad8d27252a1 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -18,58 +18,31 @@ package org.apache.hudi.aws.sync; -import org.apache.hudi.client.HoodieJavaWriteClient; -import org.apache.hudi.common.model.HoodieAvroPayload; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.hive.HiveSyncConfig; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.glue.GlueAsyncClient; -import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; -import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; -import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; -import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; -import software.amazon.awssdk.services.glue.model.GetTableRequest; -import software.amazon.awssdk.services.glue.model.GetTableResponse; import java.io.IOException; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Properties; import java.util.concurrent.ExecutionException; public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { @Test - public void testJavaClient() throws IOException, ExecutionException, InterruptedException, URISyntaxException { - - String parts = "driver"; - HoodieJavaWriteClient<HoodieAvroPayload> client = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA, Option.of(parts)); - - String newCommitTime = client.startCommit(); - List<HoodieRecord<HoodieAvroPayload>> writeRecords = getHoodieRecords(newCommitTime, 1, "driver1"); - client.insert(writeRecords, newCommitTime); - writeRecords = getHoodieRecords(newCommitTime, 1, "driver2"); - client.insert(writeRecords, newCommitTime); - client.close(); + public void testWhenCreatePartitionsShouldExistsInGlue() throws IOException, ExecutionException, InterruptedException { + setupPartitions("driver"); - Properties hiveProps = getAwsProperties(); - addMetaSyncProps(hiveProps, parts); + hudiJavaClient = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA); + String newCommitTime = hudiJavaClient.startCommit(); + hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver1"), newCommitTime); + hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver2"), newCommitTime); - AwsGlueCatalogSyncTool awsGlueCatalogSyncTool = new AwsGlueCatalogSyncTool(hiveProps, hadoopConf); - awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps)); - awsGlueCatalogSyncTool.syncHoodieTable(); - awsGlueCatalogSyncTool.close(); - GlueAsyncClient testclient = getGlueAsyncClient(); + getAwsGlueCatalogSyncTool().syncHoodieTable(); - GetDatabaseResponse db = testclient.getDatabase(GetDatabaseRequest.builder().name(DB_NAME).build()).get(); - 
Assertions.assertTrue(db.database().name().equals(DB_NAME)); - GetTableResponse tbl = testclient.getTable(GetTableRequest.builder().databaseName(DB_NAME).name(TABLE_NAME).build()).get(); - Assertions.assertTrue(tbl.table().name().equals(TABLE_NAME)); + Assertions.assertTrue(glueClient.getDatabase(d -> + d.name(DB_NAME)).get().database().name().equals(DB_NAME)); + Assertions.assertTrue(glueClient.getTable(t -> + t.databaseName(DB_NAME).name(TABLE_NAME)).get().table().name().equals(TABLE_NAME)); - GetPartitionsResponse partitions = testclient.getPartitions(GetPartitionsRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME).build()).get(); - Assertions.assertEquals(2, partitions.partitions().size()); + Assertions.assertEquals(2, glueClient.getPartitions(p -> + p.databaseName(DB_NAME).tableName(TABLE_NAME)).get().partitions().size()); } } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java index e6a3dfe0a3385..4b08f946fde64 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java @@ -21,8 +21,10 @@ import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.config.HoodieAWSConfig; +import org.apache.hudi.hive.HiveSyncConfig; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.GlueAsyncClient; @@ -35,6 +37,32 @@ public class ITTestGlueUtil extends ITTestSyncUtil { protected static final String MOTO_ENDPOINT = "http://localhost:5000"; public static final String AWS_REGION = "eu-west-1"; + + private AwsGlueCatalogSyncTool awsGlueCatalogSyncTool; + protected GlueAsyncClient glueClient; + + @BeforeEach + @Override + public void setup() { + getAwsProperties().forEach((k, v) -> hiveProps.setProperty(k.toString(), v.toString())); + super.setup(); + try { + initGlueClient(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @AfterEach + @Override + public void cleanUp() { + super.cleanUp(); + // drop database and table + glueClient.deleteDatabase(r -> r.name(DB_NAME)); + glueClient.deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME)); + awsGlueCatalogSyncTool.close(); + } + protected Properties getAwsProperties() { Properties hiveProps = new TypedProperties(); hiveProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy"); @@ -45,25 +73,17 @@ protected Properties getAwsProperties() { return hiveProps; } - protected GlueAsyncClient getGlueAsyncClient() throws URISyntaxException { - GlueAsyncClient testclient = GlueAsyncClient.builder() + protected void initGlueClient() throws URISyntaxException { + glueClient = GlueAsyncClient.builder() .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(getAwsProperties())) .endpointOverride(new URI(MOTO_ENDPOINT)) .region(Region.of(AWS_REGION)) .build(); - return testclient; } - @AfterEach - @Override - public void cleanUp() { - super.cleanUp(); - // drop database and table - try { - getGlueAsyncClient().deleteDatabase(r -> r.name(DB_NAME)); - getGlueAsyncClient().deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME)); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + public AwsGlueCatalogSyncTool getAwsGlueCatalogSyncTool() { + awsGlueCatalogSyncTool = new 
AwsGlueCatalogSyncTool(hiveProps, hadoopConf); + awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps)); + return awsGlueCatalogSyncTool; } } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java index 6563b3d77d14f..964c739fdec4a 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java @@ -18,16 +18,11 @@ package org.apache.hudi.aws.sync; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.hudi.client.HoodieJavaWriteClient; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -36,6 +31,10 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; @@ -53,25 +52,43 @@ public class ITTestSyncUtil { protected static final String DB_NAME = "db_name"; protected static final String TABLE_NAME = "tbl_name"; protected final Configuration hadoopConf = new Configuration(); + protected final Properties hiveProps = new Properties(); + protected HoodieJavaWriteClient<HoodieAvroPayload> hudiJavaClient;
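+ // the builder is shared so setupPartitions() can add partition fields before clientCOW() creates the table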
+ private HoodieTableMetaClient.PropertyBuilder propertyBuilder = HoodieTableMetaClient.withPropertyBuilder(); - protected static void addMetaSyncProps(Properties hiveProps, String parts) { + @BeforeEach + protected void setup() { hiveProps.setProperty(META_SYNC_BASE_PATH.key(), TABLE_PATH); hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME); hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME); hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME); hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), TABLE_PATH); - hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + + propertyBuilder = propertyBuilder + .setTableType(TABLE_TYPE) + .setTableName(TABLE_NAME) + .setPayloadClassName(HoodieAvroPayload.class.getName()); + } + + @AfterEach + public void cleanUp() { + try { + getFs().delete(new Path(TABLE_PATH), true); + } catch (IOException e) { + throw new RuntimeException("Failed to delete table path " + TABLE_PATH); } + if (hudiJavaClient != null) { + hudiJavaClient.close(); + } + } + + protected void setupPartitions(String parts) { + hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts); + propertyBuilder = propertyBuilder.setPartitionFields(parts); + } + + protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema) throws IOException { propertyBuilder - .setTableType(TABLE_TYPE) - .setTableName(TABLE_NAME) - .setPayloadClassName(HoodieAvroPayload.class.getName()) .initTable(hadoopConf, TABLE_PATH); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(TABLE_PATH) @@ -97,12 +114,4 @@ protected FileSystem getFs() { return HadoopFSUtils.getFs(TABLE_PATH, hadoopConf); } - @BeforeEach - public void cleanUp() { - try { - getFs().delete(new Path(TABLE_PATH), true); - } catch (IOException e) { - throw new RuntimeException("Failed to delete table path " + TABLE_PATH); - } - } } From 9add9dd6030aba044c2996be099be8b336720b00 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Sun, 11 Feb 2024 23:35:38 +0100 Subject: [PATCH 14/14] Allow changing data generator --- .../hudi/aws/sync/HoodieDataGenerator.java | 52 +++++++++---------- .../aws/sync/HoodieNestedDataGenerator.java | 50 ++++++++++++++++++ .../sync/ITTestAwsGlueCatalogSyncTool.java | 23 +++++++- .../apache/hudi/aws/sync/ITTestGlueUtil.java | 1 - .../apache/hudi/aws/sync/ITTestSyncUtil.java | 36 ++++++++----- 5 files changed, 119 insertions(+), 43 deletions(-) create mode 100644 hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java index 608970a91dfdd..eada3c509391c 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java @@ -21,7 +21,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.avro.HoodieAvroUtils; + import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; @@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -53,18 +52,14 @@ public class HoodieDataGenerator<T extends HoodieRecordPayload<T>> { public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; - public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " - + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"}," - + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," - + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," - + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," - + "{\"name\":\"fare\",\"type\": \"double\"}]}"; - public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); + private static final Random RAND = new Random(46474747); private final Map<Integer, KeyPartition> existingKeys; - private final String[] partitionPaths; + + + private String[] partitionPaths; + private int numExistingKeys; public HoodieDataGenerator(String[] partitionPaths) { @@ -80,6 +75,19 @@ public HoodieDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> k this.existingKeys = keyPartitionMap; } +
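+ // the schema now comes from an overridable method instead of a static field, so subclasses can supply a different record shape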
\"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," + + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + } + + public Schema getAvroSchema() { + return new Schema.Parser().parse(getAvroSchemaString()); + } + /** * Generates a new avro record of the above schema format, retaining the key if optionally provided. */ @@ -91,7 +99,7 @@ public T generateRandomValue(HoodieKey key, String commitTime) { public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, long timestamp) { - GenericRecord rec = new GenericData.Record(avroSchema); + GenericRecord rec = new GenericData.Record(getAvroSchema()); rec.put("uuid", rowKey); rec.put("ts", timestamp); rec.put("rider", riderName); @@ -192,27 +200,15 @@ public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) { return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); } - private Option convertToString(HoodieRecord record) { - try { - String str = HoodieAvroUtils - .bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema) - .toString(); - str = "{" + str.substring(str.indexOf("\"ts\":")); - return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); - } catch (IOException e) { - return Option.empty(); - } - } - - public List convertToStringList(List> records) { - return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get) - .collect(Collectors.toList()); - } - public int getNumExistingKeys() { return numExistingKeys; } + public HoodieDataGenerator setPartitionPaths(String[] partitionPaths) { + this.partitionPaths = partitionPaths; + return this; + } + public static class KeyPartition implements Serializable { HoodieKey key; diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java new file mode 100644 index 0000000000000..0c3e2bc43fa6c --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.aws.sync; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +public class HoodieNestedDataGenerator extends HoodieDataGenerator { + + @Override + public String getAvroSchemaString() { + return "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " + + "{\"name\": \"ts\",\"type\": \"long\"}," + + "{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"rider\", \"type\": \"string\"}," + + "{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"AddressUSRecord\",\"fields\":[{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"}]}}" + + "]}"; + } + + @Override + public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, long timestamp) { + GenericRecord rec = new GenericData.Record(getAvroSchema()); + rec.put("uuid", rowKey); + rec.put("ts", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + GenericRecord address = new GenericData.Record(getAvroSchema().getField("address").schema()); + address.put("city", "paris"); + address.put("state", "france"); + rec.put("address", address); + return rec; + } +} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java index 0aad8d27252a1..3c99b0d72ef11 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java @@ -30,7 +30,7 @@ public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil { public void testWhenCreatePartitionsShouldExistsInGlue() throws IOException, ExecutionException, InterruptedException { setupPartitions("driver"); - hudiJavaClient = clientCOW(HoodieDataGenerator.TRIP_EXAMPLE_SCHEMA); + hudiJavaClient = clientCOW(); String newCommitTime = hudiJavaClient.startCommit(); hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver1"), newCommitTime); hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver2"), newCommitTime); @@ -41,8 +41,27 @@ public void testWhenCreatePartitionsShouldExistsInGlue() throws IOException, Exe d.name(DB_NAME)).get().database().name().equals(DB_NAME)); Assertions.assertTrue(glueClient.getTable(t -> t.databaseName(DB_NAME).name(TABLE_NAME)).get().table().name().equals(TABLE_NAME)); - Assertions.assertEquals(2, glueClient.getPartitions(p -> p.databaseName(DB_NAME).tableName(TABLE_NAME)).get().partitions().size()); } + + @Test + public void testWhenCreateNestedTableShouldExistsInGlue() throws IOException, ExecutionException, InterruptedException { + setupPartitions("driver"); + setDataGenerator(HoodieNestedDataGenerator.class); + + hudiJavaClient = clientCOW(); + String newCommitTime = hudiJavaClient.startCommit(); + hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver1"), newCommitTime); + + getAwsGlueCatalogSyncTool().syncHoodieTable(); + + Assertions.assertTrue(glueClient.getDatabase(d -> + d.name(DB_NAME)).get().database().name().equals(DB_NAME)); + Assertions.assertTrue(glueClient.getTable(t -> + t.databaseName(DB_NAME).name(TABLE_NAME)).get().table().name().equals(TABLE_NAME)); + Assertions.assertEquals(1, glueClient.getPartitions(p -> + p.databaseName(DB_NAME).tableName(TABLE_NAME)).get().partitions().size()); + } + } diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java 
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java
index 4b08f946fde64..5c7afc6727108 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java
@@ -57,7 +57,6 @@ public void setup() {
   @Override
   public void cleanUp() {
     super.cleanUp();
-    // drop database and table
     glueClient.deleteDatabase(r -> r.name(DB_NAME));
     glueClient.deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME));
     awsGlueCatalogSyncTool.close();
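Reviewer note: testWhenCreateNestedTableShouldExistsInGlue above verifies that the database,
table and partition are registered, but not that the nested "address" column itself survived
the sync. A possible follow-up assertion, sketched against the AWS SDK v2 Glue model already
used in these tests; the expected Hive type string "struct<city:string,state:string>" is an
assumption about how the sync maps the Avro record type, not something verified here:

    // Hypothetical extra assertion for the nested test (not part of this patch).
    software.amazon.awssdk.services.glue.model.Column address =
        glueClient.getTable(t -> t.databaseName(DB_NAME).name(TABLE_NAME))
            .get().table().storageDescriptor().columns().stream()
            .filter(c -> c.name().equals("address"))
            .findFirst().orElseThrow(() -> new AssertionError("address column not synced"));
    Assertions.assertEquals("struct<city:string,state:string>", address.type());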
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java
index 964c739fdec4a..2d9a4d969cc54 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,7 +39,6 @@
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -54,7 +54,8 @@ public class ITTestSyncUtil {
   protected final Configuration hadoopConf = new Configuration();
   protected final Properties hiveProps = new Properties();
   protected HoodieJavaWriteClient<HoodieAvroPayload> hudiJavaClient;
-  private HoodieTableMetaClient.PropertyBuilder propertyBuilder = HoodieTableMetaClient.withPropertyBuilder();
+  private HoodieTableMetaClient.PropertyBuilder propertyBuilder;
+  private Class dataGenClass;
 
   @BeforeEach
   protected void setup() {
@@ -64,22 +65,24 @@ protected void setup() {
     hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME);
     hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), TABLE_PATH);
 
-    propertyBuilder = propertyBuilder
+    propertyBuilder = HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(TABLE_TYPE)
         .setTableName(TABLE_NAME)
         .setPayloadClassName(HoodieAvroPayload.class.getName());
+
+    dataGenClass = HoodieDataGenerator.class;
   }
 
   @AfterEach
   public void cleanUp() {
+    if (hudiJavaClient != null) {
+      hudiJavaClient.close();
+    }
     try {
       getFs().delete(new Path(TABLE_PATH), true);
     } catch (IOException e) {
       throw new RuntimeException("Failed to delete table path " + TABLE_PATH);
     }
-    if (hudiJavaClient != null) {
-      hudiJavaClient.close();
-    }
   }
 
   protected void setupPartitions(String parts) {
@@ -87,12 +90,13 @@ protected void setupPartitions(String parts) {
     propertyBuilder = propertyBuilder.setPartitionFields(parts);
   }
 
-  protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema) throws IOException {
+  protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW() throws IOException {
     propertyBuilder
         .initTable(hadoopConf, TABLE_PATH);
 
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(TABLE_PATH)
-        .withSchema(avroSchema).withParallelism(1, 1)
+        .withSchema(getDataGen().getAvroSchemaString())
+        .withParallelism(1, 1)
         .withDeleteParallelism(1).forTable(TABLE_NAME)
         .withEmbeddedTimelineServerEnabled(false)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
@@ -101,17 +105,25 @@ protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW(String avroSchema)
     return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
   }
 
-  protected static List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String... partitionPath) {
-    HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>(partitionPath);
+  protected List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String... partitionPath) {
+    HoodieDataGenerator<HoodieAvroPayload> dataGen = getDataGen(partitionPath);
     List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords);
-    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
     List<HoodieRecord<HoodieAvroPayload>> writeRecords =
-        recordsSoFar.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList());
+        records.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList());
     return writeRecords;
   }
 
+  private HoodieDataGenerator<HoodieAvroPayload> getDataGen(String... partitionPath) {
+    HoodieDataGenerator<HoodieAvroPayload> dataGen = ReflectionUtils.loadClass(dataGenClass.getName());
+    dataGen.setPartitionPaths(partitionPath);
+    return dataGen;
+  }
+
   protected FileSystem getFs() {
     return HadoopFSUtils.getFs(TABLE_PATH, hadoopConf);
   }
+
+  protected void setDataGenerator(Class dataGenClass) {
+    this.dataGenClass = dataGenClass;
+  }
 }
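Note for reviewers: with the generator pluggable through setDataGenerator(...) and clientCOW()
now deriving the write schema from the generator itself, covering another schema shape is a
two-hook override. A minimal sketch, assuming the HoodieDataGenerator and ITTestSyncUtil APIs
introduced above; the class name HoodieMapDataGenerator, the "tags" map field and its values
are hypothetical, for illustration only:

    package org.apache.hudi.aws.sync;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import java.util.Collections;

    // Hypothetical variant: the flat trip fields plus an Avro map column,
    // to exercise Glue schema sync for map types.
    public class HoodieMapDataGenerator extends HoodieDataGenerator {

      @Override
      public String getAvroSchemaString() {
        return "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " +
            "{\"name\": \"ts\",\"type\": \"long\"}," +
            "{\"name\": \"uuid\", \"type\": \"string\"}," +
            "{\"name\": \"rider\", \"type\": \"string\"}," +
            "{\"name\": \"driver\", \"type\": \"string\"}," +
            "{\"name\": \"tags\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}" +
            "]}";
      }

      @Override
      public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, long timestamp) {
        GenericRecord rec = new GenericData.Record(getAvroSchema());
        rec.put("uuid", rowKey);
        rec.put("ts", timestamp);
        rec.put("rider", riderName);
        rec.put("driver", driverName);
        // Avro GenericData represents map fields as java.util.Map instances.
        rec.put("tags", Collections.singletonMap("color", "red"));
        return rec;
      }
    }

A test would then opt in with setDataGenerator(HoodieMapDataGenerator.class) before calling
clientCOW(), exactly as testWhenCreateNestedTableShouldExistsInGlue does for the nested
variant; getDataGen() instantiates the configured class reflectively via ReflectionUtils.loadClass.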