Merged
CloudObjectsSelectorCommon.java
@@ -50,6 +50,9 @@

import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.PATH_BASED_PARTITION_FIELDS;
+import static org.apache.spark.sql.functions.input_file_name;
+import static org.apache.spark.sql.functions.split;

/**
* Generic helper methods for fetching from cloud storage buckets during an incremental fetch.
@@ -176,6 +179,18 @@ public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata>
totalSize *= 1.1;
long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
-    return Option.of(reader.load(paths.toArray(new String[cloudObjectMetadata.size()])).coalesce(numPartitions));
+    Dataset<Row> dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()])).coalesce(numPartitions);
+
+    // add partition column from source path if configured
+    if (props.containsKey(PATH_BASED_PARTITION_FIELDS)) {
+      String[] partitionKeysToAdd = props.getString(PATH_BASED_PARTITION_FIELDS).split(",");
+      // Add partition column for all path-based partition keys. If key is not present in path, the value will be null.
+      for (String partitionKey : partitionKeysToAdd) {
+        String partitionPathPattern = String.format("%s=", partitionKey);
+        LOG.info(String.format("Adding column %s to dataset", partitionKey));
+        dataset = dataset.withColumn(partitionKey, split(split(input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0));
+      }
+    }
+    return Option.of(dataset);
}
}
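To make the nested split concrete, here is a minimal standalone sketch (not part of the PR; the class name and bucket path are illustrative) that evaluates the same expression against a hardcoded path instead of input_file_name():

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.split;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionFromPathSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("partition-from-path-sketch")
        .getOrCreate();

    // Stand-in for input_file_name(): a literal source path with Hive-style partition segments.
    Dataset<Row> df = spark.range(1)
        .withColumn("path", lit("s3://bucket/data/country=US/state=CA/part-0.json"));

    // split(path, "country=")  -> ["s3://bucket/data/", "US/state=CA/part-0.json"]
    //   .getItem(1)            -> "US/state=CA/part-0.json"
    // split(..., "/")          -> ["US", "state=CA", "part-0.json"]
    //   .getItem(0)            -> "US"
    df.withColumn("country",
        split(split(col("path"), "country=").getItem(1), "/").getItem(0))
      .show(false);

    spark.stop();
  }
}

One caveat worth noting: split treats its pattern as a Java regex, so a partition key containing regex metacharacters would need escaping, and if the "key=" substring occurs more than once in the path, the first occurrence wins.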
CloudStoreIngestionConfig.java
@@ -103,4 +103,8 @@ public class CloudStoreIngestionConfig {
@Deprecated
public static final String DATAFILE_FORMAT = CloudSourceConfig.DATAFILE_FORMAT.key();

+  /**
+   * A comma-delimited list of path-based partition fields in the source file structure.
+   */
+  public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
Review comment from a Contributor on lines +106 to +109:

We should add new configs in CloudSourceConfig.
}
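For reference, the new key would be set like any other DeltaStreamer property. A hypothetical properties snippet matching the layout used in the tests below (only the key itself comes from this PR; the value is an example):

hoodie.deltastreamer.source.cloud.data.partition.fields.from.path=country,state

With this set, a source file under a path containing country=US/state=CA/ yields extra columns country and state with values US and CA.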
TestCloudObjectsSelectorCommon.java
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.helpers;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

public class TestCloudObjectsSelectorCommon extends HoodieClientTestHarness {

@BeforeEach
void setUp() {
initSparkContexts();
}

@AfterEach
public void teardown() throws Exception {
cleanupResources();
}

@Test
public void emptyMetadataReturnsEmptyOption() {
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), new TypedProperties(), "json");
Assertions.assertFalse(result.isPresent());
}

@Test
public void filesFromMetadataRead() {
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, new TypedProperties(), "json");
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data");
Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
}

@Test
public void partitionValueAddedToRow() {
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));

TypedProperties properties = new TypedProperties();
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json");
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data", "US", "CA");
Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
}

@Test
public void partitionKeyNotPresentInPath() {
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
TypedProperties properties = new TypedProperties();
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown");
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json");
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data", null);
Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
}
}
src/test/resources/data/partitioned/country=US/state=CA/data.json
@@ -0,0 +1 @@
{"data": "some data"}