Skip to content

Commit

Permalink
Merge pull request #487 from atlanhq/DVX-227
Browse files Browse the repository at this point in the history
Adds ability to passthrough extra attributes for lineage processes
  • Loading branch information
cmgrote authored Feb 2, 2024
2 parents 626ed21 + 58a4614 commit ceca900
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,27 @@ abstract class CSVXformer(
.quoteCharacter('"')
.skipEmptyRows(true)
.errorOnDifferentFieldCount(true)
builder.build(input).use { tmp ->
val one = tmp.stream().findFirst()
header =
one.map { obj: CsvRow -> obj.fields }
.orElse(emptyList())
}
header = getHeader(inputFile, fieldSeparator)
reader = builder.build(input)
counter = builder.build(input)
}

companion object {
fun getHeader(file: String, fieldSeparator: Char = ','): List<String> {
val input = Paths.get(file)
val builder = CsvReader.builder()
.fieldSeparator(fieldSeparator)
.quoteCharacter('"')
.skipEmptyRows(true)
.errorOnDifferentFieldCount(true)
builder.build(input).use { tmp ->
val one = tmp.stream().findFirst()
return one.map { obj: CsvRow -> obj.fields }
.orElse(emptyList())
}
}
}

/**
* Run the transformation and produce the output into the specified file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class AssetTransformer(
const val SOURCE_TYPE = "$SOURCE_PREFIX $TYPE"
const val TARGET_PREFIX = "Target"
const val TARGET_TYPE = "$TARGET_PREFIX $TYPE"
val INPUT_HEADERS = listOf(
"$SOURCE_PREFIX $TYPE",
"$SOURCE_PREFIX $NAME",
"$SOURCE_PREFIX $CONNECTOR",
"$SOURCE_PREFIX $CONNECTION",
"$SOURCE_PREFIX $IDENTITY",
"$TARGET_PREFIX $TYPE",
"$TARGET_PREFIX $NAME",
"$TARGET_PREFIX $CONNECTOR",
"$TARGET_PREFIX $CONNECTION",
"$TARGET_PREFIX $IDENTITY",
)

fun getConnectionQN(ctx: Loader.Context, inputRow: Map<String, String>, prefix: String): String {
val connectorType = inputRow["$prefix $CONNECTOR"] ?: ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.lb

import com.atlan.model.assets.Asset
import com.atlan.model.assets.ICatalog
import com.atlan.model.assets.LineageProcess
import com.atlan.pkg.serde.FieldSerde
Expand All @@ -14,19 +13,12 @@ import mu.KLogger
class LineageTransformer(
private val ctx: Loader.Context,
private val inputFile: String,
private val lineageHeaders: List<String>,
private val qnMap: Map<AssetIdentity, String>,
private val logger: KLogger,
) : CSVXformer(
inputFile,
listOf(
Asset.TYPE_NAME.atlanFieldName,
Asset.QUALIFIED_NAME.atlanFieldName,
Asset.NAME.atlanFieldName,
Asset.CONNECTION_QUALIFIED_NAME.atlanFieldName,
"connectorType",
LineageProcess.INPUTS.atlanFieldName,
LineageProcess.OUTPUTS.atlanFieldName,
),
lineageHeaders,
logger,
) {
companion object {
Expand All @@ -35,6 +27,12 @@ class LineageTransformer(
const val XFORM_CONNECTION = "$XFORM_PREFIX ${AssetTransformer.CONNECTION}"
const val XFORM_IDENTITY = "$XFORM_PREFIX ${AssetTransformer.IDENTITY}"
const val XFORM_NAME = "$XFORM_PREFIX ${AssetTransformer.NAME}"
val INPUT_HEADERS = listOf(
XFORM_CONNECTOR,
XFORM_CONNECTION,
XFORM_IDENTITY,
XFORM_NAME,
)
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -72,21 +70,30 @@ class LineageTransformer(
listOf(target as ICatalog),
null,
)
return listOf(
listOf(
LineageProcess.TYPE_NAME,
qualifiedName,
name,
connectionQN,
xformConnector,
AssetRefXformer.encode(source),
AssetRefXformer.encode(target),
),
val row = mutableListOf(
LineageProcess.TYPE_NAME,
qualifiedName,
name,
connectionQN,
xformConnector,
AssetRefXformer.encode(source),
AssetRefXformer.encode(target),
)
for (i in row.size until lineageHeaders.size) {
// Append other attributes onto the row
row.add(inputRow[lineageHeaders[i]] ?: "")
}
return listOf(row)
}
}
// If we fall through, we were unable to define the lineage, so write a blank row
return listOf(listOf("", "", "", "", "", "", ""))
val row = mutableListOf("", "", "", "", "", "", "")
val extras = lineageHeaders.size - row.size
for (i in 0 until extras) {
// Pad the row with blanks for every extra column
row.add("")
}
return listOf(row)
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ package com.atlan.pkg.lb

import AssetImportCfg
import LineageBuilderCfg
import com.atlan.model.assets.Asset
import com.atlan.model.assets.Connection
import com.atlan.model.assets.LineageProcess
import com.atlan.pkg.Utils
import com.atlan.pkg.aim.Importer
import com.atlan.pkg.serde.FieldSerde
import com.atlan.pkg.serde.csv.CSVXformer.Companion.getHeader
import com.atlan.util.AssetBatch.AssetCreationHandling
import mu.KotlinLogging
import java.io.File
Expand Down Expand Up @@ -87,8 +90,24 @@ object Loader {

// 3. Transform the lineage, only keeping any rows that have both input and output assets in Atlan
logger.info { "=== Processing lineage... ===" }
val lineageHeaders = mutableListOf(
Asset.TYPE_NAME.atlanFieldName,
Asset.QUALIFIED_NAME.atlanFieldName,
Asset.NAME.atlanFieldName,
Asset.CONNECTION_QUALIFIED_NAME.atlanFieldName,
"connectorType",
LineageProcess.INPUTS.atlanFieldName,
LineageProcess.OUTPUTS.atlanFieldName,
)
val lineageFile = "$outputDirectory${File.separator}CSA_LB_lineage.csv"
val lineageXform = LineageTransformer(ctx, lineageInput, qualifiedNameMap, logger)
// Determine any non-standard lineage fields in the header and append them to the end of
// the list of standard header fields, so they're passed-through to be used as part of
// defining the lineage process itself
val inputHeaders = getHeader(lineageInput).toMutableList()
inputHeaders.removeAll(AssetTransformer.INPUT_HEADERS)
inputHeaders.removeAll(LineageTransformer.INPUT_HEADERS)
inputHeaders.forEach { lineageHeaders.add(it) }
val lineageXform = LineageTransformer(ctx, lineageInput, lineageHeaders, qualifiedNameMap, logger)
lineageXform.transform(lineageFile)

// 4. Load the lineage processes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import com.atlan.model.assets.Connection
import com.atlan.model.assets.LineageProcess
import com.atlan.model.assets.Table
import com.atlan.model.assets.View
import com.atlan.model.enums.AtlanAnnouncementType
import com.atlan.model.enums.AtlanConnectorType
import com.atlan.model.enums.AtlanLineageDirection
import com.atlan.model.enums.CertificateStatus
import com.atlan.model.lineage.FluentLineage
import com.atlan.pkg.PackageTest
import com.atlan.pkg.lb.Loader
Expand Down Expand Up @@ -132,12 +134,23 @@ class PartialAssetsTest : PackageTest() {
.where(LineageProcess.QUALIFIED_NAME.startsWith(c.qualifiedName))
.includeOnResults(LineageProcess.NAME)
.includeOnResults(LineageProcess.IS_PARTIAL)
.includeOnResults(LineageProcess.SQL)
.includeOnResults(LineageProcess.CERTIFICATE_STATUS)
.includeOnResults(LineageProcess.ANNOUNCEMENT_TYPE)
.includeOnResults(LineageProcess.ANNOUNCEMENT_TITLE)
.includeOnResults(LineageProcess.ANNOUNCEMENT_MESSAGE)
.toRequest()
val response = retrySearchUntil(request, 1)
val lineage = response.assets
assertEquals(1, lineage.size)
assertFalse(lineage[0].isPartial)
assertEquals("source_table > target_view", lineage[0].name)
val process = lineage[0] as LineageProcess
assertFalse(process.isPartial)
assertEquals("source_table > target_view", process.name)
assertEquals("select * from db1.schema1.source_table", process.sql)
assertEquals(CertificateStatus.DRAFT, process.certificateStatus)
assertEquals(AtlanAnnouncementType.INFORMATION, process.announcementType)
assertEquals("Testing lineage builder", process.announcementTitle)
assertEquals("Only a test...", process.announcementMessage)
}

@Test(groups = ["create"])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Source Type,Source Connector,Source Connection,Source Identity,Source Name,Target Type,Target Connector,Target Connection,Target Identity,Target Name,Transformation Connector,Transformation Connection,Transformation Identity,Transformation Name
Table,mulesoft,{CONNECTION},db1/schema1/source_table,source_table,View,mulesoft,{CONNECTION},db2/schema2/target_view,target_view,mulesoft,{CONNECTION},xform_123,source_table > target_view
Source Type,Source Connector,Source Connection,Source Identity,Source Name,Target Type,Target Connector,Target Connection,Target Identity,Target Name,Transformation Connector,Transformation Connection,Transformation Identity,Transformation Name,sql,certificateStatus,announcementType,announcementTitle,announcementMessage
Table,mulesoft,{CONNECTION},db1/schema1/source_table,source_table,View,mulesoft,{CONNECTION},db2/schema2/target_view,target_view,mulesoft,{CONNECTION},xform_123,source_table > target_view,select * from db1.schema1.source_table,DRAFT,information,Testing lineage builder,Only a test...

0 comments on commit ceca900

Please sign in to comment.