91 changes: 46 additions & 45 deletions hudi-aws/pom.xml
@@ -53,56 +53,12 @@
<version>${project.version}</version>
</dependency>

<!-- Hadoop -->
Review comment (Contributor):
@parisni : has this change been tested with a real Glue and EMR setup?

Reply (Contributor Author):
Not yet. Do you have insight into why the hadoop/hive deps were added?
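One standard way to trace where transitive hadoop/hive dependencies enter this module, assuming an ordinary Maven multi-module checkout, is to run mvn dependency:tree -pl hudi-aws from the repository root and inspect which declared dependencies pull them in.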

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>dynamodb-lock-client</artifactId>
<version>${dynamodb.lockclient.version}</version>
</dependency>

<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>

<!-- AWS SDK -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
@@ -181,6 +137,7 @@
<artifactId>httpclient</artifactId>
<version>${aws.sdk.httpclient.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
@@ -193,13 +150,57 @@
<version>${aws.sdk.version}</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>provided</scope>
</dependency>

<!-- Testing -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-exec</artifactId>
<classifier>${hive.exec.classifier}</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-java-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

<build>
@@ -16,9 +16,8 @@
* limitations under the License.
*/

package org.apache.hudi.metrics.cloudwatch;
package org.apache.hudi.aws.cloudwatch;

import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.MetricsReporter;

@@ -76,4 +75,4 @@ public void stop() {
LOG.info("Stopping CloudWatch Metrics Reporter.");
reporter.stop();
}
}
}
@@ -16,9 +16,8 @@
* limitations under the License.
*/

package org.apache.hudi.metrics.cloudwatch;
package org.apache.hudi.aws.cloudwatch;

import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
import org.apache.hudi.config.HoodieWriteConfig;

import com.codahale.metrics.MetricRegistry;
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
* Class to be used to generate test data.
*/
public class HoodieDataGenerator<T extends HoodieRecordPayload<T>> {

public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01";
public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02";
public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03";

public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};


private static final Random RAND = new Random(46474747);

private final Map<Integer, KeyPartition> existingKeys;


private String[] partitionPaths;
private int numExistingKeys;

public HoodieDataGenerator(String[] partitionPaths) {
this(partitionPaths, new HashMap<>());
}

public HoodieDataGenerator() {
this(DEFAULT_PARTITION_PATHS);
}

public HoodieDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeys = keyPartitionMap;
}

public String getAvroSchemaString() {
return "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
}

public Schema getAvroSchema() {
return new Schema.Parser().parse(getAvroSchemaString());
}

/**
* Generates a new Avro record of the above schema format, retaining the provided key.
*/
@SuppressWarnings("unchecked")
public T generateRandomValue(HoodieKey key, String commitTime) {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
return (T) new HoodieAvroPayload(Option.of(rec));
}

public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
long timestamp) {
GenericRecord rec = new GenericData.Record(getAvroSchema());
rec.put("uuid", rowKey);
rec.put("ts", timestamp);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("fare", RAND.nextDouble() * 100);
return rec;
}

/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
public List<HoodieRecord<T>> generateInserts(String commitTime, Integer n) {
return generateInsertsStream(commitTime, n).collect(Collectors.toList());
}

/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer n) {
int currSize = getNumExistingKeys();

return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
existingKeys.put(currSize + i, kp);
numExistingKeys++;
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
});
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) {
return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList());
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) {
int currSize = getNumExistingKeys();

return IntStream.range(0, n).boxed().map(i -> {
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
existingKeys.put(currSize + i, kp);
numExistingKeys++;
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
});
}

/**
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned list.
*
* @param commitTime Commit Timestamp
* @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
List<HoodieRecord<T>> updates = new ArrayList<>();
for (int i = 0; i < n; i++) {
KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys));
HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}
return updates;
}

/**
* Generates new updates, one for each of the existing keys.
*
* @param commitTime Commit Timestamp
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
List<HoodieRecord<T>> updates = new ArrayList<>();
for (int i = 0; i < numExistingKeys; i++) {
KeyPartition kp = existingKeys.get(i);
HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}
return updates;
}

public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) {
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
}

public int getNumExistingKeys() {
return numExistingKeys;
}

public HoodieDataGenerator<T> setPartitionPaths(String[] partitionPaths) {
this.partitionPaths = partitionPaths;
return this;
}

public static class KeyPartition implements Serializable {

HoodieKey key;
String partitionPath;
}

public void close() {
existingKeys.clear();
}

}
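For reference, a minimal usage sketch of this generator in a test, assuming HoodieAvroPayload as the payload type (which is what generateRandomValue produces). This snippet is illustrative and not part of the PR:

import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.List;

// Generator tracking keys across the three default partition paths.
HoodieDataGenerator<HoodieAvroPayload> dataGen = new HoodieDataGenerator<>();

// First commit: 10 inserts spread uniformly across the default partitions.
List<HoodieRecord<HoodieAvroPayload>> inserts = dataGen.generateInserts("001", 10);

// Second commit: 5 updates drawn at random from the keys tracked above.
List<HoodieRecord<HoodieAvroPayload>> updates = dataGen.generateUpdates("002", 5);

// Clear the tracked-key state once the test is done.
dataGen.close();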