diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt index cccf36fc78..4ec9ee8c4c 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt @@ -5,8 +5,10 @@ package com.atlan.pkg import com.atlan.Atlan import com.atlan.exception.AtlanException import com.atlan.exception.NotFoundException +import com.atlan.model.assets.Asset import com.atlan.model.assets.Connection import com.atlan.model.enums.AssetCreationHandling +import com.atlan.pkg.cache.PersistentConnectionCache import com.atlan.pkg.model.Credential import com.atlan.pkg.objectstore.ADLSCredential import com.atlan.pkg.objectstore.ADLSSync @@ -24,6 +26,8 @@ import mu.KotlinLogging import org.simplejavamail.email.EmailBuilder import org.simplejavamail.mailer.MailerBuilder import java.io.File +import java.io.File.separator +import java.io.IOException import java.nio.file.Paths import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicLong @@ -777,4 +781,68 @@ object Utils { else -> throw IllegalStateException("Unable to determine cloud provider: $cloud") } } + + /** + * Update the connection cache for the provided assets. + * + * @param added assets that were added + * @param removed assets that were deleted + */ + fun updateConnectionCache( + added: Collection<Asset>? = null, + removed: Collection<Asset>? 
= null, + ) { + val sync = getBackingStore() + val map = CacheUpdates.build(added, removed) + for ((connectionQN, assets) in map) { + val paths = mutableListOf("tmp", "cache") + paths.addAll(connectionQN.split("/")) + val tmpFile = paths.joinToString(separator) + // Retrieve any pre-existing cache first, so we can update it + try { + sync.downloadFrom("connection-cache/$connectionQN.sqlite", tmpFile) + } catch (e: IOException) { + logger.info(e) { "Unable to download pre-existing cache: connection-cache/$connectionQN.sqlite" } + } + val cache = PersistentConnectionCache(tmpFile) + cache.addAssets(assets.added) + cache.deleteAssets(assets.removed) + // Replace the cache with the updated one + try { + sync.uploadTo(tmpFile, "connection-cache/$connectionQN.sqlite") + } catch (e: IOException) { + logger.error(e) { "Unable to upload updated cache: connection-cache/$connectionQN.sqlite" } + } + } + } + + private data class CacheUpdates( + val connectionQN: String, + val added: MutableList<Asset>, + val removed: MutableList<Asset>, + ) { + companion object { + fun build( + add: Collection<Asset>?, + remove: Collection<Asset>?, + ): Map<String, CacheUpdates> { + val map = mutableMapOf<String, CacheUpdates>() + add?.forEach { + val connectionQN = it.connectionQualifiedName + if (!map.containsKey(connectionQN)) { + map[connectionQN] = CacheUpdates(connectionQN, mutableListOf(), mutableListOf()) + } + map[connectionQN]!!.added.add(it) + } + remove?.forEach { + val connectionQN = it.connectionQualifiedName + if (!map.containsKey(connectionQN)) { + map[connectionQN] = CacheUpdates(connectionQN, mutableListOf(), mutableListOf()) + } + map[connectionQN]!!.removed.add(it) + } + return map + } + } + } } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/cache/PersistentConnectionCache.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/cache/PersistentConnectionCache.kt new file mode 100644 index 0000000000..328093398e --- /dev/null +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/cache/PersistentConnectionCache.kt @@ 
-0,0 +1,114 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.pkg.cache + +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Column +import mu.KotlinLogging +import java.sql.DriverManager +import java.sql.SQLException + +/** + * Cache that contains a listing of all asset identities for a given connection, and stores + * these persistently in object storage for use by other workflows (i.e. in considering lineage). + * + * @param dbFile the file path to the SQLite database file + * @param ignoreQNs a collection of qualifiedNames to ignore + */ +class PersistentConnectionCache( + private val dbFile: String, + private val ignoreQNs: Collection<String> = emptySet(), +) { + private val logger = KotlinLogging.logger {} + private val dbString = "jdbc:sqlite:$dbFile" + + init { + val tableQuery = + """ + CREATE TABLE entities ( + type_name TEXT NOT NULL, + qual_name TEXT NOT NULL, + con_qual_name TEXT, + name TEXT, + order_seq INTEGER, + tenant_id TEXT, + PRIMARY KEY (type_name, qual_name) + ) + """.trimIndent() + + DriverManager.getConnection(dbString).use { connection -> + connection.createStatement().use { statement -> + try { + statement.executeQuery("SELECT type_name FROM entities LIMIT 1") + } catch (e: SQLException) { + // Only create a new database if there is not an existing one already + statement.executeUpdate("DROP TABLE IF EXISTS entities") + // sqlite does not have create or replace, so we drop and create + statement.executeUpdate(tableQuery) + // if querying on both typename and qualified name, this is far more efficient and drops run times from 2.5 hours to 30 mins + statement.executeUpdate("CREATE INDEX typename_index ON entities(type_name, qual_name COLLATE NOCASE)") + statement.executeUpdate("CREATE INDEX qualified_name_index ON entities(qual_name COLLATE NOCASE)") + statement.executeUpdate("CREATE INDEX name_index ON entities(name COLLATE NOCASE)") + } + } + connection.commit() + } + } + + /** + * 
Remove the provided assets from the persistent connection cache. + * + * @param assets to remove from the cache + */ + fun deleteAssets(assets: Collection<Asset>) { + DriverManager.getConnection(dbString).use { connection -> + connection.prepareStatement("delete from entities where type_name = ? and qual_name = ?").use { ps -> + assets.forEach { a -> + ps.setString(1, a.typeName) + ps.setString(2, a.qualifiedName) + ps.executeUpdate() + } + } + connection.commit() + } + } + + /** + * Add the provided assets to the persistent connection cache. + * + * @param assets to add to the cache + */ + fun addAssets(assets: Collection<Asset>) { + DriverManager.getConnection(dbString).use { connection -> + connection.prepareStatement("insert or replace into entities values(?, ?, ?, ?, ?, ?)").use { ps -> + assets.forEach { a -> + if (a.qualifiedName in ignoreQNs) { + logger.debug { "Skipping ${a.qualifiedName} from being added to the cache" } + } else { + ps.setString(1, a.typeName) + ps.setString(2, a.qualifiedName) + ps.setString(3, a.connectionQualifiedName) + ps.setString(4, a.name) + ps.setInt(5, if (a is Column) a.order else -1) + ps.setString(6, a.tenantId) + ps.executeUpdate() + } + } + } + connection.commit() + } + } + + companion object { + /** Fields that need to be present on every asset to be added to connection cache. 
*/ + val REQUIRED_FIELDS = + listOf( + Asset.TYPE_NAME, + Asset.QUALIFIED_NAME, + Asset.CONNECTION_QUALIFIED_NAME, + Asset.NAME, + Column.ORDER, + Asset.TENANT_ID, + ) + } +} diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ADLSSync.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ADLSSync.kt index e704e5250f..d7826905fd 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ADLSSync.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ADLSSync.kt @@ -13,6 +13,7 @@ import com.azure.storage.file.datalake.models.ListPathsOptions import mu.KLogger import org.pkl.core.module.ModuleKeyFactories.file import java.io.File +import java.io.IOException /** * Class to generally move data between ADLS and local storage. @@ -172,22 +173,26 @@ class ADLSSync( localFile: String, ) { logger.info { " ... downloading adls://$containerName/$remoteKey to $localFile" } - val local = File(localFile) - if (local.exists()) { - local.delete() - } - if (!local.parentFile.exists()) { - local.parentFile.mkdirs() - } - if (adlsClient != null) { - val fsClient = adlsClient.getFileSystemClient(containerName) - val fileClient = fsClient.getFileClient(remoteKey) - fileClient.readToFile(localFile) - } else if (blobContainerClient != null) { - val blobClient = blobContainerClient.getBlobClient(remoteKey) - blobClient.downloadToFile(localFile) - } else { - throw IllegalStateException("No ADLS client configured -- cannot download.") + try { + val local = File(localFile) + if (local.exists()) { + local.delete() + } + if (!local.parentFile.exists()) { + local.parentFile.mkdirs() + } + if (adlsClient != null) { + val fsClient = adlsClient.getFileSystemClient(containerName) + val fileClient = fsClient.getFileClient(remoteKey) + fileClient.readToFile(localFile) + } else if (blobContainerClient != null) { + val blobClient = blobContainerClient.getBlobClient(remoteKey) + blobClient.downloadToFile(localFile) + } 
else { + throw IllegalStateException("No ADLS client configured -- cannot download.") + } + } catch (e: Exception) { + throw IOException(e) } } @@ -242,15 +247,19 @@ class ADLSSync( logger.info { " ... uploading $localFile to adls://$containerName/$remoteKey" } // Note: no need to delete files first (putObject overwrites, including auto-versioning // if enabled on the bucket), and no need to create parent prefixes in ADLS - if (adlsClient != null) { - val fsClient = adlsClient.getFileSystemClient(containerName) - val fileClient = fsClient.getFileClient(remoteKey) - fileClient.uploadFromFile(localFile, true) - } else if (blobContainerClient != null) { - val blobClient = blobContainerClient.getBlobClient(remoteKey) - blobClient.uploadFromFile(localFile, true) - } else { - throw IllegalStateException("No ADLS client configured -- cannot upload.") + try { + if (adlsClient != null) { + val fsClient = adlsClient.getFileSystemClient(containerName) + val fileClient = fsClient.getFileClient(remoteKey) + fileClient.uploadFromFile(localFile, true) + } else if (blobContainerClient != null) { + val blobClient = blobContainerClient.getBlobClient(remoteKey) + blobClient.uploadFromFile(localFile, true) + } else { + throw IllegalStateException("No ADLS client configured -- cannot upload.") + } + } catch (e: Exception) { + throw IOException(e) } } } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/GCSSync.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/GCSSync.kt index 59210bd86a..561ff428e9 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/GCSSync.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/GCSSync.kt @@ -9,6 +9,7 @@ import mu.KLogger import java.io.File import java.io.FileInputStream import java.io.FileOutputStream +import java.io.IOException /** * Class to generally move data between GCS and local storage. 
@@ -119,16 +120,20 @@ class GCSSync( localFile: String, ) { logger.info { " ... downloading gcs://$bucketName/$remoteKey to $localFile" } - val local = File(localFile) - if (local.exists()) { - local.delete() - } - if (!local.parentFile.exists()) { - local.parentFile.mkdirs() - } - val blob = storage.get(bucketName, remoteKey) - FileOutputStream(local).use { fos -> - blob.downloadTo(fos) + try { + val local = File(localFile) + if (local.exists()) { + local.delete() + } + if (!local.parentFile.exists()) { + local.parentFile.mkdirs() + } + val blob = storage.get(bucketName, remoteKey) + FileOutputStream(local).use { fos -> + blob.downloadTo(fos) + } + } catch (e: Exception) { + throw IOException(e) } } @@ -176,10 +181,14 @@ class GCSSync( logger.info { " ... uploading $localFile to gcs://$bucketName/$remoteKey" } // Note: no need to delete files first (putObject overwrites, including auto-versioning // if enabled on the bucket), and no need to create parent prefixes in GCS - val local = File(localFile) - val bucket = storage.get(bucketName) - FileInputStream(local).use { fis -> - bucket.create(remoteKey, fis) + try { + val local = File(localFile) + val bucket = storage.get(bucketName) + FileInputStream(local).use { fis -> + bucket.create(remoteKey, fis) + } + } catch (e: Exception) { + throw IOException(e) } } } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ObjectStorageSyncer.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ObjectStorageSyncer.kt index e443c3e05c..71ca765943 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ObjectStorageSyncer.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ObjectStorageSyncer.kt @@ -2,6 +2,8 @@ Copyright 2023 Atlan Pte. Ltd. */ package com.atlan.pkg.objectstore +import java.io.IOException + /** * Interface for syncing files between cloud object storage and local file systems. 
*/ @@ -42,7 +44,9 @@ interface ObjectStorageSyncer { * * @param remoteKey from which to download the file * @param localFile into which to download the file + * @throws IOException on any errors downloading */ + @Throws(IOException::class) fun downloadFrom( remoteKey: String, localFile: String, @@ -67,7 +71,9 @@ interface ObjectStorageSyncer { * * @param localFile from which to upload the file * @param remoteKey into which to upload the file + * @throws IOException on any errors downloading */ + @Throws(IOException::class) fun uploadTo( localFile: String, remoteKey: String, diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/S3Sync.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/S3Sync.kt index 22110c906d..353a32731c 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/S3Sync.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/S3Sync.kt @@ -11,6 +11,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest import software.amazon.awssdk.services.s3.model.ListObjectsV2Request import software.amazon.awssdk.services.s3.model.PutObjectRequest import java.io.File +import java.io.IOException /** * Class to generally move data between S3 and local storage. @@ -137,18 +138,22 @@ class S3Sync( localFile: String, ) { logger.info { " ... 
downloading s3://$bucketName/$remoteKey to $localFile" } - val local = File(localFile) - if (local.exists()) { - local.delete() - } - if (!local.parentFile.exists()) { - local.parentFile.mkdirs() + try { + val local = File(localFile) + if (local.exists()) { + local.delete() + } + if (!local.parentFile.exists()) { + local.parentFile.mkdirs() + } + val objectKey = File(remoteKey).path + s3Client.getObject( + GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + local.toPath(), + ) + } catch (e: Exception) { + throw IOException(e) } - val objectKey = File(remoteKey).path - s3Client.getObject( - GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(), - local.toPath(), - ) } /** {@inheritDoc} */ @@ -198,11 +203,15 @@ class S3Sync( logger.info { " ... uploading $localFile to s3://$bucketName/$remoteKey" } // Note: no need to delete files first (putObject overwrites, including auto-versioning // if enabled on the bucket), and no need to create parent prefixes in S3 - val local = File(localFile) - val objectKey = File(remoteKey).path - s3Client.putObject( - PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), - local.toPath(), - ) + try { + val local = File(localFile) + val objectKey = File(remoteKey).path + s3Client.putObject( + PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + local.toPath(), + ) + } catch (e: Exception) { + throw IOException(e) + } } } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/AssetRemover.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/AssetRemover.kt index 9f638ab24f..299a01d026 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/AssetRemover.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/AssetRemover.kt @@ -5,6 +5,7 @@ package com.atlan.pkg.util import com.atlan.Atlan import com.atlan.model.assets.Asset import com.atlan.model.enums.AtlanDeleteType +import com.atlan.pkg.Utils import 
com.atlan.pkg.serde.csv.CSVXformer import com.atlan.util.AssetBatch import de.siegmar.fastcsv.reader.CsvReader @@ -228,6 +229,9 @@ class AssetRemover( } } } + if (client.isInternal) { + Utils.updateConnectionCache(removed = assetsToDelete.keys.map { it.toMinimalAsset() }) + } } } } diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt index d4650b3b8e..e736221cde 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt @@ -3,6 +3,7 @@ package com.atlan.pkg.rab import RelationalAssetsBuilderCfg +import com.atlan.Atlan import com.atlan.model.assets.Asset import com.atlan.model.assets.Column import com.atlan.model.assets.Connection @@ -106,7 +107,7 @@ object Importer { true, fieldSeparator, ) - connectionImporter.import() + val connectionResults = connectionImporter.import() logger.info { " --- Importing databases... ---" } val databaseImporter = @@ -119,7 +120,7 @@ object Importer { trackBatches, fieldSeparator, ) - databaseImporter.import() + val dbResults = databaseImporter.import() logger.info { " --- Importing schemas... ---" } val schemaImporter = @@ -132,7 +133,7 @@ object Importer { trackBatches, fieldSeparator, ) - schemaImporter.import() + val schResults = schemaImporter.import() logger.info { " --- Importing tables... ---" } val tableImporter = @@ -145,7 +146,7 @@ object Importer { trackBatches, fieldSeparator, ) - tableImporter.import() + val tblResults = tableImporter.import() logger.info { " --- Importing views... ---" } val viewImporter = @@ -158,7 +159,7 @@ object Importer { trackBatches, fieldSeparator, ) - viewImporter.import() + val viewResults = viewImporter.import() logger.info { " --- Importing materialized views... 
---" } val materializedViewImporter = @@ -171,7 +172,7 @@ object Importer { trackBatches, fieldSeparator, ) - materializedViewImporter.import() + val mviewResults = materializedViewImporter.import() logger.info { " --- Importing columns... ---" } val columnImporter = @@ -184,7 +185,13 @@ object Importer { trackBatches, fieldSeparator, ) - columnImporter.import() + val colResults = columnImporter.import() + + if (Atlan.getDefaultClient().isInternal && trackBatches) { + // Only attempt to manage a connection cache if we are running in-cluster + val results = dbResults?.combinedWith(schResults)?.combinedWith(tblResults)?.combinedWith(viewResults)?.combinedWith(mviewResults)?.combinedWith(colResults) + Utils.updateConnectionCache(added = results?.primary?.created) + } } private fun preprocessCSV( diff --git a/sdk/src/main/java/com/atlan/util/AssetBatch.java b/sdk/src/main/java/com/atlan/util/AssetBatch.java index 6e6d82073d..c16a170ad8 100644 --- a/sdk/src/main/java/com/atlan/util/AssetBatch.java +++ b/sdk/src/main/java/com/atlan/util/AssetBatch.java @@ -9,6 +9,7 @@ import com.atlan.exception.InvalidRequestException; import com.atlan.exception.LogicException; import com.atlan.model.assets.Asset; +import com.atlan.model.assets.Column; import com.atlan.model.assets.IndistinctAsset; import com.atlan.model.assets.MaterializedView; import com.atlan.model.assets.Table; @@ -511,33 +512,45 @@ private void trackResponse(AssetMutationResponse response, List sent) { private void track(List tracker, Asset candidate) { try { - tracker.add(candidate - .trimToRequired() - .guid(candidate.getGuid()) - .name(candidate.getName()) - .build()); + tracker.add(buildCacheable(candidate.trimToRequired(), candidate)); } catch (InvalidRequestException e) { try { Class assetClass = Serde.getAssetClassForType(candidate.getTypeName()); Method method = assetClass.getMethod("_internal"); Object result = method.invoke(null); - Asset.AssetBuilder builder = (Asset.AssetBuilder) result; - 
tracker.add(builder.guid(candidate.getGuid()) - .qualifiedName(candidate.getQualifiedName()) - .build()); + tracker.add(buildCacheable((Asset.AssetBuilder) result, candidate)); } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException eRef) { - tracker.add(IndistinctAsset._internal() - .typeName(candidate.getTypeName()) - .guid(candidate.getGuid()) - .qualifiedName(candidate.getQualifiedName()) - .build()); + tracker.add(buildCacheable(IndistinctAsset._internal().typeName(candidate.getTypeName()), candidate)); } } } + /** + * Construct the minimal asset representation necessary for the asset to be included in a + * persistent connection cache. + * + * @param builder for the asset + * @param candidate from which to draw any additional details + * @return the minimally-complete, cacheable asset + */ + private Asset buildCacheable(Asset.AssetBuilder builder, Asset candidate) { + builder.guid(candidate.getGuid()) + .qualifiedName(candidate.getQualifiedName()) + .connectionQualifiedName(candidate.getConnectionQualifiedName()) + .name(candidate.getName()) + .tenantId(candidate.getTenantId()); + if (candidate instanceof Column) { + Integer order = ((Column) candidate).getOrder(); + if (order != null) { + ((Column.ColumnBuilder) builder).order(order); + } + } + return builder.build(); + } + /** * Internal class to capture batch failures. */ @@ -574,6 +587,18 @@ public AssetIdentity(String typeName, String qualifiedName, boolean caseInsensit } } + /** + * Translate this AssetIdentity into a minimal asset composed of only a type and a qualifiedName. + * + * @return the minimal asset + */ + public Asset toMinimalAsset() { + return IndistinctAsset._internal() + .typeName(typeName) + .qualifiedName(qualifiedName) + .build(); + } + @Override public String toString() { return typeName + "::" + qualifiedName;