Optimize updates of the persistent connection cache #854

Merged 3 commits on Sep 26, 2024
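
In short: instead of each importer calling `Utils.updateConnectionCache` for the assets it created or updated, and `AssetRemover` calling it again for the assets it deleted, the importers now pass their modified assets to `DeltaProcessor.run`, which makes a single combined cache update. A minimal sketch of that consolidated flow, assuming an `AssetRemover` that has already calculated its deletions (the wrapper function below is illustrative, not part of the diff):

```kotlin
import com.atlan.model.assets.Asset
import com.atlan.pkg.Utils
import com.atlan.pkg.util.AssetRemover

/**
 * Illustrative sketch of the flow DeltaProcessor.run now follows: delete whatever the
 * AssetRemover has identified, then update the persistent connection cache exactly once,
 * covering both modified and deleted assets.
 */
fun updateCacheOnce(
    remover: AssetRemover,
    modifiedAssets: List<Asset>?,
    outputDirectory: String,
) {
    var deletedAssets: List<Asset>? = null
    if (remover.hasAnythingToDelete()) {
        // deleteAssets() no longer updates the cache itself; it returns what it removed
        deletedAssets = remover.deleteAssets()
    }
    // One cache update covering both additions / updates and removals
    Utils.updateConnectionCache(
        added = modifiedAssets,
        removed = deletedAssets,
        fallback = outputDirectory,
    )
}
```

The net effect is one write to the persistent connection cache per run, rather than separate writes from each importer and from `AssetRemover`.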
gradle/libs.versions.toml (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@ simplejavamail = "8.11.3"
swagger = "2.1.22"
jsonpath = "2.9.0"
commons-compress = "1.27.1"
sqlite = "3.46.1.2"
sqlite = "3.46.1.3"
jakarta-mail = "2.1.3"
angus-mail = "2.0.3"
pkl = "0.26.3"
@@ -5,7 +5,6 @@ package com.atlan.pkg.util
import com.atlan.Atlan
import com.atlan.model.assets.Asset
import com.atlan.model.enums.AtlanDeleteType
import com.atlan.pkg.Utils
import com.atlan.pkg.cache.PersistentConnectionCache
import com.atlan.pkg.serde.csv.CSVXformer
import com.atlan.util.AssetBatch
@@ -91,10 +90,12 @@ class AssetRemover(

/**
* Actually run the removal of any assets identified for deletion.
*
* @return a list of the assets that were deleted
*/
fun deleteAssets() {
fun deleteAssets(): List<Asset> {
translateToGuids()
deleteAssetsByGuid()
return deleteAssetsByGuid()
}

/**
@@ -209,8 +210,10 @@

/**
* Delete all assets we have identified for deletion, in batches of 20 at a time.
*
* @return a list of the assets that were deleted
*/
private fun deleteAssetsByGuid() {
private fun deleteAssetsByGuid(): List<Asset> {
if (guidsToDeleteToDetails.isNotEmpty()) {
val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT
val guidList = guidsToDeleteToDetails.keys.filter { it.isNotBlank() }.toList()
@@ -236,10 +239,7 @@
}
}
}
Utils.updateConnectionCache(
removed = guidsToDeleteToDetails.values.map { it },
fallback = fallback,
)
}
return guidsToDeleteToDetails.values.map { it }
}
}
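
For callers, the practical difference in `AssetRemover` is that `deleteAssets()` now reports what was removed rather than updating the connection cache itself. A small usage sketch under that contract (the helper below is illustrative only):

```kotlin
import com.atlan.model.assets.Asset
import com.atlan.pkg.util.AssetRemover

// Illustrative helper: run the removal only when there is something to delete, and
// surface the deleted assets so the caller can fold them into its own cache update.
fun removeAndReport(remover: AssetRemover): List<Asset> =
    if (remover.hasAnythingToDelete()) remover.deleteAssets() else emptyList()
```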
@@ -2,6 +2,7 @@
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.util

import com.atlan.model.assets.Asset
import com.atlan.pkg.Utils
import com.atlan.pkg.cache.ConnectionCache
import com.atlan.pkg.objectstore.ObjectStorageSyncer
@@ -43,7 +44,16 @@ class DeltaProcessor(
val outputDirectory: String = Paths.get(separator, "tmp").toString(),
private val previousFileProcessedExtension: String = ".processed",
) {
fun run() {
/**
* Run the delta detection.
* This includes: determining which assets should be deleted, deleting those assets, and updating
* the persistent connection cache by removing any deleted assets and creating / updating any assets
* that were created or modified.
*
* @param modifiedAssets list of assets that were modified by the processing up to the point of this delta detection
*/
fun run(modifiedAssets: List<Asset>? = null) {
var deletedAssets: List<Asset>? = null
if (semantic == "full") {
if (qualifiedNamePrefix.isNullOrBlank()) {
logger.warn { "Unable to determine qualifiedName prefix, will not delete any assets." }
@@ -73,7 +83,8 @@
)
assetRemover.calculateDeletions(preprocessedDetails.preprocessedFile, previousFile)
if (assetRemover.hasAnythingToDelete()) {
assetRemover.deleteAssets()
// Note: this will update the persistent connection cache for both adds and deletes
deletedAssets = assetRemover.deleteAssets()
}
} else {
logger.info { "No previous file found, treated it as an initial load." }
@@ -82,6 +93,12 @@
uploadToBackingStore(objectStore, preprocessedDetails.preprocessedFile, qualifiedNamePrefix, previousFileProcessedExtension)
}
}
// Update the connection cache with any changes (added and / or removed assets)
Utils.updateConnectionCache(
added = modifiedAssets,
removed = deletedAssets,
fallback = outputDirectory,
)
}

/**
@@ -169,10 +169,7 @@ object Importer {
Cube.select().where(Cube.GUID.eq(it)).pageSize(1).stream().findFirst().getOrNull()?.qualifiedName
}

Utils.updateConnectionCache(
added = ImportResults.getAllModifiedAssets(cubeImporterResults, dimResults, hierResults, fieldResults),
fallback = outputDirectory,
)
val modifiedAssets = ImportResults.getAllModifiedAssets(cubeImporterResults, dimResults, hierResults, fieldResults)

val delta =
DeltaProcessor(
@@ -187,7 +184,7 @@
previousFilePreprocessor = Preprocessor(Utils.getOrDefault(config.previousFileDirect, ""), fieldSeparator),
outputDirectory = outputDirectory,
)
delta.run()
delta.run(modifiedAssets)
return cubeQN
}

@@ -12,7 +12,6 @@ import com.atlan.pkg.Utils
import com.atlan.pkg.aim.Importer
import com.atlan.pkg.serde.FieldSerde
import com.atlan.pkg.serde.csv.CSVXformer.Companion.getHeader
import com.atlan.pkg.serde.csv.ImportResults
import mu.KotlinLogging
import java.io.File
import kotlin.system.exitProcess
@@ -92,11 +91,6 @@ object Loader {
)
val assetResults = Importer.import(importConfig, outputDirectory)

Utils.updateConnectionCache(
added = ImportResults.getAllModifiedAssets(assetResults),
fallback = outputDirectory,
)

val qualifiedNameMap = assetResults?.primary?.qualifiedNames ?: mapOf()

// 3. Transform the lineage, only keeping any rows that have both input and output assets in Atlan
@@ -209,47 +209,45 @@ object Importer {
)
val colResults = columnImporter.import()

Utils.updateConnectionCache(
added = ImportResults.getAllModifiedAssets(dbResults, schResults, tblResults, viewResults, mviewResults, colResults),
fallback = outputDirectory,
)
val modifiedAssets = ImportResults.getAllModifiedAssets(dbResults, schResults, tblResults, viewResults, mviewResults, colResults)

if (deltaSemantic == "full") {
val connectionIdentity = ConnectionIdentity.fromString(preprocessedDetails.assetRootName)
val connectionQN =
val connectionQN =
if (deltaSemantic == "full") {
val connectionIdentity = ConnectionIdentity.fromString(preprocessedDetails.assetRootName)
try {
val list = Connection.findByName(connectionIdentity.name, AtlanConnectorType.fromValue(connectionIdentity.type))
list[0].qualifiedName
} catch (e: AtlanException) {
logger.error(e) { "Unable to find the unique connection in Atlan from the file: $connectionIdentity" }
exitProcess(50)
}
} else {
null
}

val previousFileDirect = Utils.getOrDefault(config.previousFileDirect, "")
val delta =
DeltaProcessor(
semantic = deltaSemantic,
qualifiedNamePrefix = connectionQN,
removalType = Utils.getOrDefault(config.deltaRemovalType, "archive"),
previousFilesPrefix = PREVIOUS_FILES_PREFIX,
resolver = AssetImporter,
preprocessedDetails = preprocessedDetails,
typesToRemove = listOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME, Column.TYPE_NAME),
logger = logger,
previousFilePreprocessor =
Preprocessor(
previousFileDirect,
fieldSeparator,
true,
outputFile = "$previousFileDirect.transformed.csv",
outputHeaders = targetHeaders,
),
outputDirectory = outputDirectory,
)
delta.run()
return connectionQN
}
return null
val previousFileDirect = Utils.getOrDefault(config.previousFileDirect, "")
val delta =
DeltaProcessor(
semantic = deltaSemantic,
qualifiedNamePrefix = connectionQN,
removalType = Utils.getOrDefault(config.deltaRemovalType, "archive"),
previousFilesPrefix = PREVIOUS_FILES_PREFIX,
resolver = AssetImporter,
preprocessedDetails = preprocessedDetails,
typesToRemove = listOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME, Column.TYPE_NAME),
logger = logger,
previousFilePreprocessor =
Preprocessor(
previousFileDirect,
fieldSeparator,
true,
outputFile = "$previousFileDirect.transformed.csv",
outputHeaders = targetHeaders,
),
outputDirectory = outputDirectory,
)
delta.run(modifiedAssets)
return connectionQN
}

private class Preprocessor(