diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 4c050451c5a22..7d400f6ed5cba 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.sync.datahub;
 
+import com.linkedin.common.Status;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -52,15 +53,18 @@
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class DataHubSyncClient extends HoodieSyncClient {
 
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
   public DataHubSyncClient(DataHubSyncConfig config) {
     super(config);
@@ -87,10 +91,13 @@ public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
         .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
         .build();
 
+    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(propertiesChangeProposal, null).get();
+      emitter.emit(propertiesChangeProposal, responseLogger).get();
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
+          + tableProperties, e);
     }
   }
 
@@ -105,21 +112,12 @@ public void updateTableSchema(String tableName, MessageType schema) {
     final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
     platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
 
-    MetadataChangeProposalWrapper schemaChangeProposal = MetadataChangeProposalWrapper.builder()
-        .entityType("dataset")
-        .entityUrn(datasetUrn)
-        .upsert()
-        .aspect(new SchemaMetadata()
-            .setSchemaName(tableName)
-            .setVersion(0)
-            .setHash("")
-            .setPlatform(datasetUrn.getPlatformEntity())
-            .setPlatformSchema(platformSchema)
-            .setFields(new SchemaFieldArray(fields)))
-        .build();
-
+    MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName, fields, platformSchema);
+
+    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(schemaChangeProposal, null).get();
+      emitter.emit(schemaChange, responseLogger).get();
+      undoSoftDelete(emitter, responseLogger);
     } catch (Exception e) {
       throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
     }
@@ -135,6 +133,37 @@ public void close() {
     // no op;
   }
 
+  // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get
+  // reflected in the UI.
+  private void undoSoftDelete(RestEmitter client, DatahubResponseLogger responseLogger) throws IOException, ExecutionException,
+      InterruptedException {
+    MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(SOFT_DELETE_FALSE)
+        .aspectName("status")
+        .build();
+
+    client.emit(softDeleteUndoProposal, responseLogger).get();
+  }
+
+  private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName, List<SchemaField> fields,
+      SchemaMetadata.PlatformSchema platformSchema) {
+    return MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(new SchemaMetadata()
+            .setSchemaName(tableName)
+            .setVersion(0)
+            .setHash("")
+            .setPlatform(datasetUrn.getPlatformEntity())
+            .setPlatformSchema(platformSchema)
+            .setFields(new SchemaFieldArray(fields)))
+        .build();
+  }
+
   static Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
     try {
       return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
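
The undoSoftDelete() helper above works because DataHub hides soft-deleted entities: writing a Status aspect with removed=false re-activates the dataset so that later aspect writes become visible in the UI again. The standalone sketch below shows the same pattern against the DataHub Java emitter; the class name, the default localhost emitter from RestEmitter.createWithDefaults(), and the example URN are illustrative assumptions, not part of this patch.

import com.linkedin.common.Status;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;

// Illustrative sketch: re-activate a soft-deleted dataset.
public class UndoSoftDeleteSketch {
  public static void main(String[] args) throws Exception {
    // createWithDefaults() targets a GMS at http://localhost:8080; the URN is a placeholder.
    try (RestEmitter emitter = RestEmitter.createWithDefaults()) {
      MetadataChangeProposalWrapper proposal = MetadataChangeProposalWrapper.builder()
          .entityType("dataset")
          .entityUrn("urn:li:dataset:(urn:li:dataPlatform:hudi,db1.table1,DEV)")
          .upsert()
          .aspect(new Status().setRemoved(false))
          .aspectName("status")
          .build();
      emitter.emit(proposal, null).get(); // block until the write is acknowledged
    }
  }
}
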
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
new file mode 100644
index 0000000000000..27b6acff4021f
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
@@ -0,0 +1,32 @@
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.Callback;
+import datahub.client.MetadataWriteResponse;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles responses to requests sent to the DataHub metadata store by logging them.
+ */
+public class DatahubResponseLogger implements Callback {
+  private static final Logger LOG = LogManager.getLogger(DatahubResponseLogger.class);
+
+  @Override
+  public void onCompletion(MetadataWriteResponse response) {
+    LOG.info("Completed DataHub RestEmitter request. Status: "
+        + (response.isSuccess() ? "succeeded" : "failed"));
+    if (!response.isSuccess()) {
+      LOG.error("Request failed. " + response);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Response details: " + response);
+    }
+  }
+
+  @Override
+  public void onFailure(Throwable e) {
+    LOG.error("Error during DataHub RestEmitter request", e);
+  }
+
+}
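
DatahubResponseLogger carries no state, so one instance can observe any number of emitter requests; onCompletion handles responses the server actually returned, while onFailure covers transport-level errors. A minimal usage fragment, assuming emitter and proposal are already in scope as in DataHubSyncClient above:

// Assumed in scope: RestEmitter emitter; MetadataChangeProposalWrapper proposal.
// Requires: import java.util.concurrent.Future; import datahub.client.MetadataWriteResponse;
DatahubResponseLogger responseLogger = new DatahubResponseLogger();
Future<MetadataWriteResponse> future = emitter.emit(proposal, responseLogger);
future.get(); // block until the request finishes; the logger reports the outcome via its callbacks
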
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index 53c6529b5c699..bda4f206b5d48 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -30,6 +30,9 @@
 
 import java.util.Properties;
 
+import static org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier.DEFAULT_DATAHUB_ENV;
+import static org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier.DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME;
+
 public class DataHubSyncConfig extends HoodieSyncConfig {
 
   public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty
@@ -52,6 +55,17 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .noDefaultValue()
       .withDocumentation("Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.");
 
+  public static final ConfigProperty<String> META_SYNC_DATAHUB_DATAPLATFORM_NAME = ConfigProperty
+      .key("hoodie.meta.sync.datahub.dataplatform.name")
+      .defaultValue(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME)
+      .withDocumentation("String used to represent Hudi when creating its corresponding DataPlatform entity "
+          + "within DataHub");
+
+  public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_ENV = ConfigProperty
+      .key("hoodie.meta.sync.datahub.dataset.env")
+      .defaultValue(DEFAULT_DATAHUB_ENV.name())
+      .withDocumentation("Environment to use when pushing entities to DataHub");
+
   public final HoodieDataHubDatasetIdentifier datasetIdentifier;
 
   public DataHubSyncConfig(Properties props) {
@@ -87,6 +101,13 @@ public static class DataHubSyncConfigParams {
     @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.")
     public String emitterSupplierClass;
 
+    @Parameter(names = {"--data-platform-name"}, description = "String used to represent Hudi when creating its "
+        + "corresponding DataPlatform entity within DataHub")
+    public String dataPlatformName;
+
+    @Parameter(names = {"--dataset-env"}, description = "Which DataHub environment to use when pushing entities")
+    public String datasetEnv;
+
     public boolean isHelp() {
       return hoodieSyncConfigParams.isHelp();
     }
@@ -97,6 +118,8 @@ public Properties toProps() {
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SERVER.key(), emitterServer);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), emitterToken);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv);
       return props;
     }
   }
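
The two new options can be set either as properties or through the new --data-platform-name/--dataset-env flags of the sync tool. A minimal sketch of the properties route, assuming a hypothetical class name and placeholder database/table values:

import java.util.Properties;

import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;

// Illustrative sketch: overriding the platform name and target environment.
public class DataHubSyncConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "db1");
    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "table1");
    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), "hudi");
    props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");

    DataHubSyncConfig config = new DataHubSyncConfig(props);
    // The identifier now resolves to environment PROD instead of the DEV default.
    System.out.println(config.datasetIdentifier.getDatasetUrn());
  }
}
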
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index c7e121ea0f001..05f26624f70ac 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -27,6 +27,8 @@
 
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
 
 /**
  * Construct and provide the default {@link DatasetUrn} to identify the Dataset on DataHub.
@@ -36,6 +38,7 @@
 public class HoodieDataHubDatasetIdentifier {
 
   public static final String DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME = "hudi";
+  public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
 
   protected final Properties props;
 
@@ -44,8 +47,20 @@ public HoodieDataHubDatasetIdentifier(Properties props) {
   }
 
   public DatasetUrn getDatasetUrn() {
-    DataPlatformUrn dataPlatformUrn = new DataPlatformUrn(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME);
     DataHubSyncConfig config = new DataHubSyncConfig(props);
-    return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)), FabricType.DEV);
+
+    return new DatasetUrn(
+        createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
+        createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)),
+        FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
+    );
+  }
+
+  private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
+    return new DataPlatformUrn(platformUrn);
+  }
+
+  private static String createDatasetName(String databaseName, String tableName) {
+    return String.format("%s.%s", databaseName, tableName);
   }
 }
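
With the refactor, getDatasetUrn() composes three configurable parts: the data platform, the "database.table" dataset name, and the environment. The snippet below spells out that composition with literal values (the class name and values are placeholders); it prints urn:li:dataset:(urn:li:dataPlatform:hudi,db1.table1,DEV).

import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;

// Illustrative sketch of the URN assembled by HoodieDataHubDatasetIdentifier.
public class DatasetUrnSketch {
  public static void main(String[] args) {
    DatasetUrn urn = new DatasetUrn(new DataPlatformUrn("hudi"), "db1.table1", FabricType.DEV);
    System.out.println(urn);
  }
}
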