Skip to content

Commit 4a2855b

Browse files
authored
Merge pull request #1044 from atlanhq/FT-779
Adds full delta calculation logic for RAB and CAB
2 parents 8ee3fef + 0857d1e commit 4a2855b

37 files changed

+1477
-389
lines changed

Diff for: gradle/libs.versions.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
[versions]
22
jackson = "2.18.1"
33
slf4j = "2.0.16"
4-
elasticsearch = "8.16.0"
4+
elasticsearch = "8.16.1"
55
freemarker = "2.3.33"
66
classgraph = "4.8.179"
77
testng = "7.10.2"
8-
log4j = "2.24.1"
8+
log4j = "2.24.2"
99
wiremock = "3.9.2"
1010
jnanoid = "2.0.0"
1111
numaflow = "0.9.0"
12-
awssdk = "2.29.18"
12+
awssdk = "2.29.20"
1313
gcs = "26.50.0"
1414
system-stubs = "2.1.7"
1515
fastcsv = "3.4.0"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/* SPDX-License-Identifier: Apache-2.0
2+
Copyright 2024 Atlan Pte. Ltd. */
3+
package com.atlan.pkg.cache
4+
5+
import com.atlan.cache.AbstractOffHeapCache
6+
import com.atlan.util.AssetBatch.AssetIdentity
7+
import java.nio.charset.StandardCharsets
8+
9+
/**
10+
* Generic class through which to cache any objects efficiently, off-heap, to avoid risking extreme
11+
* memory usage.
12+
*/
13+
class ChecksumCache(
14+
name: String?,
15+
) : AbstractOffHeapCache<AssetIdentity, String>(name) {
16+
/** {@inheritDoc} */
17+
override fun serializeKey(key: AssetIdentity): ByteArray {
18+
return key.toString().toByteArray(StandardCharsets.UTF_8)
19+
}
20+
21+
/** {@inheritDoc} */
22+
override fun deserializeKey(bytes: ByteArray): AssetIdentity {
23+
return AssetIdentity.fromString(String(bytes, StandardCharsets.UTF_8))
24+
}
25+
26+
/** {@inheritDoc} */
27+
override fun serializeValue(value: String): ByteArray {
28+
return value.toByteArray(StandardCharsets.UTF_8)
29+
}
30+
31+
/** {@inheritDoc} */
32+
override fun deserializeValue(bytes: ByteArray): String {
33+
return String(bytes, StandardCharsets.UTF_8)
34+
}
35+
}

Diff for: package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/AssetResolver.kt

+36
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
Copyright 2024 Atlan Pte. Ltd. */
33
package com.atlan.pkg.util
44

5+
import com.atlan.model.assets.Asset
6+
import com.atlan.util.AssetBatch.AssetIdentity
7+
import java.io.IOException
8+
59
/**
610
* Interface for resolving asset identities entirely from CSV file input (no calls to Atlan).
711
*/
@@ -47,4 +51,36 @@ interface AssetResolver {
4751
return "$name/$type"
4852
}
4953
}
54+
55+
/**
56+
* Resolve the asset represented by a row of values in a CSV to an asset identity.
57+
*
58+
* @param values row of values for that asset from the CSV
59+
* @param header order of column names in the CSV file being processed
60+
* @param connectionsMap cache of connection qualifiedNames, keyed by connection identity
61+
* @return a unique asset identity for that row of the CSV
62+
*/
63+
@Throws(IOException::class)
64+
fun resolveAsset(
65+
values: List<String>,
66+
header: List<String>,
67+
connectionsMap: Map<ConnectionIdentity, String>,
68+
): AssetIdentity? {
69+
val typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
70+
if (typeIdx < 0) {
71+
throw IOException(
72+
"Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
73+
)
74+
}
75+
val typeName = values[typeIdx]
76+
val qnDetails = getQualifiedNameDetails(values, header, typeName)
77+
val agnosticQN = qnDetails.uniqueQN
78+
val connectionIdentity = getConnectionIdentityFromQN(agnosticQN)
79+
if (connectionIdentity != null && connectionsMap.containsKey(connectionIdentity)) {
80+
val qualifiedName =
81+
agnosticQN.replaceFirst(connectionIdentity.toString(), connectionsMap[connectionIdentity]!!)
82+
return AssetIdentity(typeName, qualifiedName)
83+
}
84+
return null
85+
}
5086
}

