diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 5604a6240c71c..264b6a5cc0e25 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -747,6 +747,8 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
       while (!isShutdownRequested()) {
         try {
           long start = System.currentTimeMillis();
+          // Send a heartbeat metrics event to track the active ingestion job for this table.
+          streamSync.getMetrics().updateStreamerHeartbeatTimestamp(start);
           // check if deltastreamer need to update the configuration before the sync
           if (configurationHotUpdateStrategyOpt.isPresent()) {
             Option<TypedProperties> newProps = configurationHotUpdateStrategyOpt.get().updateProperties(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 9f1b087900d91..f29404701db97 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -229,8 +229,11 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       // configured via this option. The column is then used to trigger error events.
       StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
           .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
+      StructType nullableStruct = dataType.asNullable();
       Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> source.getSparkSession().read()
-          .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType.asNullable())
+          .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
+          .schema(nullableStruct)
+          .option("mode", "PERMISSIVE")
           .json(rdd));
       Option<Dataset<Row>> eventsDataset = processErrorEvents(dataset,
           ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 6626084aa2bdf..657474525f1f3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -160,6 +160,7 @@ public class StreamSync implements Serializable, Closeable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
+  private static final String NULL_PLACEHOLDER = "[null]";
 
   /**
    * Delta Sync Config.
    */
@@ -421,14 +422,19 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
     } else {
       Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
       Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
-      if (!(processedSchema.isSchemaPresent(newSourceSchema))
-          || !(processedSchema.isSchemaPresent(newTargetSchema))) {
-        LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true)
-            + ", Target :" + newTargetSchema.toString(true));
+      if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
+          || (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
+        String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
+        String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
+        LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr, targetStr);
         // We need to recreate write client with new schema and register them.
         reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
-        processedSchema.addSchema(newSourceSchema);
-        processedSchema.addSchema(newTargetSchema);
+        if (newSourceSchema != null) {
+          processedSchema.addSchema(newSourceSchema);
+        }
+        if (newTargetSchema != null) {
+          processedSchema.addSchema(newTargetSchema);
+        }
       }
     }
 
@@ -577,7 +583,8 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
             ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
         checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-        if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null
+            && this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
           if (useRowWriter) {
             inputBatchForWriter = new InputBatch(transformed, checkpointStr, this.userProvidedSchemaProvider);
           } else {
@@ -983,6 +990,7 @@ public void runMetaSync() {
       LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
     }
     if (cfg.enableMetaSync) {
+      LOG.debug("[MetaSync] Starting sync");
       FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());
 
       TypedProperties metaProps = new TypedProperties();
@@ -996,14 +1004,19 @@ public void runMetaSync() {
       Map<String, HoodieException> failedMetaSyncs = new HashMap<>();
       for (String impl : syncClientToolClasses) {
         Timer.Context syncContext = metrics.getMetaSyncTimerContext();
+        boolean success = false;
         try {
           SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, cfg.targetBasePath, cfg.baseFileFormat);
+          success = true;
         } catch (HoodieMetaSyncException e) {
-          LOG.warn("SyncTool class " + impl.trim() + " failed with exception", e);
+          LOG.error("SyncTool class {} failed with exception", impl.trim(), e);
           failedMetaSyncs.put(impl, e);
         }
         long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
         metrics.updateStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs);
+        if (success) {
+          LOG.info("[MetaSync] SyncTool class {} completed successfully and took {} ms", impl.trim(), metaSyncTimeMs);
+        }
       }
       if (!failedMetaSyncs.isEmpty()) {
         throw getHoodieMetaSyncException(failedMetaSyncs);
@@ -1175,13 +1188,14 @@ private void registerAvroSchemas(SchemaProvider schemaProvider) {
    */
   private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
     // register the schemas, so that shuffle does not serialize the full schemas
-    if (null != sourceSchema) {
-      List<Schema> schemas = new ArrayList<>();
+    List<Schema> schemas = new ArrayList<>();
+    if (sourceSchema != null) {
       schemas.add(sourceSchema);
-      if (targetSchema != null) {
-        schemas.add(targetSchema);
-      }
-
+    }
+    if (targetSchema != null) {
+      schemas.add(targetSchema);
+    }
+    if (!schemas.isEmpty()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering Schema: " + schemas);
       }