diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
index 9da22ff32c2ae..4c27b2a14964f 100644
--- a/hudi-aws/pom.xml
+++ b/hudi-aws/pom.xml
@@ -53,56 +53,12 @@
       <version>${project.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>dynamodb-lock-client</artifactId>
       <version>${dynamodb.lockclient.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-service</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-avro</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
@@ -181,6 +137,7 @@
       <artifactId>httpclient</artifactId>
       <version>${aws.sdk.httpclient.version}</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpcore</artifactId>
@@ -193,13 +150,57 @@
       <version>${aws.sdk.version}</version>
     </dependency>
 
-
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <classifier>${hive.exec.classifier}</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.pentaho</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-tests-common</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-java-client</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java
similarity index 96%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java
rename to hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java
index d05632b9bbf85..8a9c8baf5c15f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchMetricsReporter.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.metrics.cloudwatch;
+package org.apache.hudi.aws.cloudwatch;
 
-import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metrics.MetricsReporter;
 
@@ -76,4 +75,4 @@ public void stop() {
     LOG.info("Stopping CloudWatch Metrics Reporter.");
     reporter.stop();
   }
-}
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java
similarity index 94%
rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java
rename to hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java
index 7901d80246513..ea7603e974daa 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchMetricsReporter.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.metrics.cloudwatch;
+package org.apache.hudi.aws.cloudwatch;
 
-import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.MetricRegistry;
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java
new file mode 100644
index 0000000000000..eada3c509391c
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieDataGenerator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Class to be used to generate test data.
+ */
+public class HoodieDataGenerator<T extends HoodieRecordPayload<T>> {
+
+  public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01";
+  public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02";
+  public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03";
+
+  public static final String[] DEFAULT_PARTITION_PATHS =
+      {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
+
+  private static final Random RAND = new Random(46474747);
+
+  private final Map<Integer, KeyPartition> existingKeys;
+
+  private String[] partitionPaths;
+  private int numExistingKeys;
+
+  public HoodieDataGenerator(String[] partitionPaths) {
+    this(partitionPaths, new HashMap<>());
+  }
+
+  public HoodieDataGenerator() {
+    this(DEFAULT_PARTITION_PATHS);
+  }
+
+  public HoodieDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
+    this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
+    this.existingKeys = keyPartitionMap;
+  }
+
+  public String getAvroSchemaString() {
+    return "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+        + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+        + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+        + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+        + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+        + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+  }
+
+  public Schema getAvroSchema() {
+    return new Schema.Parser().parse(getAvroSchemaString());
+  }
+
+  /**
+   * Generates a new avro record of the above schema format, retaining the key if optionally provided.
+   */
+  @SuppressWarnings("unchecked")
+  public T generateRandomValue(HoodieKey key, String commitTime) {
+    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
+    return (T) new HoodieAvroPayload(Option.of(rec));
+  }
+
+  public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
+                                             long timestamp) {
+    GenericRecord rec = new GenericData.Record(getAvroSchema());
+    rec.put("uuid", rowKey);
+    rec.put("ts", timestamp);
+    rec.put("rider", riderName);
+    rec.put("driver", driverName);
+    rec.put("begin_lat", RAND.nextDouble());
+    rec.put("begin_lon", RAND.nextDouble());
+    rec.put("end_lat", RAND.nextDouble());
+    rec.put("end_lon", RAND.nextDouble());
+    rec.put("fare", RAND.nextDouble() * 100);
+    return rec;
+  }
+
+  /**
+   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+   */
+  public List<HoodieRecord<T>> generateInserts(String commitTime, Integer n) {
+    return generateInsertsStream(commitTime, n).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+   */
+  public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer n) {
+    int currSize = getNumExistingKeys();
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
+      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
+      KeyPartition kp = new KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      existingKeys.put(currSize + i, kp);
+      numExistingKeys++;
+      return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
+    });
+  }
+
+  /**
+   * Generates new inserts, across a single partition path. It also updates the list of existing keys.
+   */
+  public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) {
+    return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates new inserts, across a single partition path. It also updates the list of existing keys.
+   */
+  public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) {
+    int currSize = getNumExistingKeys();
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
+      KeyPartition kp = new KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      existingKeys.put(currSize + i, kp);
+      numExistingKeys++;
+      return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
+    });
+  }
+
+  /**
+   * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
+   * list.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n          Number of updates (including dups)
+   * @return list of hoodie record updates
+   */
+  public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
+    List<HoodieRecord<T>> updates = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys));
+      HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
+      updates.add(record);
+    }
+    return updates;
+  }
+
+  /**
+   * Generates new updates, one for each of the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @return list of hoodie record updates
+   */
+  public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
+    List<HoodieRecord<T>> updates = new ArrayList<>();
+    for (int i = 0; i < numExistingKeys; i++) {
+      KeyPartition kp = existingKeys.get(i);
+      HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
+      updates.add(record);
+    }
+    return updates;
+  }
+
+  public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) {
+    return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
+  }
+
+  public int getNumExistingKeys() {
+    return numExistingKeys;
+  }
+
+  public HoodieDataGenerator<T> setPartitionPaths(String[] partitionPaths) {
+    this.partitionPaths = partitionPaths;
+    return this;
+  }
+
+  public static class KeyPartition implements Serializable {
+
+    HoodieKey key;
+    String partitionPath;
+  }
+
+  public void close() {
+    existingKeys.clear();
+  }
+}
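For orientation, a minimal sketch of how this generator is meant to be driven — names follow the test utilities added later in this patch, and hudiJavaClient is assumed to be the HoodieJavaWriteClient<HoodieAvroPayload> that ITTestSyncUtil wires up:

    // Sketch only: generate inserts for one commit and hand them to a write client.
    HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>();
    String commitTime = hudiJavaClient.startCommit();
    List<HoodieRecord<HoodieAvroPayload>> inserts = dataGen.generateInserts(commitTime, 10);
    hudiJavaClient.insert(inserts, commitTime);
    // The generator tracks every key it has handed out, so a later commit can
    // exercise the update paths against the same key space:
    List<HoodieRecord<HoodieAvroPayload>> updates = dataGen.generateUniqueUpdates(commitTime);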
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java
new file mode 100644
index 0000000000000..0c3e2bc43fa6c
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/HoodieNestedDataGenerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.apache.hudi.common.model.HoodieAvroPayload;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+public class HoodieNestedDataGenerator extends HoodieDataGenerator<HoodieAvroPayload> {
+
+  @Override
+  public String getAvroSchemaString() {
+    return "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+        + "{\"name\": \"ts\",\"type\": \"long\"},"
+        + "{\"name\": \"uuid\", \"type\": \"string\"},"
+        + "{\"name\": \"rider\", \"type\": \"string\"},"
+        + "{\"name\": \"driver\", \"type\": \"string\"},"
+        + "{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"AddressUSRecord\",\"fields\":[{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"}]}}"
+        + "]}";
+  }
+
+  @Override
+  public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, long timestamp) {
+    GenericRecord rec = new GenericData.Record(getAvroSchema());
+    rec.put("uuid", rowKey);
+    rec.put("ts", timestamp);
+    rec.put("rider", riderName);
+    rec.put("driver", driverName);
+    GenericRecord address = new GenericData.Record(getAvroSchema().getField("address").schema());
+    address.put("city", "paris");
+    address.put("state", "france");
+    rec.put("address", address);
+    return rec;
+  }
+}
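An illustrative sketch of what this nested generator produces — the printed form below is Avro's JSON rendering of a GenericData.Record, and the key/rider/driver values are arbitrary:

    // Illustration: materialize one nested record from the generator above.
    HoodieNestedDataGenerator gen = new HoodieNestedDataGenerator();
    GenericRecord rec = gen.generateGenericRecord("key-1", "rider-c1", "driver-c1", 0L);
    System.out.println(rec);
    // -> {"ts": 0, "uuid": "key-1", "rider": "rider-c1", "driver": "driver-c1",
    //     "address": {"city": "paris", "state": "france"}}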
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java
new file mode 100644
index 0000000000000..3c99b0d72ef11
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestAwsGlueCatalogSyncTool.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class ITTestAwsGlueCatalogSyncTool extends ITTestGlueUtil {
+
+  @Test
+  public void testWhenCreatePartitionsShouldExistsInGlue() throws IOException, ExecutionException, InterruptedException {
+    setupPartitions("driver");
+
+    hudiJavaClient = clientCOW();
+    String newCommitTime = hudiJavaClient.startCommit();
+    hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver1"), newCommitTime);
+    hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver2"), newCommitTime);
+
+    getAwsGlueCatalogSyncTool().syncHoodieTable();
+
+    Assertions.assertEquals(DB_NAME, glueClient.getDatabase(d ->
+        d.name(DB_NAME)).get().database().name());
+    Assertions.assertEquals(TABLE_NAME, glueClient.getTable(t ->
+        t.databaseName(DB_NAME).name(TABLE_NAME)).get().table().name());
+    Assertions.assertEquals(2, glueClient.getPartitions(p ->
+        p.databaseName(DB_NAME).tableName(TABLE_NAME)).get().partitions().size());
+  }
+
+  @Test
+  public void testWhenCreateNestedTableShouldExistsInGlue() throws IOException, ExecutionException, InterruptedException {
+    setupPartitions("driver");
+    setDataGenerator(HoodieNestedDataGenerator.class);
+
+    hudiJavaClient = clientCOW();
+    String newCommitTime = hudiJavaClient.startCommit();
+    hudiJavaClient.insert(getHoodieRecords(newCommitTime, 1, "driver1"), newCommitTime);
+
+    getAwsGlueCatalogSyncTool().syncHoodieTable();
+
+    Assertions.assertEquals(DB_NAME, glueClient.getDatabase(d ->
+        d.name(DB_NAME)).get().database().name());
+    Assertions.assertEquals(TABLE_NAME, glueClient.getTable(t ->
+        t.databaseName(DB_NAME).name(TABLE_NAME)).get().table().name());
+    Assertions.assertEquals(1, glueClient.getPartitions(p ->
+        p.databaseName(DB_NAME).tableName(TABLE_NAME)).get().partitions().size());
+  }
+}
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java
new file mode 100644
index 0000000000000..5c7afc6727108
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGlueUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.hive.HiveSyncConfig;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueAsyncClient;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+public class ITTestGlueUtil extends ITTestSyncUtil {
+
+  protected static final String MOTO_ENDPOINT = "http://localhost:5000";
+  public static final String AWS_REGION = "eu-west-1";
+
+  private AwsGlueCatalogSyncTool awsGlueCatalogSyncTool;
+  protected GlueAsyncClient glueClient;
+
+  @BeforeEach
+  @Override
+  public void setup() {
+    getAwsProperties().forEach((k, v) -> hiveProps.setProperty(k.toString(), v.toString()));
+    super.setup();
+    try {
+      initGlueClient();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  @Override
+  public void cleanUp() {
+    super.cleanUp();
+    glueClient.deleteTable(r -> r.name(TABLE_NAME).databaseName(DB_NAME));
+    glueClient.deleteDatabase(r -> r.name(DB_NAME));
+    awsGlueCatalogSyncTool.close();
+  }
+
+  protected Properties getAwsProperties() {
+    Properties props = new TypedProperties();
+    props.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy");
+    props.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy");
+    props.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy");
+    props.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT);
+    props.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), AWS_REGION);
+    return props;
+  }
+
+  protected void initGlueClient() throws URISyntaxException {
+    glueClient = GlueAsyncClient.builder()
+        .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(getAwsProperties()))
+        .endpointOverride(new URI(MOTO_ENDPOINT))
+        .region(Region.of(AWS_REGION))
+        .build();
+  }
+
+  public AwsGlueCatalogSyncTool getAwsGlueCatalogSyncTool() {
+    awsGlueCatalogSyncTool = new AwsGlueCatalogSyncTool(hiveProps, hadoopConf);
+    awsGlueCatalogSyncTool.initSyncClient(new HiveSyncConfig(hiveProps));
+    return awsGlueCatalogSyncTool;
+  }
+}
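These tests assume a Glue-compatible mock (a local moto server) is already listening on MOTO_ENDPOINT before the suite runs. A hypothetical guard, not part of this patch, could skip the ITs instead of failing them when nothing is listening there:

    // Hypothetical helper (not in this patch): probe the local moto endpoint.
    import java.io.IOException;
    import java.net.Socket;

    public class MotoAssumption {

      // Returns true when something accepts connections on the moto port.
      static boolean motoIsUp() {
        try (Socket ignored = new Socket("localhost", 5000)) {
          return true;
        } catch (IOException e) {
          return false;
        }
      }

      // Example use at the top of ITTestGlueUtil#setup():
      //   org.junit.jupiter.api.Assumptions.assumeTrue(
      //       MotoAssumption.motoIsUp(), "moto endpoint not reachable; skipping Glue ITs");
    }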
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java
new file mode 100644
index 0000000000000..2d9a4d969cc54
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestSyncUtil.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.sync;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+
+public class ITTestSyncUtil {
+
+  protected static final String TABLE_PATH = "file:///tmp/hoodie/sample-table";
+  protected static final String TABLE_TYPE = "COPY_ON_WRITE";
+  protected static final String DB_NAME = "db_name";
+  protected static final String TABLE_NAME = "tbl_name";
+
+  protected final Configuration hadoopConf = new Configuration();
+  protected final Properties hiveProps = new Properties();
+  protected HoodieJavaWriteClient<HoodieAvroPayload> hudiJavaClient;
+  private HoodieTableMetaClient.PropertyBuilder propertyBuilder;
+  private Class<? extends HoodieDataGenerator> dataGenClass;
+
+  @BeforeEach
+  protected void setup() {
+    hiveProps.setProperty(META_SYNC_BASE_PATH.key(), TABLE_PATH);
+    hiveProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);
+    hiveProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME);
+    hiveProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME);
+    hiveProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), TABLE_PATH);
+
+    propertyBuilder = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(TABLE_TYPE)
+        .setTableName(TABLE_NAME)
+        .setPayloadClassName(HoodieAvroPayload.class.getName());
+
+    dataGenClass = HoodieDataGenerator.class;
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    if (hudiJavaClient != null) {
+      hudiJavaClient.close();
+    }
+    try {
+      getFs().delete(new Path(TABLE_PATH), true);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to delete table path " + TABLE_PATH, e);
+    }
+  }
+
+  protected void setupPartitions(String parts) {
+    hiveProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), parts);
+    propertyBuilder = propertyBuilder.setPartitionFields(parts);
+  }
+
+  protected HoodieJavaWriteClient<HoodieAvroPayload> clientCOW() throws IOException {
+    propertyBuilder
+        .initTable(hadoopConf, TABLE_PATH);
+
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(TABLE_PATH)
+        .withSchema(getDataGen().getAvroSchemaString())
+        .withParallelism(1, 1)
+        .withDeleteParallelism(1).forTable(TABLE_NAME)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+
+    return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
+  }
+
+  protected List<HoodieRecord<HoodieAvroPayload>> getHoodieRecords(String newCommitTime, int numRecords, String... partitionPath) {
+    HoodieDataGenerator<HoodieAvroPayload> dataGen = getDataGen(partitionPath);
+    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, numRecords);
+    return records.stream().map(r -> new HoodieAvroRecord<>(r)).collect(Collectors.toList());
+  }
+
+  private HoodieDataGenerator<HoodieAvroPayload> getDataGen(String... partitionPath) {
+    HoodieDataGenerator<HoodieAvroPayload> dataGen = ReflectionUtils.loadClass(dataGenClass.getName());
+    dataGen.setPartitionPaths(partitionPath);
+    return dataGen;
+  }
+
+  protected FileSystem getFs() {
+    return HadoopFSUtils.getFs(TABLE_PATH, hadoopConf);
+  }
+
+  protected void setDataGenerator(Class<? extends HoodieDataGenerator> dataGenClass) {
+    this.dataGenClass = dataGenClass;
+  }
+}
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 67705edb316f2..91599a7312560 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -45,9 +45,8 @@
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-aws</artifactId>
+      <artifactId>hudi-hive-sync</artifactId>
       <version>${project.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index 27034735a040c..418983d0508ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -24,7 +24,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
 import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
 import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
 import org.apache.hudi.metrics.prometheus.PrometheusReporter;
@@ -87,7 +86,7 @@ public static Option<MetricsReporter> createReporter(HoodieWriteConfig config, MetricRegistry registry) {
         reporter = new ConsoleMetricsReporter(registry);
         break;
       case CLOUDWATCH:
-        reporter = new CloudWatchMetricsReporter(config, registry);
+        reporter = (MetricsReporter) ReflectionUtils.loadClass("org.apache.hudi.aws.cloudwatch.CloudWatchMetricsReporter", config, registry);
         break;
       default:
         LOG.error("Reporter type[" + type + "] is not supported.");
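With CloudWatchMetricsReporter relocated into hudi-aws, and hudi-aws now depending on hudi-client-common (see the pom changes above), a direct new CloudWatchMetricsReporter(...) here would create a dependency cycle. The reflective lookup defers the class load to runtime, so the AWS jar is only needed on the classpath when CLOUDWATCH is actually selected. A minimal sketch of what the ReflectionUtils.loadClass call resolves to, assuming the two-argument constructor visible in the removed line (the real helper wraps reflection failures in a HoodieException):

    // Sketch of the reflective construction; hudi-client-common compiles without hudi-aws.
    Class<?> clazz = Class.forName("org.apache.hudi.aws.cloudwatch.CloudWatchMetricsReporter");
    MetricsReporter reporter = (MetricsReporter) clazz
        .getConstructor(HoodieWriteConfig.class, MetricRegistry.class)
        .newInstance(config, registry);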