
Commit e528dd7

Authored by rmahindra123 (Rajesh Mahindra) and vinothchandar (Vinoth Chandar)
[HUDI-2394] Implement Kafka Sink Protocol for Hudi for Ingesting Immutable Data (apache#3592)
- Fixing packaging, naming of classes
- Use of log4j over slf4j for uniformity
- More follow-on fixes
- Added a version to control/coordinator events
- Eliminated the config added to write config
- Fixed fetching of checkpoints based on table type
- Clean up of naming, code placement

Co-authored-by: Rajesh Mahindra <[email protected]>
Co-authored-by: Vinoth Chandar <[email protected]>
1 parent bd1d2d4 commit e528dd7

51 files changed, +4710 −22 lines

.gitignore

+1-1
@@ -78,4 +78,4 @@ dependency-reduced-pom.xml
 #######################################
 hudi-integ-test/compose_env
 node_modules
-package-lock.json
+package-lock.json

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

+16
@@ -46,6 +46,7 @@
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.table.RandomFileIdPrefixProvider;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

@@ -413,6 +414,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
           + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");

+  public static final ConfigProperty<String> FILEID_PREFIX_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.fileid.prefix.provider.class")
+      .defaultValue(RandomFileIdPrefixProvider.class.getName())
+      .sinceVersion("0.10.0")
+      .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
+
   private ConsistencyGuardConfig consistencyGuardConfig;

   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled

@@ -1748,6 +1755,10 @@ public boolean allowOperationMetadataField() {
     return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
   }

+  public String getFileIdPrefixProviderClassName() {
+    return getString(FILEID_PREFIX_PROVIDER_CLASS);
+  }
+
   public static class Builder {

     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();

@@ -2079,6 +2090,11 @@ public Builder withAllowOperationMetadataField(boolean allowOperationMetadataFie
       return this;
     }

+    public Builder withFileIdPrefixProviderClassName(String fileIdPrefixProviderClassName) {
+      writeConfig.setValue(FILEID_PREFIX_PROVIDER_CLASS, fileIdPrefixProviderClassName);
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.writeConfig.getProps().putAll(properties);
       return this;
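
The new config can be supplied either through the builder method above or via the raw key `hoodie.fileid.prefix.provider.class`. A minimal, hedged sketch of how this might be wired (the base path below is illustrative, not from the commit):

```java
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.RandomFileIdPrefixProvider;

public class FileIdPrefixConfigSketch {
  public static void main(String[] args) {
    // Build a write config that explicitly selects the file id prefix provider.
    // RandomFileIdPrefixProvider is already the default, so this call is purely illustrative.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")  // hypothetical base path
        .withFileIdPrefixProviderClassName(RandomFileIdPrefixProvider.class.getName())
        .build();

    // Getter added in this commit; resolves the configured (or default) class name.
    System.out.println(writeConfig.getFileIdPrefixProviderClassName());
  }
}
```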
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java

+36

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import java.util.Properties;
+
+public abstract class FileIdPrefixProvider {
+
+  private final Properties props;
+
+  public FileIdPrefixProvider(Properties props) {
+    this.props = props;
+  }
+
+  public Properties getProps() {
+    return props;
+  }
+
+  public abstract String createFilePrefix(String partitionPath);
+}
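
This abstract class is the new extension point for controlling how file id prefixes are generated during bulk insert. As a hedged illustration only (not part of this commit), a custom provider could derive a deterministic prefix from the partition path and then be plugged in via withFileIdPrefixProviderClassName:

```java
package org.apache.hudi.examples;  // hypothetical package, not part of this commit

import org.apache.hudi.table.FileIdPrefixProvider;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;

/**
 * Hypothetical provider that returns a stable prefix per partition, so repeated
 * bulk inserts into the same partition reuse the same file id prefix.
 */
public class PartitionHashFileIdPrefixProvider extends FileIdPrefixProvider {

  public PartitionHashFileIdPrefixProvider(Properties props) {
    super(props);
  }

  @Override
  public String createFilePrefix(String partitionPath) {
    // UUID.nameUUIDFromBytes is deterministic for a given partition path.
    return UUID.nameUUIDFromBytes(partitionPath.getBytes(StandardCharsets.UTF_8)).toString();
  }
}
```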
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java

+35

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+
+import java.util.Properties;
+
+public class RandomFileIdPrefixProvider extends FileIdPrefixProvider {
+
+  public RandomFileIdPrefixProvider(Properties props) {
+    super(props);
+  }
+
+  @Override
+  public String createFilePrefix(String partitionPath) {
+    return FSUtils.createNewFileIdPfx();
+  }
+}

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java

+17-3
@@ -18,8 +18,6 @@

 package org.apache.hudi.client;

-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;

@@ -30,6 +28,7 @@
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;

@@ -41,6 +40,9 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;

+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;

@@ -153,11 +155,23 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
     throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
   }

+  public void transitionInflight(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    metaClient.getActiveTimeline().transitionRequestedToInflight(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, metaClient.getCommitActionType(), instantTime),
+        Option.empty(), config.shouldAllowMultiWriteOnSameInstant());
+  }
+
   @Override
   public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
                                                     String instantTime,
                                                     Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsertPreppedRecords is not supported in HoodieJavaClient");
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
+    HoodieWriteMetadata<List<WriteStatus>> result = table.bulkInsertPrepped(context, instantTime, preppedRecords, bulkInsertPartitioner);
+    return postWrite(result, instantTime, table);
   }

   @Override
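
With bulkInsertPreppedRecords now implemented and the new transitionInflight helper exposed, a caller (for example the Kafka Connect coordinator and its participants) can separate instant state transitions from the actual write. A rough, hedged sketch of that flow; the wrapper class, method names, and record preparation are assumed, not from the commit:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;

public class PreppedBulkInsertSketch {
  // Hypothetical driver: assumes the caller already prepared records with keys and locations set.
  public static List<WriteStatus> writePrepped(HoodieWriteConfig config,
                                               List<HoodieRecord<HoodieAvroPayload>> preppedRecords) {
    HoodieJavaWriteClient<HoodieAvroPayload> client =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(new Configuration()), config);
    try {
      String instantTime = client.startCommit();  // creates the REQUESTED instant
      client.transitionInflight(instantTime);     // helper added in this commit
      return client.bulkInsertPreppedRecords(preppedRecords, instantTime, Option.empty());
    } finally {
      client.close();
    }
  }
}
```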

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java

+17-8
@@ -19,17 +19,18 @@
 package org.apache.hudi.table.action.commit;

 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.JavaLazyInsertIterable;
 import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.FileIdPrefixProvider;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;

@@ -66,10 +67,14 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord
     final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();

-    //transition bulk_insert state to inflight
-    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
-        table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
-        config.shouldAllowMultiWriteOnSameInstant());
+    // It's possible the transition to inflight could have already happened.
+    if (!table.getActiveTimeline().filterInflights().containsInstant(instantTime)) {
+      table.getActiveTimeline().transitionRequestedToInflight(
+          new HoodieInstant(HoodieInstant.State.REQUESTED, table.getMetaClient().getCommitActionType(), instantTime),
+          Option.empty(),
+          config.shouldAllowMultiWriteOnSameInstant());
+    }
+
     // write new files
     List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
     //update index

@@ -102,12 +107,16 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
         : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
     repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);

-    String idPfx = FSUtils.createNewFileIdPfx();
+    FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
+        config.getFileIdPrefixProviderClassName(),
+        config.getProps());

     List<WriteStatus> writeStatuses = new ArrayList<>();

-    new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, idPfx,
-        table.getTaskContextSupplier(), new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+    new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
+        config, instantTime, table,
+        fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
+        new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);

     return writeStatuses;
   }
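
The helper now resolves the prefix provider reflectively from the write config instead of hard-coding FSUtils.createNewFileIdPfx(). A hedged sketch of that lookup in isolation (the wrapper class and method names here are illustrative):

```java
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.FileIdPrefixProvider;

public class FileIdPrefixResolutionSketch {
  // Mirrors the lookup JavaBulkInsertHelper performs before handing the prefix
  // to JavaLazyInsertIterable.
  public static String resolvePrefix(HoodieWriteConfig config, String partitionPath) {
    FileIdPrefixProvider provider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
        config.getFileIdPrefixProviderClassName(),  // defaults to RandomFileIdPrefixProvider
        config.getProps());                         // the provider receives the write config props
    return provider.createFilePrefix(partitionPath);
  }
}
```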

hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java

+5-1
@@ -30,7 +30,8 @@ public enum Names {
     FLINK_SQL("Flink Sql Configs"),
     WRITE_CLIENT("Write Client Configs"),
     METRICS("Metrics Configs"),
-    RECORD_PAYLOAD("Record Payload Config");
+    RECORD_PAYLOAD("Record Payload Config"),
+    KAFKA_CONNECT("Kafka Connect Configs");

     public final String name;

@@ -72,6 +73,9 @@ public static String getDescription(Names names) {
         description = "These set of configs are used to enable monitoring and reporting of key"
             + "Hudi stats and metrics.";
         break;
+      case KAFKA_CONNECT:
+        description = "These set of configs are used for Kakfa Connect Sink Connector for writing Hudi Tables";
+        break;
       default:
         description = "Please fill in the description for Config Group Name: " + names.name;
         break;
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java

+1
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieException;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java

+26-8
@@ -71,17 +71,20 @@ public static List<IndexedRecord> generateTestRecords(int from, int limit) throw
     return toRecords(getSimpleSchema(), getSimpleSchema(), from, limit);
   }

+  public static List<String> generateTestJsonRecords(int from, int limit) throws IOException, URISyntaxException {
+    Path dataPath = initializeSampleDataPath();
+
+    try (Stream<String> stream = Files.lines(dataPath)) {
+      return stream.skip(from).limit(limit).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read data from " + RESOURCE_SAMPLE_DATA, e);
+    }
+  }
+
   private static List<IndexedRecord> toRecords(Schema writerSchema, Schema readerSchema, int from, int limit)
       throws IOException, URISyntaxException {
     GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
-    // Required to register the necessary JAR:// file system
-    URI resource = SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI();
-    Path dataPath;
-    if (resource.toString().contains("!")) {
-      dataPath = uriToPath(resource);
-    } else {
-      dataPath = Paths.get(SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI());
-    }
+    Path dataPath = initializeSampleDataPath();

     try (Stream<String> stream = Files.lines(dataPath)) {
       return stream.skip(from).limit(limit).map(s -> {

@@ -96,6 +99,21 @@ private static List<IndexedRecord> toRecords(Schema writerSchema, Schema readerS
     }
   }

+  /**
+   * Required to register the necessary JAR:// file system.
+   * @return Path to the sample data in the resource file.
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  private static Path initializeSampleDataPath() throws IOException, URISyntaxException {
+    URI resource = SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI();
+    if (resource.toString().contains("!")) {
+      return uriToPath(resource);
+    } else {
+      return Paths.get(SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI());
+    }
+  }
+
   public static Path uriToPath(URI uri) throws IOException {
     final Map<String, String> env = new HashMap<>();
     final String[] array = uri.toString().split("!");
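
For reference, a small hedged usage sketch of the new test helper (the offsets are arbitrary and the wrapper class is illustrative):

```java
import org.apache.hudi.common.testutils.SchemaTestUtil;

import java.util.List;

public class SchemaTestUtilUsage {
  public static void main(String[] args) throws Exception {
    // Read 5 raw JSON lines from the bundled sample data, starting at offset 10.
    List<String> jsonRecords = SchemaTestUtil.generateTestJsonRecords(10, 5);
    jsonRecords.forEach(System.out::println);
  }
}
```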
