diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java index 594ebb37688e4..4c0cd13da7716 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java @@ -32,6 +32,8 @@ import com.linkedin.common.BrowsePathEntry; import com.linkedin.common.BrowsePathEntryArray; import com.linkedin.common.BrowsePathsV2; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.Status; import com.linkedin.common.SubTypes; import com.linkedin.common.UrnArray; @@ -71,6 +73,9 @@ public class DataHubSyncClient extends HoodieSyncClient { private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class); protected final DataHubSyncConfig config; + private final DataPlatformUrn dataPlatformUrn; + private final Option<String> dataPlatformInstance; + private final Option<Urn> dataPlatformInstanceUrn; private final DatasetUrn datasetUrn; private final Urn databaseUrn; private final String tableName; @@ -81,7 +86,10 @@ public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaCli super(config); this.config = config; HoodieDataHubDatasetIdentifier datasetIdentifier = - config.getDatasetIdentifier(); + config.getDatasetIdentifier(); + this.dataPlatformUrn = datasetIdentifier.getDataPlatformUrn(); + this.dataPlatformInstance = datasetIdentifier.getDataPlatformInstance(); + this.dataPlatformInstanceUrn = datasetIdentifier.getDataPlatformInstanceUrn(); this.datasetUrn = datasetIdentifier.getDatasetUrn(); this.databaseUrn = datasetIdentifier.getDatabaseUrn(); this.tableName = datasetIdentifier.getTableName(); @@ -217,8 +225,8 @@ private MetadataChangeProposalWrapper createContainerAspect(Urn entityUrn, Urn c return
containerProposal; } - private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> path) { - BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path); + private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> paths) { + BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(paths); MetadataChangeProposalWrapper browsePathsProposal = MetadataChangeProposalWrapper.builder() .entityType(entityUrn.getEntityType()) .entityUrn(entityUrn) @@ -228,6 +236,21 @@ private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, Lis return browsePathsProposal; } + private MetadataChangeProposalWrapper createDataPlatformInstanceAspect(Urn entityUrn) { + DataPlatformInstance dataPlatformInstanceAspect = new DataPlatformInstance().setPlatform(this.dataPlatformUrn); + if (this.dataPlatformInstanceUrn.isPresent()) { + dataPlatformInstanceAspect.setInstance(dataPlatformInstanceUrn.get()); + } + + MetadataChangeProposalWrapper dataPlatformInstanceProposal = MetadataChangeProposalWrapper.builder() + .entityType(entityUrn.getEntityType()) + .entityUrn(entityUrn) + .upsert() + .aspect(dataPlatformInstanceAspect) + .build(); + return dataPlatformInstanceProposal; + } + private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) { try { Urn domainUrn = Urn.createFromString(config.getDomainIdentifier()); @@ -252,12 +275,18 @@ private Stream<MetadataChangeProposalWrapper> createContainerEntity() { .aspect(new ContainerProperties().setName(databaseName)) .build(); + List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> Collections.singletonList( + new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString())) + ).orElse(Collections.emptyList()); + Stream<MetadataChangeProposalWrapper> resultStream = Stream.of( - containerEntityProposal, - createSubTypeAspect(databaseUrn, "Database"), - createBrowsePathsAspect(databaseUrn, Collections.emptyList()), createStatusAspect(databaseUrn), - config.attachDomain() ?
createDomainAspect(databaseUrn) : null - ).filter(Objects::nonNull); + containerEntityProposal, + createSubTypeAspect(databaseUrn, "Database"), + createDataPlatformInstanceAspect(databaseUrn), + createBrowsePathsAspect(databaseUrn, paths), + createStatusAspect(databaseUrn), + config.attachDomain() ? createDomainAspect(databaseUrn) : null + ).filter(Objects::nonNull); return resultStream; } @@ -314,13 +343,23 @@ private MetadataChangeProposalWrapper createSchemaMetadataAspect(String tableNam } private Stream<MetadataChangeProposalWrapper> createDatasetEntity() { + BrowsePathEntry databasePath = new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName); + List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> { + List<BrowsePathEntry> list = new ArrayList<>(); + list.add(new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString())); + list.add(databasePath); + return list; + } + ).orElse(Collections.singletonList(databasePath)); + Stream<MetadataChangeProposalWrapper> result = Stream.of( - createStatusAspect(datasetUrn), - createSubTypeAspect(datasetUrn, "Table"), - createBrowsePathsAspect(datasetUrn, Collections.singletonList(new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))), - createContainerAspect(datasetUrn, databaseUrn), - createSchemaMetadataAspect(tableName), - config.attachDomain() ? createDomainAspect(datasetUrn) : null + createStatusAspect(datasetUrn), + createSubTypeAspect(datasetUrn, "Table"), + createDataPlatformInstanceAspect(datasetUrn), + createBrowsePathsAspect(datasetUrn, paths), + createContainerAspect(datasetUrn, databaseUrn), + createSchemaMetadataAspect(tableName), + config.attachDomain() ?
createDomainAspect(datasetUrn) : null ).filter(Objects::nonNull); return result; } diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java index a54c7c85e48ff..2731563e04b1e 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java @@ -78,6 +78,13 @@ public class DataHubSyncConfig extends HoodieSyncConfig { .withDocumentation("String used to represent Hudi when creating its corresponding DataPlatform entity " + "within Datahub"); + public static final ConfigProperty<String> META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME = ConfigProperty + .key("hoodie.meta.sync.datahub.dataplatform_instance.name") + .noDefaultValue() + .markAdvanced() + .withDocumentation("String used to represent Hudi instance when emitting Container and Dataset entities " + + "with the corresponding DataPlatformInstance, only if given."); + public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_ENV = ConfigProperty .key("hoodie.meta.sync.datahub.dataset.env") .defaultValue(DEFAULT_DATAHUB_ENV.name()) @@ -174,6 +181,10 @@ public static class DataHubSyncConfigParams { + "corresponding DataPlatform entity within Datahub") public String dataPlatformName; + @Parameter(names = {"--data-platform-instance-name"}, description = "String used to represent Hudi instance when emitting Container and Dataset entities " + + "with the corresponding DataPlatformInstance, only if given.") + public String dataPlatformInstanceName; + @Parameter(names = {"--dataset-env"}, description = "Which Datahub Environment to use when pushing entities") public String datasetEnv; @@ -196,6 +207,7 @@ public Properties toProps() { props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), emitterToken);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass); props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName); + props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), dataPlatformInstanceName); props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv); props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), domainIdentifier); // We want the default behavior of DataHubSync Tool when run as command line to NOT suppress exceptions diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java index 3c964c1c6b58e..055ed8f4b57a8 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java @@ -19,6 +19,8 @@ package org.apache.hudi.sync.datahub.config; +import org.apache.hudi.common.util.Option; + import com.linkedin.common.FabricType; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DatasetUrn; @@ -29,6 +31,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME; import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME; import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV; @@ -43,6 +46,10 @@ public class HoodieDataHubDatasetIdentifier { public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV; protected final Properties props; 
+ private final String dataPlatform; + private final DataPlatformUrn dataPlatformUrn; + private final Option<String> dataPlatformInstance; + private final Option<Urn> dataPlatformInstanceUrn; private final DatasetUrn datasetUrn; private final Urn databaseUrn; private final String tableName; @@ -55,20 +62,28 @@ public HoodieDataHubDatasetIdentifier(Properties props) { } DataHubSyncConfig config = new DataHubSyncConfig(props); + this.dataPlatform = config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME); + this.dataPlatformUrn = createDataPlatformUrn(this.dataPlatform); + this.dataPlatformInstance = Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME)); + this.dataPlatformInstanceUrn = createDataPlatformInstanceUrn( + this.dataPlatformUrn, + Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME)) + ); this.datasetUrn = new DatasetUrn( - createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)), - createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)), - FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)) + this.dataPlatformUrn, + createDatasetName(this.dataPlatformInstance, config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)), + FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)) ); this.tableName = config.getString(META_SYNC_TABLE_NAME); this.databaseName = config.getString(META_SYNC_DATABASE_NAME); + // https://github.com/datahub-project/datahub/blob/0b105395e913cc47a59bdeed0c56d7c0d4b71b63/metadata-ingestion/src/datahub/emitter/mcp_builder.py#L69-L72 DatabaseKey databaseKey = DatabaseKey.builder() - .platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)) - .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)) - .database(this.databaseName) - .build(); + .platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)) +
.instance(this.dataPlatformInstance.orElse(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))) + .database(this.databaseName) + .build(); this.databaseUrn = databaseKey.asUrn(); } @@ -77,6 +92,22 @@ public DatasetUrn getDatasetUrn() { return this.datasetUrn; } + public String getDataPlatform() { + return this.dataPlatform; + } + + public DataPlatformUrn getDataPlatformUrn() { + return this.dataPlatformUrn; + } + + public Option<String> getDataPlatformInstance() { + return this.dataPlatformInstance; + } + + public Option<Urn> getDataPlatformInstanceUrn() { + return this.dataPlatformInstanceUrn; + } + public Urn getDatabaseUrn() { return this.databaseUrn; } @@ -93,7 +124,22 @@ private static DataPlatformUrn createDataPlatformUrn(String platformUrn) { return new DataPlatformUrn(platformUrn); } - private static String createDatasetName(String databaseName, String tableName) { + private static Option<Urn> createDataPlatformInstanceUrn(DataPlatformUrn dataPlatformUrn, Option<String> dataPlatformInstance) { + if (dataPlatformInstance.isEmpty()) { + return Option.empty(); + } + String dataPlatformInstanceStr = String.format("urn:li:dataPlatformInstance:(%s,%s)", dataPlatformUrn.toString(), dataPlatformInstance.get()); + try { + return Option.of(Urn.createFromString(dataPlatformInstanceStr)); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("Failed to create DataPlatformInstance URN from string: %s", dataPlatformInstanceStr), e); + } + } + + private static String createDatasetName(Option<String> dataPlatformInstance, String databaseName, String tableName) { + if (dataPlatformInstance.isPresent()) { + return String.format("%s.%s.%s", dataPlatformInstance.get(), databaseName, tableName); + } return String.format("%s.%s", databaseName, tableName); } } \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java index d9910bacab577..30d7eb8e405f3 100644 --- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -47,10 +48,13 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME; import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -97,7 +101,7 @@ public void afterEach() { } @Test - public void testUpdateTableSchemaInvokesRestEmitter() throws IOException { + public void testUpdateTableSchema() throws IOException { Properties props = new Properties(); props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); props.put(META_SYNC_BASE_PATH.key(), tableBasePath); @@ -111,9 +115,82 @@ public void testUpdateTableSchemaInvokesRestEmitter() throws IOException { DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); 
DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); - dhClient.updateTableSchema("some_table", null/*, null*/); - verify(restEmitterMock, times(9)).emit(any(MetadataChangeProposalWrapper.class), - Mockito.any()); + dhClient.updateTableSchema("some_table", null); + ArgumentCaptor<MetadataChangeProposalWrapper> captor = ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class); + verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any()); + + Map<String, String> capturedProposalsMap = captor.getAllValues().stream() + .collect(Collectors.toMap( + mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(), + mcpw -> mcpw.getAspect().toString() + )); + + Map<String, String> expectedProposalsMap = new HashMap<>(); + expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+containerProperties", "{name=default}"); + expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+subTypes", "{typeNames=[Database]}"); + expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+dataPlatformInstance", "{platform=urn:li:dataPlatform:hudi}"); + expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+browsePathsV2", "{path=[]}"); + expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+status", "{removed=false}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+status", "{removed=false}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+subTypes", "{typeNames=[Table]}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+dataPlatformInstance", "{platform=urn:li:dataPlatform:hudi}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+browsePathsV2", "{path=[{urn=urn:li:container:ca5c62a21c8486b650b16634aacd3996, id=default}]}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+container",
"{container=urn:li:container:ca5c62a21c8486b650b16634aacd3996}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+schemaMetadata", "{platformSchema={com.linkedin.schema.OtherSchema={rawSchema" + + "={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}}, schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]" + + ".[type=triprec].[type=long].ts, isPartOfKey=false, type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}], version=0, platform=urn:li:dataPlatform:hudi" + + ", hash=fcb5c4d60382cb4d1dd4710810b42295}"); + + assertEquals(expectedProposalsMap, capturedProposalsMap); + } + + @Test + public void testUpdateTableSchemaWhenPlatformInstance() throws IOException { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), "test_instance"); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + + Mockito.when( + restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any()) + ).thenReturn( + CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()) + ); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + dhClient.updateTableSchema("some_table", null); + ArgumentCaptor captor = ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class); + verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any()); + + Map capturedProposalsMap = captor.getAllValues().stream() + .collect(Collectors.toMap( + mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(), + mcpw -> mcpw.getAspect().toString() + )); + + Map expectedProposalsMap = new HashMap<>(); + expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+containerProperties", "{name=default}"); 
+ expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+subTypes", "{typeNames=[Database]}"); + expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+dataPlatformInstance", "{platform=urn:li:dataPlatform:hudi, instance=" + + "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}"); + expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+browsePathsV2", "{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform" + + ":hudi,test_instance), id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}]}"); + expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+status", "{removed=false}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+status", "{removed=false}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+subTypes", "{typeNames=[Table]}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+dataPlatformInstance", "{platform=urn:li:dataPlatform:hudi, instance=" + + "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+browsePathsV2", "{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform" + + ":hudi,test_instance), id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}, {urn=urn:li:container:da9c1430eb7551811f4c0e11b911614d, id=default}]}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+container", "{container=urn:li:container:da9c1430eb7551811f4c0e11b911614d}"); + expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+schemaMetadata", "{platformSchema={com.linkedin.schema.OtherSchema={rawSchema" + + 
"={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}}, schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]" + + ".[type=triprec].[type=long].ts, isPartOfKey=false, type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}], version=0, platform=urn:li:dataPlatform:hudi" + + ", hash=fcb5c4d60382cb4d1dd4710810b42295}"); + + assertEquals(expectedProposalsMap, capturedProposalsMap); } @Test diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java index 52af11f0a8547..9310944bb884e 100644 --- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java @@ -30,6 +30,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME; import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME; import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -144,4 +145,56 @@ void testConstructorWithInvalidEnvironment() { new HoodieDataHubDatasetIdentifier(props); }); } + + @Test + @DisplayName("Test constructor with platform instance") + void testConstructorWithPlatformInstance() { + String expectedDatabaseUrnWithPlatformInstance = "urn:li:container:ee430d6d2a1fb6336b0e972809e41e55"; + String expectedDatabaseUrnWithEnvAsInstance = 
"urn:li:container:ec7465a48d93b5c5e57eca1f44febed5"; + + // Given both platform instance and env + props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db"); + props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table"); + props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), "custom_platform"); + props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), "custom_instance"); + props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD"); + + // When + HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + assertEquals("custom_platform", identifier.getDataPlatform()); + assertEquals("urn:li:dataPlatform:custom_platform", identifier.getDataPlatformUrn().toString()); + assertEquals("custom_instance", identifier.getDataPlatformInstance().get()); + assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)", identifier.getDataPlatformInstanceUrn().get().toString()); + assertEquals(expectedDatabaseUrnWithPlatformInstance, identifier.getDatabaseUrn().toString()); + + // Given platform instance only + props.remove(META_SYNC_DATAHUB_DATASET_ENV.key()); + + // When + identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + assertEquals("custom_platform", identifier.getDataPlatform()); + assertEquals("urn:li:dataPlatform:custom_platform", identifier.getDataPlatformUrn().toString()); + assertEquals("custom_instance", identifier.getDataPlatformInstance().get()); + assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)", identifier.getDataPlatformInstanceUrn().get().toString()); + assertEquals(expectedDatabaseUrnWithPlatformInstance, identifier.getDatabaseUrn().toString()); + + // Given env only + props.remove(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key()); + props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD"); + + // When + identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + 
assertEquals("custom_platform", identifier.getDataPlatform()); + assertEquals("urn:li:dataPlatform:custom_platform", identifier.getDataPlatformUrn().toString()); + assertTrue(identifier.getDataPlatformInstance().isEmpty()); + assertTrue(identifier.getDataPlatformInstanceUrn().isEmpty()); + assertEquals(expectedDatabaseUrnWithEnvAsInstance, identifier.getDatabaseUrn().toString()); + + } } \ No newline at end of file