Skip to content

Commit

Permalink
Merge pull request #940 from atlanhq/DVX-708
Browse files Browse the repository at this point in the history
Further concurrency tweaks
  • Loading branch information
cmgrote authored Oct 22, 2024
2 parents dd601f8 + 1db3eff commit aceffd7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
12 changes: 9 additions & 3 deletions sdk/src/main/java/com/atlan/util/AssetBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private AssetMutationResponse process() throws AtlanException {
* @return the mutation response from the queued batch of assets that were flushed
* @throws AtlanException on any problems flushing (submitting) the batch
*/
public AssetMutationResponse flush() throws AtlanException {
public synchronized AssetMutationResponse flush() throws AtlanException {
AsyncCreationResponse response = null;
List<Asset> revised = null;
if (!_batch.isEmpty()) {
Expand Down Expand Up @@ -504,8 +504,14 @@ private void trackResponse(AssetMutationResponse response, List<Asset> sent) {
resolvedGuids.putAll(response.getGuidAssignments());
}
if (sent != null) {
Set<String> createdGuids = created.stream().map(Asset::getGuid).collect(Collectors.toSet());
Set<String> updatedGuids = updated.stream().map(Asset::getGuid).collect(Collectors.toSet());
Set<String> createdGuids;
Set<String> updatedGuids;
synchronized (created) {
createdGuids = created.stream().map(Asset::getGuid).collect(Collectors.toSet());
}
synchronized (updated) {
updatedGuids = updated.stream().map(Asset::getGuid).collect(Collectors.toSet());
}
for (Asset one : sent) {
String guid = one.getGuid();
if (guid != null
Expand Down
56 changes: 25 additions & 31 deletions sdk/src/main/java/com/atlan/util/ParallelBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.atlan.model.core.AssetMutationResponse;
import com.atlan.model.enums.AssetCreationHandling;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -63,14 +63,14 @@ public class ParallelBatch {
@Builder.Default
private boolean tableViewAgnostic = false;

private final Map<Long, AssetBatch> batchMap = new ConcurrentHashMap<>();
private final List<Asset> created = new ArrayList<>();
private final List<Asset> updated = new ArrayList<>();
private final List<Asset> restored = new ArrayList<>();
private final List<AssetBatch.FailedBatch> failures = new ArrayList<>();
private final List<Asset> skipped = new ArrayList<>();
private final Map<String, String> resolvedGuids = new HashMap<>();
private final Map<AssetBatch.AssetIdentity, String> resolvedQualifiedNames = new HashMap<>();
private final ConcurrentHashMap<Long, AssetBatch> batchMap = new ConcurrentHashMap<>();
private final List<Asset> created = Collections.synchronizedList(new ArrayList<>());
private final List<Asset> updated = Collections.synchronizedList(new ArrayList<>());
private final List<Asset> restored = Collections.synchronizedList(new ArrayList<>());
private final List<AssetBatch.FailedBatch> failures = Collections.synchronizedList(new ArrayList<>());
private final List<Asset> skipped = Collections.synchronizedList(new ArrayList<>());
private final Map<String, String> resolvedGuids = new ConcurrentHashMap<>();
private final Map<AssetBatch.AssetIdentity, String> resolvedQualifiedNames = new ConcurrentHashMap<>();

/**
* Create a new batch of assets to be bulk-saved, in parallel (across threads).
Expand Down Expand Up @@ -272,28 +272,22 @@ public ParallelBatch(
* @throws AtlanException on any problems adding the asset to or processing the batch
*/
public AssetMutationResponse add(Asset single) throws AtlanException {
lock.writeLock().lock();
try {
long id = Thread.currentThread().getId();
if (!batchMap.containsKey(id)) {
batchMap.put(
id,
new AssetBatch(
client,
maxSize,
replaceAtlanTags,
customMetadataHandling,
captureFailures,
updateOnly,
track,
!caseSensitive,
creationHandling,
tableViewAgnostic));
}
return batchMap.get(id).add(single);
} finally {
lock.writeLock().unlock();
}
long id = Thread.currentThread().getId();
// Note: these are thread-specific operations, so not explicitly locked or synchronized
AssetBatch batch = batchMap.computeIfAbsent(
id,
k -> new AssetBatch(
client,
maxSize,
replaceAtlanTags,
customMetadataHandling,
captureFailures,
updateOnly,
track,
!caseSensitive,
creationHandling,
tableViewAgnostic));
return batch.add(single);
}

/**
Expand Down

0 comments on commit aceffd7

Please sign in to comment.