@@ -28,6 +28,7 @@
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -210,7 +211,6 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
             .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
             .build())
-        .withKeyGenerator(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) // needed by TwoToThreeUpgradeHandler
         .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
         .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
         .withAutoCommit(false)
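
With `.withKeyGenerator(...)` dropped from the builder, the key generator class has to reach the writer through the write properties instead (see the next hunk). Below is a minimal, hedged sketch of that flow; the `HoodieWriteConfig.newBuilder()` chain and the `StreamerUtil.flinkConf2TypedProperties` entry point are assumptions for illustration, not code from this PR.

```java
// Sketch only: the key generator class now travels inside the write properties
// (populated by flinkConf2TypedProperties, shown in the next hunk) rather than
// being set explicitly via withKeyGenerator().
TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf); // helper location assumed

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withEngineType(EngineType.FLINK)
    .withPath(conf.getString(FlinkOptions.PATH))
    .withProps(props) // carries the table key generator class for TwoToThreeUpgradeHandler
    .build();
```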
@@ -239,6 +239,8 @@ public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
     Properties properties = new Properties();
     // put all the set options
     flatConf.addAllToProperties(properties);
+    // ugly: table keygen clazz, needed by TwoToThreeUpgradeHandler
+    properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), conf.getString(FlinkOptions.KEYGEN_CLASS_NAME));
     // put all the default options
     for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
       if (!flatConf.contains(option) && option.hasDefaultValue()) {
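
The two added lines mirror the Flink-level `FlinkOptions.KEYGEN_CLASS_NAME` value under Hudi's table-config key, so code that only sees the resulting `TypedProperties` (such as the TwoToThreeUpgradeHandler mentioned in the comment) can still resolve the key generator class. A hedged illustration of that mapping, not taken from the PR:

```java
// The Flink option is copied under Hudi's table-config key ...
TypedProperties props = new TypedProperties();
props.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(),
    conf.getString(FlinkOptions.KEYGEN_CLASS_NAME));

// ... so a consumer that only receives the properties can read it back.
String keyGenClass = props.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), null);
```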
@@ -268,9 +270,12 @@ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) thr
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
           .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
           .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+          .setPreCombineField(OptionsResolver.getPreCombineField(conf))
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
-          .setPreCombineField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+          .setKeyGeneratorClassProp(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))
+          .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
+          .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
           .setTimelineLayoutVersion(1)
           .initTable(hadoopConf, basePath);
       LOG.info("Table initialized under base path {}", basePath);
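
For context, a hedged usage sketch of how a job could populate the options that this hunk now persists into the table config at creation time. The enclosing class, the literal values, the import/package locations, and the `IOException` clause are assumptions for illustration; only the `FlinkOptions` keys and the `initTableIfNotExists` signature come from the diff above.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

import java.io.IOException;

public class InitTableExample {
  static HoodieTableMetaClient createSampleTable() throws IOException {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi/t1"); // hypothetical location
    conf.setString(FlinkOptions.TABLE_NAME, "t1");
    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");
    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
    conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts");
    conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "org.apache.hudi.keygen.SimpleAvroKeyGenerator"); // illustrative value
    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
    conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, false);

    // Key generator class, hive-style partitioning, URL encoding and the precombine field
    // now end up in the table config when the table is first created.
    return StreamerUtil.initTableIfNotExists(conf);
  }
}
```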
|