Diff for: package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt

+103-22
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import com.atlan.pkg.cache.ConnectionCache
88
import com.atlan.pkg.objectstore.ObjectStorageSyncer
99
import com.atlan.pkg.serde.csv.CSVPreprocessor
1010
import com.atlan.pkg.serde.csv.RowPreprocessor
11+
import com.atlan.util.AssetBatch.AssetIdentity
1112
import mu.KLogger
1213
import java.io.File.separator
14+
import java.io.IOException
1315
import java.nio.file.Paths
1416
import java.time.Instant
1517
import java.time.ZoneId
@@ -27,6 +29,7 @@ import java.time.format.DateTimeFormatter
2729
* @param preprocessedDetails details retrieved from pre-processing the input CSV file
2830
* @param typesToRemove limit the asset types that will be removed to this collection of type names
2931
* @param logger for logging
32+
* @param reloadSemantic the type of reload to do for assets (default {@code all} will reload any assets that are not deleted, whether changed or not)
3033
* @param previousFilePreprocessor responsible for pre-processing the previous CSV file (if provided directly)
3134
* @param outputDirectory local directory where files can be written and compared
3235
* @param previousFileProcessedExtension extension to use in the object store for files that have been processed
@@ -40,59 +43,110 @@ class DeltaProcessor(
4043
val preprocessedDetails: Results,
4144
val typesToRemove: Collection<String>,
4245
private val logger: KLogger,
46+
val reloadSemantic: String = "all",
4347
val previousFilePreprocessor: CSVPreprocessor? = null,
4448
val outputDirectory: String = Paths.get(separator, "tmp").toString(),
4549
private val previousFileProcessedExtension: String = ".processed",
46-
) {
50+
) : AutoCloseable {
51+
private val objectStore = Utils.getBackingStore(outputDirectory)
52+
private var initialLoad: Boolean = true
53+
private var delta: FileBasedDelta? = null
54+
private var deletedAssets: OffHeapAssetCache? = null
55+
private val reloadAll = reloadSemantic == "all"
56+
4757
/**
48-
* Run the delta detection.
49-
* This includes: determining which assets should be deleted, deleting those assets, and updating
50-
* the persistent connection cache by removing any deleted assets and creating / updating any assets
51-
* that were created or modified.
52-
*
53-
* @param modifiedAssets list of assets that were modified by the processing up to the point of this delta detection
58+
* Calculate any delta from the provided file context.
5459
*/
55-
fun run(modifiedAssets: OffHeapAssetCache? = null) {
56-
var deletedAssets: OffHeapAssetCache? = null
60+
fun calculate() {
5761
if (semantic == "full") {
5862
if (qualifiedNamePrefix.isNullOrBlank()) {
59-
logger.warn { "Unable to determine qualifiedName prefix, will not delete any assets." }
63+
logger.warn { "Unable to determine qualifiedName prefix, cannot calculate any delta." }
6064
} else {
6165
val purgeAssets = removalType == "purge"
6266
val assetRootName = preprocessedDetails.assetRootName
6367
val previousFileLocation = "$previousFilesPrefix/$qualifiedNamePrefix"
64-
val objectStore = Utils.getBackingStore(outputDirectory)
6568
val previousFile =
6669
if (previousFilePreprocessor != null && previousFilePreprocessor.filename.isNotBlank()) {
6770
transformPreviousRaw(assetRootName, previousFilePreprocessor)
6871
} else {
6972
objectStore.copyLatestFrom(previousFileLocation, previousFileProcessedExtension, outputDirectory)
7073
}
7174
if (previousFile.isNotBlank()) {
72-
// If there was a previous file, calculate the delta to see what we need
73-
// to delete
74-
val assetRemover =
75-
AssetRemover(
75+
// If there was a previous file, calculate the delta (changes + deletions)
76+
initialLoad = false
77+
delta =
78+
FileBasedDelta(
7679
ConnectionCache.getIdentityMap(),
7780
resolver,
7881
logger,
7982
typesToRemove.toList(),
8083
qualifiedNamePrefix,
8184
purgeAssets,
85+
!reloadAll,
8286
outputDirectory,
8387
)
84-
assetRemover.calculateDeletions(preprocessedDetails.preprocessedFile, previousFile)
85-
if (assetRemover.hasAnythingToDelete()) {
86-
// Note: this will update the persistent connection cache for both adds and deletes
87-
deletedAssets = assetRemover.deleteAssets()
88-
}
88+
delta!!.calculateDelta(preprocessedDetails.preprocessedFile, previousFile)
8989
} else {
9090
logger.info { "No previous file found, treated it as an initial load." }
9191
}
92-
// Copy processed files to specified location in object storage for future comparison purposes
93-
uploadToBackingStore(objectStore, preprocessedDetails.preprocessedFile, qualifiedNamePrefix, previousFileProcessedExtension)
9492
}
9593
}
94+
}
95+
96+
/**
97+
* Resolve the asset represented by a row of values in a CSV to an asset identity.
98+
*
99+
* @param values row of values for that asset from the CSV
100+
* @param header order of column names in the CSV file being processed
101+
* @return a unique asset identity for that row of the CSV
102+
*/
103+
@Throws(IOException::class)
104+
fun resolveAsset(
105+
values: List<String>,
106+
header: List<String>,
107+
): AssetIdentity? {
108+
return delta?.resolveAsset(values, header)
109+
}
110+
111+
/**
112+
* Determine whether the provided asset identity should be processed (true) or skipped (false).
113+
*
114+
* @param identity of the asset to check whether reloading should occur
115+
* @return true if the asset with this identity should be reloaded, otherwise false
116+
*/
117+
fun reloadAsset(identity: AssetIdentity): Boolean {
118+
if (!reloadAll) {
119+
return delta?.assetsToReload?.containsKey(identity) ?: true
120+
}
121+
return true
122+
}
123+
124+
/**
125+
* Delete any assets that were detected by the delta to be deleted.
126+
*/
127+
fun processDeletions() {
128+
if (!initialLoad && delta!!.hasAnythingToDelete()) {
129+
// Note: this will update the persistent connection cache for both adds and deletes
130+
deletedAssets = delta!!.deleteAssets()
131+
}
132+
}
133+
134+
/**
135+
* Upload the latest processed file to the backing store, to persist the state for the next run.
136+
*/
137+
fun uploadStateToBackingStore() {
138+
if (!qualifiedNamePrefix.isNullOrBlank()) {
139+
// Copy processed files to specified location in object storage for future comparison purposes
140+
uploadToBackingStore(objectStore, preprocessedDetails.preprocessedFile, qualifiedNamePrefix, previousFileProcessedExtension)
141+
}
142+
}
143+
144+
/**
145+
* Update the persistent connection cache with details of any assets that were added or removed.
146+
*
147+
* @param modifiedAssets cache of assets that were created or modified (whether by initial processing or reloading)
148+
*/
149+
fun updateConnectionCache(modifiedAssets: OffHeapAssetCache? = null) {
96150
// Update the connection cache with any changes (added and / or removed assets)
97151
Utils.updateConnectionCache(
98152
added = modifiedAssets,
@@ -101,6 +155,33 @@ class DeltaProcessor(
101155
)
102156
}
103157

158+
/** {@inheritDoc} */
159+
override fun close() {
160+
uploadStateToBackingStore()
161+
var exception: Exception? = null
162+
if (delta != null) {
163+
try {
164+
delta!!.close()
165+
} catch (e: Exception) {
166+
exception = e
167+
}
168+
}
169+
if (deletedAssets != null) {
170+
try {
171+
deletedAssets!!.close()
172+
} catch (e: Exception) {
173+
if (exception != null) {
174+
exception.addSuppressed(e)
175+
} else {
176+
exception = e
177+
}
178+
}
179+
}
180+
if (exception != null) {
181+
throw exception
182+
}
183+
}
184+
104185
/**
105186
* Upload the file used to load the assets to Atlan backing store.
106187
*

0 commit comments

Comments
 (0)