diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index c8a58d5a588b9..5ef04650a6c99 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -402,6 +402,14 @@ test + + + + com.amazonaws + aws-java-sdk-sqs + ${aws.sdk.version} + + ${hive.groupid} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index dd841f4276042..a217e6b7a8009 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -42,45 +42,51 @@ public class HoodieIncrSource extends RowSource { private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class); - protected static class Config { + static class Config { /** * {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table. */ - private static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path"; + static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path"; /** * {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched. */ - private static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants"; - private static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1; + static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants"; + static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1; /** * {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that needs to be added to source table after * parsing _hoodie_partition_path. */ - private static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields"; + static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields"; /** * {@value #HOODIE_SRC_PARTITION_EXTRACTORCLASS} PartitionValueExtractor class to extract partition fields from * _hoodie_partition_path. */ - private static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS = + static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS = "hoodie.deltastreamer.source.hoodieincr.partition.extractor.class"; - private static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS = + static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName(); /** * {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed * instant when checkpoint is not provided. */ - private static final String READ_LATEST_INSTANT_ON_MISSING_CKPT = + static final String READ_LATEST_INSTANT_ON_MISSING_CKPT = "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt"; - private static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false; + static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false; + + /** + * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet. 
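 *
 * <p>A minimal sketch of wiring these keys together (values here are hypothetical placeholders,
 * not part of this change):
 * <pre>{@code
 *   TypedProperties props = new TypedProperties();
 *   props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "s3://bucket/path/to/source-table");
 *   props.setProperty("hoodie.deltastreamer.source.hoodieincr.num_instants", "5");
 *   props.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", "parquet");
 * }</pre>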
+ */ + static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format"; + static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet"; } public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { + SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); } @@ -123,10 +129,10 @@ public Pair>, String> fetchNextBatch(Option lastCkpt /* * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema()); - * + * * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema * = newSchema.add(field, DataTypes.StringType, true); } - * + * * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if * configured * @@ -139,7 +145,7 @@ public Pair>, String> fetchNextBatch(Option lastCkpt * "#partition-fields != #partition-values-extracted"); List rowObjs = new * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema)); - * + * * log.info("Validated Source Schema :" + validated.schema()); */ diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java new file mode 100644 index 0000000000000..79e4abbcf9000 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX; + +/** + * This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}. + */ +public class S3EventsHoodieIncrSource extends HoodieIncrSource { + + private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class); + + static class Config { + // control whether we do existence check for files before consuming them + static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists"; + static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false; + } + + public S3EventsHoodieIncrSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + } + + @Override + public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); + String srcPath = props.getString(HOODIE_SRC_BASE_PATH); + int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH); + boolean readLatestOnMissingCkpt = props.getBoolean( + READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT); + + // Use begin Instant if set and non-empty + Option beginInstant = + lastCkptStr.isPresent() + ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr + : Option.empty(); + + Pair instantEndpts = + IncrSourceHelper.calculateBeginAndEndInstants( + sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt); + + if (instantEndpts.getKey().equals(instantEndpts.getValue())) { + LOG.warn("Already caught up. 
Begin Checkpoint was :" + instantEndpts.getKey()); + return Pair.of(Option.empty(), instantEndpts.getKey()); + } + + // Do incremental pull. Set end instant if available. + DataFrameReader metaReader = sparkSession.read().format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft()) + .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight()); + Dataset source = metaReader.load(srcPath); + // Extract distinct file keys from s3 meta hoodie table + final List cloudMetaDf = source + .filter("s3.object.size > 0") + .select("s3.bucket.name", "s3.object.key") + .distinct() + .collectAsList(); + // Create S3 paths + final boolean checkExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK); + List cloudFiles = new ArrayList<>(); + for (Row row : cloudMetaDf) { + // construct file path, row index 0 refers to bucket and 1 refers to key + String bucket = row.getString(0); + String filePath = S3_PREFIX + bucket + "/" + row.getString(1); + if (checkExists) { + FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration()); + try { + if (fs.exists(new Path(filePath))) { + cloudFiles.add(filePath); + } + } catch (IOException e) { + LOG.error(String.format("Error while checking path exists for %s ", filePath), e); + } + } else { + cloudFiles.add(filePath); + } + } + String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); + Option> dataset = Option.empty(); + if (!cloudFiles.isEmpty()) { + dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0]))); + } + return Pair.of(dataset, instantEndpts.getRight()); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java new file mode 100644 index 0000000000000..717c414a29bcf --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.ArrayList; +import java.util.List; + +/** + * This source provides capability to create the hudi table for S3 events metadata (eg. S3 + * put events data). It will use the SQS for receiving the object key events. This can be useful + * to check S3 files activity over time. The hudi table created by this source is consumed by + * {@link S3EventsHoodieIncrSource} to apply changes to the hudi table corresponding to user data. + */ +public class S3EventsSource extends RowSource { + + private final S3EventsMetaSelector pathSelector; + private final List processedMessages = new ArrayList<>(); + AmazonSQS sqs; + + public S3EventsSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + this.pathSelector = S3EventsMetaSelector.createSourceSelector(props); + this.sqs = this.pathSelector.createAmazonSqsClient(); + } + + /** + * Fetches next events from the queue. + * + * @param lastCkptStr The last checkpoint instant string, empty if first run. + * @param sourceLimit Limit on the size of data to fetch. For {@link S3EventsSource}, + * {@link org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config#S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is used. + * @return A pair of dataset of event records and the next checkpoint instant string + */ + @Override + public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + Pair, String> selectPathsWithLatestSqsMessage = + pathSelector.getNextEventsFromQueue(sqs, lastCkptStr, processedMessages); + if (selectPathsWithLatestSqsMessage.getLeft().isEmpty()) { + return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight()); + } else { + Dataset eventRecords = sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING()); + return Pair.of( + Option.of(sparkSession.read().json(eventRecords)), + selectPathsWithLatestSqsMessage.getRight()); + } + } + + @Override + public void onCommit(String lastCkptStr) { + pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl, processedMessages); + processedMessages.clear(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java new file mode 100644 index 0000000000000..7252494d989a8 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.TypedProperties; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.BatchResultErrorEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.json.JSONObject; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class has methods for processing cloud objects. + * It currently supports only AWS S3 objects and AWS SQS queue. + */ +public class CloudObjectsSelector { + public static final List ALLOWED_S3_EVENT_PREFIX = + Collections.singletonList("ObjectCreated"); + public static final String S3_PREFIX = "s3://"; + public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class); + public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages"; + static final String SQS_MODEL_MESSAGE = "Message"; + static final String SQS_MODEL_EVENT_RECORDS = "Records"; + static final String SQS_MODEL_EVENT_NAME = "eventName"; + static final String S3_MODEL_EVENT_TIME = "eventTime"; + static final String S3_FILE_SIZE = "fileSize"; + static final String S3_FILE_PATH = "filePath"; + public final String queueUrl; + public final int longPollWait; + public final int maxMessagesPerRequest; + public final int maxMessagePerBatch; + public final int visibilityTimeout; + public final TypedProperties props; + public final String fsName; + private final String regionName; + + /** + * Cloud Objects Selector Class. 
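 *
 * <p>Construction sketch (queue URL and region values below are hypothetical); only the queue URL
 * and region are mandatory, the remaining settings fall back to the defaults applied here:
 * <pre>{@code
 *   TypedProperties props = new TypedProperties();
 *   props.setProperty("hoodie.deltastreamer.s3.source.queue.url",
 *       "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events-queue");
 *   props.setProperty("hoodie.deltastreamer.s3.source.queue.region", "us-east-1");
 *   CloudObjectsSelector selector = new CloudObjectsSelector(props);
 * }</pre>
 *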
{@link CloudObjectsSelector} + */ + public CloudObjectsSelector(TypedProperties props) { + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.S3_SOURCE_QUEUE_URL, Config.S3_SOURCE_QUEUE_REGION)); + this.props = props; + this.queueUrl = props.getString(Config.S3_SOURCE_QUEUE_URL); + this.regionName = props.getString(Config.S3_SOURCE_QUEUE_REGION); + this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase(); + this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20); + this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5); + this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30); + this.maxMessagesPerRequest = 10; + } + + /** + * Get SQS queue attributes. + * + * @param sqsClient AWSClient for sqsClient + * @param queueUrl queue full url + * @return map of attributes needed + */ + protected Map getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) { + GetQueueAttributesResult queueAttributesResult = sqsClient.getQueueAttributes( + new GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES) + ); + return queueAttributesResult.getAttributes(); + } + + /** + * Get the file attributes filePath, eventTime and size from JSONObject record. + * + * @param record of object event + * @return map of file attribute + */ + protected Map getFileAttributesFromRecord(JSONObject record) throws UnsupportedEncodingException { + Map fileRecord = new HashMap<>(); + String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME); + long eventTime = + Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime(); + JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object"); + String bucket = URLDecoder.decode(record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8"); + String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8"); + String filePath = this.fsName + "://" + bucket + "/" + key; + fileRecord.put(S3_MODEL_EVENT_TIME, eventTime); + fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size")); + fileRecord.put(S3_FILE_PATH, filePath); + return fileRecord; + } + + /** + * Amazon SQS Client Builder. + */ + public AmazonSQS createAmazonSqsClient() { + return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build(); + } + + /** + * List messages from queue. 
+ */ + protected List getMessagesToProcess( + AmazonSQS sqsClient, + String queueUrl, + int longPollWait, + int visibilityTimeout, + int maxMessagePerBatch, + int maxMessagesPerRequest) { + List messagesToProcess = new ArrayList<>(); + ReceiveMessageRequest receiveMessageRequest = + new ReceiveMessageRequest() + .withQueueUrl(queueUrl) + .withWaitTimeSeconds(longPollWait) + .withVisibilityTimeout(visibilityTimeout); + receiveMessageRequest.setMaxNumberOfMessages(maxMessagesPerRequest); + // Get count for available messages + Map queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl); + long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES)); + log.info("Approximately " + approxMessagesAvailable + " messages available in queue."); + long numMessagesToProcess = Math.min(approxMessagesAvailable, maxMessagePerBatch); + for (int i = 0; + i < (int) Math.ceil((double) numMessagesToProcess / maxMessagesPerRequest); + ++i) { + List messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages(); + log.debug("Number of messages: " + messages.size()); + messagesToProcess.addAll(messages); + if (messages.isEmpty()) { + // ApproximateNumberOfMessages value is eventually consistent. + // So, we still need to check and break if there are no messages. + break; + } + } + return messagesToProcess; + } + + /** + * Create partitions of list using specific batch size. we can't use third party API for this + * functionality, due to https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270 + */ + protected List> createListPartitions(List singleList, int eachBatchSize) { + List> listPartitions = new ArrayList<>(); + if (singleList.size() == 0 || eachBatchSize < 1) { + return listPartitions; + } + for (int start = 0; start < singleList.size(); start += eachBatchSize) { + int end = Math.min(start + eachBatchSize, singleList.size()); + if (start > end) { + throw new IndexOutOfBoundsException( + "Index " + start + " is out of the list range <0," + (singleList.size() - 1) + ">"); + } + listPartitions.add(new ArrayList<>(singleList.subList(start, end))); + } + return listPartitions; + } + + /** + * Delete batch of messages from queue. + */ + protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List messagesToBeDeleted) { + DeleteMessageBatchRequest deleteBatchReq = + new DeleteMessageBatchRequest().withQueueUrl(queueUrl); + List deleteEntries = deleteBatchReq.getEntries(); + for (Message message : messagesToBeDeleted) { + deleteEntries.add( + new DeleteMessageBatchRequestEntry() + .withId(message.getMessageId()) + .withReceiptHandle(message.getReceiptHandle())); + } + DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq); + List deleteFailures = + deleteResult.getFailed().stream() + .map(BatchResultErrorEntry::getId) + .collect(Collectors.toList()); + if (!deleteFailures.isEmpty()) { + log.warn( + "Failed to delete " + + deleteFailures.size() + + " messages out of " + + deleteEntries.size() + + " from queue."); + } else { + log.info("Successfully deleted " + deleteEntries.size() + " messages from queue."); + } + } + + /** + * Delete Queue Messages after hudi commit. This method will be invoked by source.onCommit. 
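 *
 * <p>For example (sizes are illustrative): 25 processed messages are partitioned into groups of at
 * most 10, so {@code deleteBatchOfMessages} runs three times with 10, 10 and 5 entries, matching
 * the SQS limit of 10 entries per {@code DeleteMessageBatchRequest}.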
+ */ + public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl, List processedMessages) { + if (!processedMessages.isEmpty()) { + // create batch for deletion, SES DeleteMessageBatchRequest only accept max 10 entries + List> deleteBatches = createListPartitions(processedMessages, 10); + for (List deleteBatch : deleteBatches) { + deleteBatchOfMessages(sqs, queueUrl, deleteBatch); + } + } + } + + /** + * Configs supported. + */ + public static class Config { + private static final String HOODIE_DELTASTREAMER_S3_SOURCE = "hoodie.deltastreamer.s3.source"; + /** + * {@value #S3_SOURCE_QUEUE_URL} is the queue url for cloud object events. + */ + public static final String S3_SOURCE_QUEUE_URL = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.url"; + + /** + * {@value #S3_SOURCE_QUEUE_REGION} is the case-sensitive region name of the cloud provider for the queue. For example, "us-east-1". + */ + public static final String S3_SOURCE_QUEUE_REGION = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.region"; + + /** + * {@value #S3_SOURCE_QUEUE_FS} is file system corresponding to queue. For example, for AWS SQS it is s3/s3a. + */ + public static final String S3_SOURCE_QUEUE_FS = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.fs"; + + /** + * {@value #S3_QUEUE_LONG_POLL_WAIT} is the long poll wait time in seconds If set as 0 then + * client will fetch on short poll basis. + */ + public static final String S3_QUEUE_LONG_POLL_WAIT = + HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.long.poll.wait"; + + /** + * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is max messages for each batch of delta streamer + * run. Source will process these maximum number of message at a time. + */ + public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = + HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch"; + + /** + * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we + * consume the message, queue will move the consumed messages to in-flight state, these messages + * can't be consumed again by source for this timeout period. + */ + public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT = + HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.visibility.timeout"; + + /** + * {@value #SOURCE_INPUT_SELECTOR} source input selector. + */ + public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector"; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java new file mode 100644 index 0000000000000..68ac7aba5cb13 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONException; +import org.json.JSONObject; + +import java.io.IOException; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * S3 events metadata selector class. This class provides methods to process the + * messages from SQS for {@link org.apache.hudi.utilities.sources.S3EventsSource}. + */ +public class S3EventsMetaSelector extends CloudObjectsSelector { + + private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements"; + + /** + * Cloud Objects Meta Selector Class. {@link CloudObjectsSelector} + */ + public S3EventsMetaSelector(TypedProperties props) { + super(props); + } + + /** + * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link + * S3EventsMetaSelector} + */ + public static S3EventsMetaSelector createSourceSelector(TypedProperties props) { + String sourceSelectorClass = + props.getString( + S3EventsMetaSelector.Config.SOURCE_INPUT_SELECTOR, + S3EventsMetaSelector.class.getName()); + try { + S3EventsMetaSelector selector = + (S3EventsMetaSelector) + ReflectionUtils.loadClass( + sourceSelectorClass, new Class[] {TypedProperties.class}, props); + + log.info("Using path selector " + selector.getClass().getName()); + return selector; + } catch (Exception e) { + throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e); + } + } + + /** + * List messages from queue, filter out illegible events while doing so. It will also delete the + * ineligible messages from queue. + * + * @param processedMessages array of processed messages to add more messages + * @return the filtered list of valid S3 events in SQS. 
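 *
 * <p>Both notification layouts are handled; a simplified sketch of the two shapes (field values are
 * illustrative):
 * <pre>{@code
 *   // S3 -> SQS:        {"Records": [{"eventName": "ObjectCreated:Put", "s3": {...}}]}
 *   // S3 -> SNS -> SQS: {"Message": "{\"Records\": [{\"eventName\": \"ObjectCreated:Put\", ...}]}"}
 * }</pre>
 * Only events whose {@code eventName} starts with {@code ObjectCreated} are kept.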
+ */ + protected List> getValidEvents(AmazonSQS sqs, List processedMessages) throws IOException { + List messages = + getMessagesToProcess( + sqs, + this.queueUrl, + this.longPollWait, + this.visibilityTimeout, + this.maxMessagePerBatch, + this.maxMessagesPerRequest); + return processAndDeleteInvalidMessages(processedMessages, messages); + } + + private List> processAndDeleteInvalidMessages(List processedMessages, + List messages) throws IOException { + List> validEvents = new ArrayList<>(); + for (Message message : messages) { + JSONObject messageBody = new JSONObject(message.getBody()); + Map messageMap; + ObjectMapper mapper = new ObjectMapper(); + if (messageBody.has(SQS_MODEL_MESSAGE)) { + // If this messages is from S3Event -> SNS -> SQS + messageMap = (Map) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class); + } else { + // If this messages is from S3Event -> SQS + messageMap = (Map) mapper.readValue(messageBody.toString(), Map.class); + } + if (messageMap.containsKey(SQS_MODEL_EVENT_RECORDS)) { + List> events = (List>) messageMap.get(SQS_MODEL_EVENT_RECORDS); + for (Map event : events) { + event.remove(S3_EVENT_RESPONSE_ELEMENTS); + String eventName = (String) event.get(SQS_MODEL_EVENT_NAME); + // filter only allowed s3 event types + if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) { + validEvents.add(event); + } else { + log.debug(String.format("This S3 event %s is not allowed, so ignoring it.", eventName)); + } + } + } else { + log.debug(String.format("Message is not expected format or it's s3:TestEvent. Message: %s", message)); + } + processedMessages.add(message); + } + return validEvents; + } + + /** + * Get the list of events from queue. + * + * @param lastCheckpointStr The last checkpoint instant string, empty if first run. + * @return A pair of dataset of event records and the next checkpoint instant string. + */ + public Pair, String> getNextEventsFromQueue(AmazonSQS sqs, + Option lastCheckpointStr, + List processedMessages) { + processedMessages.clear(); + log.info("Reading messages...."); + try { + log.info("Start Checkpoint : " + lastCheckpointStr); + List> eventRecords = getValidEvents(sqs, processedMessages); + log.info("Number of valid events: " + eventRecords.size()); + List filteredEventRecords = new ArrayList<>(); + long newCheckpointTime = eventRecords.stream() + .mapToLong(eventRecord -> Date.from(Instant.from( + DateTimeFormatter.ISO_INSTANT.parse((String) eventRecord.get(S3_MODEL_EVENT_TIME)))) + .getTime()).max().orElse(lastCheckpointStr.map(Long::parseLong).orElse(0L)); + + for (Map eventRecord : eventRecords) { + filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "=")); + } + return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime)); + } catch (JSONException | IOException e) { + throw new HoodieException("Unable to read from SQS: ", e); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java new file mode 100644 index 0000000000000..3d89dc2bc9dec --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; +import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_FS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Basic tests for {@link S3EventsSource}. + */ +public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase { + + @BeforeEach + public void setup() throws Exception { + super.setup(); + this.dfsRoot = dfsBasePath + "/parquetFiles"; + this.fileSuffix = ".parquet"; + dfs.mkdirs(new Path(dfsRoot)); + } + + @AfterEach + public void teardown() throws Exception { + super.teardown(); + } + + /** + * Runs the test scenario of reading data from the source. + * + * @throws IOException + */ + @Test + public void testReadingFromSource() throws IOException { + + SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareCloudObjectSource()); + + // 1. Extract without any checkpoint => (no data available) + generateMessageInQueue(null); + assertEquals( + Option.empty(), + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); + + // 2. Extract without any checkpoint => (adding new file) + generateMessageInQueue("1"); + + // Test fetching Avro format + InputBatch> fetch1 = + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(1, fetch1.getBatch().get().count()); + + // 3. 
Produce new data, extract new data + generateMessageInQueue("2"); + // Test fetching Avro format + InputBatch> fetch2 = + sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(1, fetch2.getBatch().get().count()); + + GenericRecord s3 = (GenericRecord) fetch2.getBatch().get().rdd().first().get("s3"); + GenericRecord s3Object = (GenericRecord) s3.get("object"); + assertEquals("2.parquet", s3Object.get("key").toString()); + } + + @Override + public Source prepareCloudObjectSource() { + TypedProperties props = new TypedProperties(); + props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl); + props.setProperty(S3_SOURCE_QUEUE_REGION, regionName); + props.setProperty(S3_SOURCE_QUEUE_FS, "hdfs"); + S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, null); + dfsSource.sqs = this.sqs; + return dfsSource; + } + + @Override + public void writeNewDataToFile(List records, Path path) throws IOException { + Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java new file mode 100644 index 0000000000000..02eaccf1f7282 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.utilities.testutils.CloudObjectTestUtils; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_PATH; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_SIZE; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_MODEL_EVENT_TIME; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_EVENT_RECORDS; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_MESSAGE; +import static org.apache.hudi.utilities.testutils.CloudObjectTestUtils.deleteMessagesInQueue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestCloudObjectsSelector extends HoodieClientTestHarness { + + static final String REGION_NAME = "us-east-1"; + + TypedProperties props; + String sqsUrl; + + @Mock + AmazonSQS sqs; + + @Mock + private CloudObjectsSelector cloudObjectsSelector; + + @BeforeEach + void setUp() { + initSparkContexts(); + initPath(); + initFileSystem(); + MockitoAnnotations.initMocks(this); + + props = new TypedProperties(); + sqsUrl = "test-queue"; + props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl); + props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.reset(cloudObjectsSelector); + cleanupResources(); + } + + @ParameterizedTest + @ValueSource(classes = {CloudObjectsSelector.class}) + public void testSqsQueueAttributesShouldReturnsRequiredAttribute(Class clazz) { + CloudObjectsSelector selector = + (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props); + + // setup the queue attributes + CloudObjectTestUtils.setMessagesInQueue(sqs, null); + + // test the return values + Map queueAttributes = selector.getSqsQueueAttributes(sqs, sqsUrl); + assertEquals(1, queueAttributes.size()); + // ApproximateNumberOfMessages is a required queue attribute for Cloud object selector + assertEquals("0", queueAttributes.get(SQS_ATTR_APPROX_MESSAGES)); + } + + @ParameterizedTest + @ValueSource(classes = {CloudObjectsSelector.class}) + public void testFileAttributesFromRecordShouldReturnsExpectOutput(Class clazz) + throws IOException { + + CloudObjectsSelector selector = + (CloudObjectsSelector) 
ReflectionUtils.loadClass(clazz.getName(), props); + + // setup s3 record + String bucket = "test-bucket"; + String key = "test/year=test1/month=test2/day=test3/part-foo-bar.snappy.parquet"; + + String s3Records = + "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n \"TopicArn\" : \"arn:aws:sns:foo:123:" + + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n \"Message\" : \"{\\\"Records\\\":" + + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us" + + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated" + + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":" + + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"" + + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"" + + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\"" + + bucket + + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}" + + ",\\\"object\\\":{\\\"key\\\":\\\"" + + key + + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}"; + JSONObject messageBody = new JSONObject(s3Records); + Map messageMap = new HashMap<>(); + if (messageBody.has(SQS_MODEL_MESSAGE)) { + ObjectMapper mapper = new ObjectMapper(); + messageMap = + (Map) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class); + } + List> records = (List>) messageMap.get(SQS_MODEL_EVENT_RECORDS); + + // test the return values + Map fileAttributes = + selector.getFileAttributesFromRecord(new JSONObject(records.get(0))); + + assertEquals(3, fileAttributes.size()); + assertEquals(123L, (long) fileAttributes.get(S3_FILE_SIZE)); + assertEquals(S3_PREFIX + bucket + "/" + key, fileAttributes.get(S3_FILE_PATH)); + assertEquals(1627376736755L, (long) fileAttributes.get(S3_MODEL_EVENT_TIME)); + } + + @ParameterizedTest + @ValueSource(classes = {CloudObjectsSelector.class}) + public void testCreateListPartitionsReturnsExpectedSetOfBatch(Class clazz) { + + CloudObjectsSelector selector = + (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props); + + // setup lists + List testSingleList = new ArrayList<>(); + testSingleList.add(new Message().addAttributesEntry("id", "1")); + testSingleList.add(new Message().addAttributesEntry("id", "2")); + testSingleList.add(new Message().addAttributesEntry("id", "3")); + testSingleList.add(new Message().addAttributesEntry("id", "4")); + testSingleList.add(new Message().addAttributesEntry("id", "5")); + + List expectedFirstList = new ArrayList<>(); + expectedFirstList.add(new Message().addAttributesEntry("id", "1")); + expectedFirstList.add(new Message().addAttributesEntry("id", "2")); + + List expectedSecondList = new ArrayList<>(); + expectedSecondList.add(new Message().addAttributesEntry("id", "3")); + expectedSecondList.add(new Message().addAttributesEntry("id", "4")); + + List expectedFinalList = new ArrayList<>(); + expectedFinalList.add(new Message().addAttributesEntry("id", "5")); + + // test the return values + List> partitionedList = selector.createListPartitions(testSingleList, 2); + + assertEquals(3, partitionedList.size()); + assertEquals(expectedFirstList, partitionedList.get(0)); + assertEquals(expectedSecondList, partitionedList.get(1)); + assertEquals(expectedFinalList, partitionedList.get(2)); + } + + @ParameterizedTest + @ValueSource(classes = 
{CloudObjectsSelector.class}) + public void testCreateListPartitionsReturnsEmptyIfBatchSizeIsZero(Class clazz) { + + CloudObjectsSelector selector = + (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props); + + // setup lists + List testSingleList = new ArrayList<>(); + testSingleList.add(new Message().addAttributesEntry("id", "1")); + testSingleList.add(new Message().addAttributesEntry("id", "2")); + + // test the return values + List> partitionedList = selector.createListPartitions(testSingleList, 0); + + assertEquals(0, partitionedList.size()); + } + + @ParameterizedTest + @ValueSource(classes = {CloudObjectsSelector.class}) + public void testOnCommitDeleteProcessedMessages(Class clazz) { + + CloudObjectsSelector selector = + (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props); + + // setup lists + List testSingleList = new ArrayList<>(); + testSingleList.add( + new Message() + .addAttributesEntry("MessageId", "1") + .addAttributesEntry("ReceiptHandle", "1")); + testSingleList.add( + new Message() + .addAttributesEntry("MessageId", "2") + .addAttributesEntry("ReceiptHandle", "1")); + + deleteMessagesInQueue(sqs); + + // test the return values + selector.deleteProcessedMessages(sqs, sqsUrl, testSingleList); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java new file mode 100644 index 0000000000000..2208543c08d1d --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.utilities.testutils.CloudObjectTestUtils; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import org.apache.hadoop.fs.Path; +import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL; +import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestS3EventsMetaSelector extends HoodieClientTestHarness { + + TypedProperties props; + String sqsUrl; + + @Mock + AmazonSQS sqs; + + @Mock + private S3EventsMetaSelector s3EventsMetaSelector; + + @BeforeEach + void setUp() { + initSparkContexts(); + initPath(); + initFileSystem(); + MockitoAnnotations.initMocks(this); + + props = new TypedProperties(); + sqsUrl = "test-queue"; + props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl); + props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.reset(s3EventsMetaSelector); + cleanupResources(); + } + + @ParameterizedTest + @ValueSource(classes = {S3EventsMetaSelector.class}) + public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class clazz) { + S3EventsMetaSelector selector = (S3EventsMetaSelector) ReflectionUtils.loadClass(clazz.getName(), props); + // setup s3 record + String bucket = "test-bucket"; + String key = "part-foo-bar.snappy.parquet"; + Path path = new Path(bucket, key); + CloudObjectTestUtils.setMessagesInQueue(sqs, path); + + List processed = new ArrayList<>(); + + // test the return values + Pair, String> eventFromQueue = + selector.getNextEventsFromQueue(sqs, Option.empty(), processed); + + assertEquals(1, eventFromQueue.getLeft().size()); + assertEquals(1, processed.size()); + assertEquals( + key, + new JSONObject(eventFromQueue.getLeft().get(0)) + .getJSONObject("s3") + .getJSONObject("object") + .getString("key")); + assertEquals("1627376736755", eventFromQueue.getRight()); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java new file mode 100644 index 0000000000000..49ea2de0480e0 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.testutils; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +/** + * Utils Class for unit testing on CloudObject related sources. + */ +public class CloudObjectTestUtils { + + /** + * Set a return value for mocked sqs instance. It will add a new messages (s3 Event) and set + * ApproximateNumberOfMessages attribute of the queue. + * + * @param sqs Mocked instance of AmazonSQS + * @param path Absolute Path of file in FileSystem + */ + public static void setMessagesInQueue(AmazonSQS sqs, Path path) { + + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); + String approximateNumberOfMessages = "0"; + + if (path != null) { + String body = + "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n \"TopicArn\" : \"arn:aws:sns:foo:123:" + + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n \"Message\" : \"{\\\"Records\\\":" + + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us" + + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated" + + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":" + + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"" + + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"" + + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\"" + + path.getParent().toString().replace("hdfs://", "") + + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}" + + ",\\\"object\\\":{\\\"key\\\":\\\"" + + path.getName() + + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}"; + + Message message = new Message(); + message.setReceiptHandle("1"); + message.setMessageId("1"); + message.setBody(body); + + List messages = new ArrayList<>(); + messages.add(message); + receiveMessageResult.setMessages(messages); + approximateNumberOfMessages = "1"; + } + when(sqs.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); + 
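    // Stub the queue-attributes lookup as well, so selectors that consult
    // ApproximateNumberOfMessages before receiving see a count consistent with
    // the receiveMessage() stub above ("1" when a message was added, "0" otherwise).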
when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenReturn( + new GetQueueAttributesResult() + .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, approximateNumberOfMessages)); + } + + /** + * Mock the sqs.deleteMessageBatch() method from queue. + * + * @param sqs Mocked instance of AmazonSQS + */ + public static void deleteMessagesInQueue(AmazonSQS sqs) { + when(sqs.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(new DeleteMessageBatchResult()); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java new file mode 100644 index 0000000000000..dfd37950dce4d --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.testutils.sources; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.Source; +import org.apache.hudi.utilities.testutils.CloudObjectTestUtils; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; + +import com.amazonaws.services.sqs.AmazonSQS; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.List; + +/** + * An abstract test base for {@link Source} using CloudObjects as the file system. 
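 *
 * <p>Subclasses wire a mocked SQS client into the source under test; a sketch mirroring
 * {@code TestS3EventsSource} from this change (simplified, illustrative only):
 * <pre>{@code
 *   public Source prepareCloudObjectSource() {
 *     TypedProperties props = new TypedProperties();
 *     props.setProperty("hoodie.deltastreamer.s3.source.queue.url", sqsUrl);
 *     props.setProperty("hoodie.deltastreamer.s3.source.queue.region", regionName);
 *     S3EventsSource source = new S3EventsSource(props, jsc, sparkSession, null);
 *     source.sqs = this.sqs; // inject the mock client
 *     return source;
 *   }
 * }</pre>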
+ */ +public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBase { + + protected FilebasedSchemaProvider schemaProvider; + protected String dfsRoot; + protected String fileSuffix; + protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + protected boolean useFlattenedSchema = false; + protected String sqsUrl = "test-queue"; + protected String regionName = "us-east-1"; + @Mock + protected AmazonSQS sqs; + + @BeforeAll + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(); + } + + @AfterAll + public static void cleanupClass() { + UtilitiesTestBase.cleanupClass(); + } + + @BeforeEach + public void setup() throws Exception { + super.setup(); + schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc); + MockitoAnnotations.initMocks(this); + } + + @AfterEach + public void teardown() throws Exception { + super.teardown(); + } + + /** + * Prepares the specific {@link Source} to test, by passing in necessary configurations. + * + * @return A {@link Source} using DFS as the file system. + */ + protected abstract Source prepareCloudObjectSource(); + + /** + * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS. + * + * @param records Test data. + * @param path The path in {@link Path} of the file to write. + */ + protected abstract void writeNewDataToFile(List records, Path path) + throws IOException; + + /** + * Generates a batch of test data and writes the data to a file. + * + * @param filename The name of the file. + * @param instantTime The commit time. + * @param n The number of records to generate. + * @return The file path. + */ + protected Path generateOneFile(String filename, String instantTime, int n) throws IOException { + Path path = new Path(dfsRoot, filename + fileSuffix); + writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), path); + return path; + } + + public void generateMessageInQueue(String filename) { + Path path = null; + if (filename != null) { + path = new Path(dfsRoot, filename + fileSuffix); + } + CloudObjectTestUtils.setMessagesInQueue(sqs, path); + } +} diff --git a/pom.xml b/pom.xml index 9f523aa9169f4..53d25fb58a5d8 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,7 @@ true 2.7.1 4.7 + 1.12.22
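Taken together, the new sources form a two-stage pipeline: S3EventsSource materializes a Hudi table of S3 event metadata pulled from SQS, and S3EventsHoodieIncrSource incrementally reads that metadata table and loads the S3 objects it references. A configuration sketch using only the keys introduced in this change (all values are hypothetical placeholders; the SQS tuning values shown are simply the defaults applied by CloudObjectsSelector):

import org.apache.hudi.common.config.TypedProperties;

// Stage 1: S3EventsSource - pull S3 event notifications from an SQS queue.
TypedProperties eventsProps = new TypedProperties();
eventsProps.setProperty("hoodie.deltastreamer.s3.source.queue.url",
    "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events-queue");
eventsProps.setProperty("hoodie.deltastreamer.s3.source.queue.region", "us-east-1");
eventsProps.setProperty("hoodie.deltastreamer.s3.source.queue.long.poll.wait", "20");
eventsProps.setProperty("hoodie.deltastreamer.s3.source.queue.max.messages.per.batch", "5");
eventsProps.setProperty("hoodie.deltastreamer.s3.source.queue.visibility.timeout", "30");

// Stage 2: S3EventsHoodieIncrSource - incrementally read the metadata table written by
// stage 1 and load the referenced S3 files.
TypedProperties incrProps = new TypedProperties();
incrProps.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "s3://bucket/s3-events-meta-table");
incrProps.setProperty("hoodie.deltastreamer.source.hoodieincr.num_instants", "5");
incrProps.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", "parquet");
incrProps.setProperty("hoodie.deltastreamer.source.s3incr.check.file.exists", "true");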