Adds connection cache management for relational assets builder #788

Merged · 1 commit · Sep 12, 2024
68 changes: 68 additions & 0 deletions package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt
@@ -5,8 +5,10 @@ package com.atlan.pkg
import com.atlan.Atlan
import com.atlan.exception.AtlanException
import com.atlan.exception.NotFoundException
import com.atlan.model.assets.Asset
import com.atlan.model.assets.Connection
import com.atlan.model.enums.AssetCreationHandling
import com.atlan.pkg.cache.PersistentConnectionCache
import com.atlan.pkg.model.Credential
import com.atlan.pkg.objectstore.ADLSCredential
import com.atlan.pkg.objectstore.ADLSSync
@@ -24,6 +26,8 @@ import mu.KotlinLogging
import org.simplejavamail.email.EmailBuilder
import org.simplejavamail.mailer.MailerBuilder
import java.io.File
import java.io.File.separator
import java.io.IOException
import java.nio.file.Paths
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicLong
@@ -777,4 +781,68 @@ object Utils {
else -> throw IllegalStateException("Unable to determine cloud provider: $cloud")
}
}

/**
* Update the connection cache for the provided assets.
*
* @param added assets that were added
* @param removed assets that were deleted
*/
fun updateConnectionCache(
added: Collection<Asset>? = null,
removed: Collection<Asset>? = null,
) {
val sync = getBackingStore()
val map = CacheUpdates.build(added, removed)
for ((connectionQN, assets) in map) {
val paths = mutableListOf("tmp", "cache")
paths.addAll(connectionQN.split("/"))
val tmpFile = paths.joinToString(separator)
// Retrieve any pre-existing cache first, so we can update it
try {
sync.downloadFrom("connection-cache/$connectionQN.sqlite", tmpFile)
} catch (e: IOException) {
logger.info(e) { "Unable to download pre-existing cache: connection-cache/$connectionQN.sqlite" }
}
val cache = PersistentConnectionCache(tmpFile)
cache.addAssets(assets.added)
cache.deleteAssets(assets.removed)
// Replace the cache with the updated one
try {
sync.uploadTo(tmpFile, "connection-cache/$connectionQN.sqlite")
} catch (e: IOException) {
logger.error(e) { "Unable to upload updated cache: connection-cache/$connectionQN.sqlite" }
}
}
}

private data class CacheUpdates(
val connectionQN: String,
val added: MutableList<Asset>,
val removed: MutableList<Asset>,
) {
companion object {
fun build(
add: Collection<Asset>?,
remove: Collection<Asset>?,
): Map<String, CacheUpdates> {
val map = mutableMapOf<String, CacheUpdates>()
add?.forEach {
val connectionQN = it.connectionQualifiedName
if (!map.containsKey(connectionQN)) {
map[connectionQN] = CacheUpdates(connectionQN, mutableListOf(), mutableListOf())
}
map[connectionQN]!!.added.add(it)
}
remove?.forEach {
val connectionQN = it.connectionQualifiedName
if (!map.containsKey(connectionQN)) {
map[connectionQN] = CacheUpdates(connectionQN, mutableListOf(), mutableListOf())
}
map[connectionQN]!!.removed.add(it)
}
return map
}
}
}
}
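
To illustrate how a package would wire in the new helper, here is a minimal caller sketch (hypothetical function and variable names, not part of this PR):

    import com.atlan.model.assets.Asset
    import com.atlan.pkg.Utils

    fun publishConnectionCache(
        created: Collection<Asset>,
        deleted: Collection<Asset>,
    ) {
        // Utils groups the assets by connectionQualifiedName internally, then downloads,
        // updates and re-uploads one SQLite cache per connection in object storage.
        Utils.updateConnectionCache(
            added = created,
            removed = deleted,
        )
    }

Both parameters default to null, so a package that only ever creates assets can simply call Utils.updateConnectionCache(added = created).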
package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/cache/PersistentConnectionCache.kt
@@ -0,0 +1,114 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.cache

import com.atlan.model.assets.Asset
import com.atlan.model.assets.Column
import mu.KotlinLogging
import java.sql.DriverManager
import java.sql.SQLException

/**
* Cache that contains a listing of all asset identities for a given connection, and stores
* these persistently in object storage for use by other workflows (for example, when considering lineage).
*
* @param dbFile the file path to the SQLite database file
* @param ignoreQNs a collection of qualifiedNames to ignore
*/
class PersistentConnectionCache(
private val dbFile: String,
private val ignoreQNs: Collection<String> = emptySet(),
) {
private val logger = KotlinLogging.logger {}
private val dbString = "jdbc:sqlite:$dbFile"

init {
val tableQuery =
"""
CREATE TABLE entities (
type_name TEXT NOT NULL,
qual_name TEXT NOT NULL,
con_qual_name TEXT,
name TEXT,
order_seq INTEGER,
tenant_id TEXT,
PRIMARY KEY (type_name, qual_name)
)
""".trimIndent()

DriverManager.getConnection(dbString).use { connection ->
connection.createStatement().use { statement ->
try {
statement.executeQuery("SELECT type_name FROM entities LIMIT 1")
} catch (e: SQLException) {
// Only create a new database if there is not an existing one already
statement.executeUpdate("DROP TABLE IF EXISTS entities")
// sqlite does not have create or replace, so we drop and create
statement.executeUpdate(tableQuery)
// if querying on both typename and qualified name, this is far more efficient and drops run times from 2.5 hours to 30 mins
statement.executeUpdate("CREATE INDEX typename_index ON entities(type_name, qual_name COLLATE NOCASE)")
statement.executeUpdate("CREATE INDEX qualified_name_index ON entities(qual_name COLLATE NOCASE)")
statement.executeUpdate("CREATE INDEX name_index ON entities(name COLLATE NOCASE)")
}
}
connection.commit()
}
}

/**
* Remove the provided assets from the persistent connection cache.
*
* @param assets to remove from the cache
*/
fun deleteAssets(assets: Collection<Asset>) {
DriverManager.getConnection(dbString).use { connection ->
connection.prepareStatement("delete from entities where type_name = ? and qual_name = ?").use { ps ->
assets.forEach { a ->
ps.setString(1, a.typeName)
ps.setString(2, a.qualifiedName)
ps.executeUpdate()
}
}
connection.commit()
}
}

/**
* Add the provided assets to the persistent connection cache.
*
* @param assets to add to the cache
*/
fun addAssets(assets: Collection<Asset>) {
DriverManager.getConnection(dbString).use { connection ->
connection.prepareStatement("insert or replace into entities values(?, ?, ?, ?, ?, ?)").use { ps ->
assets.forEach { a ->
if (a.qualifiedName in ignoreQNs) {
logger.debug { "Skipping ${a.qualifiedName} from being added to the cache" }
} else {
ps.setString(1, a.typeName)
ps.setString(2, a.qualifiedName)
ps.setString(3, a.connectionQualifiedName)
ps.setString(4, a.name)
ps.setInt(5, if (a is Column) a.order ?: -1 else -1)
ps.setString(6, a.tenantId)
ps.executeUpdate()
}
}
}
connection.commit()
}
}

companion object {
/** Fields that need to be present on every asset to be added to connection cache. */
val REQUIRED_FIELDS =
listOf(
Asset.TYPE_NAME,
Asset.QUALIFIED_NAME,
Asset.CONNECTION_QUALIFIED_NAME,
Asset.NAME,
Column.ORDER,
Asset.TENANT_ID,
)
}
}
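
For reference, a minimal sketch of working with the cache file directly, outside of Utils.updateConnectionCache (the helper names and query are illustrative assumptions, not part of this PR). The read-side lookup is the kind of combined type + qualifiedName query the composite index above is meant to speed up:

    import com.atlan.model.assets.Asset
    import com.atlan.pkg.cache.PersistentConnectionCache
    import java.sql.DriverManager

    // Write side: maintain the SQLite file for a single connection.
    fun refreshLocalCache(
        dbFile: String,
        created: Collection<Asset>,
        removed: Collection<Asset>,
    ) {
        val cache = PersistentConnectionCache(dbFile)
        cache.addAssets(created) // insert-or-replace rows keyed by (type_name, qual_name)
        cache.deleteAssets(removed) // drop rows for assets that no longer exist
    }

    // Read side: look up an asset's name by type and qualifiedName.
    fun lookupName(
        dbFile: String,
        typeName: String,
        qualifiedName: String,
    ): String? =
        DriverManager.getConnection("jdbc:sqlite:$dbFile").use { connection ->
            connection.prepareStatement("SELECT name FROM entities WHERE type_name = ? AND qual_name = ?").use { ps ->
                ps.setString(1, typeName)
                ps.setString(2, qualifiedName)
                ps.executeQuery().use { rs ->
                    if (rs.next()) rs.getString("name") else null
                }
            }
        }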
package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ADLSSync.kt
@@ -13,6 +13,7 @@ import com.azure.storage.file.datalake.models.ListPathsOptions
import mu.KLogger
import org.pkl.core.module.ModuleKeyFactories.file
import java.io.File
import java.io.IOException

/**
* Class to generally move data between ADLS and local storage.
@@ -172,22 +173,26 @@ class ADLSSync(
localFile: String,
) {
logger.info { " ... downloading adls://$containerName/$remoteKey to $localFile" }
val local = File(localFile)
if (local.exists()) {
local.delete()
}
if (!local.parentFile.exists()) {
local.parentFile.mkdirs()
}
if (adlsClient != null) {
val fsClient = adlsClient.getFileSystemClient(containerName)
val fileClient = fsClient.getFileClient(remoteKey)
fileClient.readToFile(localFile)
} else if (blobContainerClient != null) {
val blobClient = blobContainerClient.getBlobClient(remoteKey)
blobClient.downloadToFile(localFile)
} else {
throw IllegalStateException("No ADLS client configured -- cannot download.")
try {
val local = File(localFile)
if (local.exists()) {
local.delete()
}
if (!local.parentFile.exists()) {
local.parentFile.mkdirs()
}
if (adlsClient != null) {
val fsClient = adlsClient.getFileSystemClient(containerName)
val fileClient = fsClient.getFileClient(remoteKey)
fileClient.readToFile(localFile)
} else if (blobContainerClient != null) {
val blobClient = blobContainerClient.getBlobClient(remoteKey)
blobClient.downloadToFile(localFile)
} else {
throw IllegalStateException("No ADLS client configured -- cannot download.")
}
} catch (e: Exception) {
throw IOException(e)
}
}

@@ -242,15 +247,19 @@
logger.info { " ... uploading $localFile to adls://$containerName/$remoteKey" }
// Note: no need to delete files first (putObject overwrites, including auto-versioning
// if enabled on the bucket), and no need to create parent prefixes in ADLS
if (adlsClient != null) {
val fsClient = adlsClient.getFileSystemClient(containerName)
val fileClient = fsClient.getFileClient(remoteKey)
fileClient.uploadFromFile(localFile, true)
} else if (blobContainerClient != null) {
val blobClient = blobContainerClient.getBlobClient(remoteKey)
blobClient.uploadFromFile(localFile, true)
} else {
throw IllegalStateException("No ADLS client configured -- cannot upload.")
try {
if (adlsClient != null) {
val fsClient = adlsClient.getFileSystemClient(containerName)
val fileClient = fsClient.getFileClient(remoteKey)
fileClient.uploadFromFile(localFile, true)
} else if (blobContainerClient != null) {
val blobClient = blobContainerClient.getBlobClient(remoteKey)
blobClient.uploadFromFile(localFile, true)
} else {
throw IllegalStateException("No ADLS client configured -- cannot upload.")
}
} catch (e: Exception) {
throw IOException(e)
}
}
}
package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/GCSSync.kt
@@ -9,6 +9,7 @@ import mu.KLogger
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.IOException

/**
* Class to generally move data between GCS and local storage.
@@ -119,16 +120,20 @@ class GCSSync(
localFile: String,
) {
logger.info { " ... downloading gcs://$bucketName/$remoteKey to $localFile" }
val local = File(localFile)
if (local.exists()) {
local.delete()
}
if (!local.parentFile.exists()) {
local.parentFile.mkdirs()
}
val blob = storage.get(bucketName, remoteKey)
FileOutputStream(local).use { fos ->
blob.downloadTo(fos)
try {
val local = File(localFile)
if (local.exists()) {
local.delete()
}
if (!local.parentFile.exists()) {
local.parentFile.mkdirs()
}
val blob = storage.get(bucketName, remoteKey)
FileOutputStream(local).use { fos ->
blob.downloadTo(fos)
}
} catch (e: Exception) {
throw IOException(e)
}
}

@@ -176,10 +181,14 @@
logger.info { " ... uploading $localFile to gcs://$bucketName/$remoteKey" }
// Note: no need to delete files first (putObject overwrites, including auto-versioning
// if enabled on the bucket), and no need to create parent prefixes in GCS
val local = File(localFile)
val bucket = storage.get(bucketName)
FileInputStream(local).use { fis ->
bucket.create(remoteKey, fis)
try {
val local = File(localFile)
val bucket = storage.get(bucketName)
FileInputStream(local).use { fis ->
bucket.create(remoteKey, fis)
}
} catch (e: Exception) {
throw IOException(e)
}
}
}
package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/objectstore/ObjectStorageSyncer.kt
@@ -2,6 +2,8 @@
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.objectstore

import java.io.IOException

/**
* Interface for syncing files between cloud object storage and local file systems.
*/
@@ -42,7 +44,9 @@ interface ObjectStorageSyncer {
*
* @param remoteKey from which to download the file
* @param localFile into which to download the file
* @throws IOException on any errors downloading
*/
@Throws(IOException::class)
fun downloadFrom(
remoteKey: String,
localFile: String,
@@ -67,7 +71,9 @@ interface ObjectStorageSyncer {
*
* @param localFile from which to upload the file
* @param remoteKey into which to upload the file
* @throws IOException on any errors uploading
*/
@Throws(IOException::class)
fun uploadTo(
localFile: String,
remoteKey: String,
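
Because Kotlin has no checked exceptions, the new @Throws(IOException::class) annotations mainly document the contract and expose it to Java callers. A minimal caller-side sketch (hypothetical, not part of this PR) of how that contract is consumed, mirroring the pattern in Utils.updateConnectionCache: a failed download is treated as "no cached object yet", while a failed upload propagates to the caller.

    import com.atlan.pkg.objectstore.ObjectStorageSyncer
    import java.io.IOException

    fun roundTripThroughObjectStore(
        sync: ObjectStorageSyncer,
        remoteKey: String,
        localFile: String,
        update: (String) -> Unit,
    ) {
        try {
            sync.downloadFrom(remoteKey, localFile)
        } catch (e: IOException) {
            // A missing remote object is an expected first-run condition: continue with no local file.
        }
        update(localFile) // modify (or create) the local copy however the workflow needs
        sync.uploadTo(localFile, remoteKey) // any IOException here propagates to the caller
    }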