Skip to content

Commit

Permalink
Merge pull request #498 from atlanhq/DVX-246
Browse files Browse the repository at this point in the history
Remove connection delete workflow from integration tests, bump default retry to 5
  • Loading branch information
cmgrote authored Feb 12, 2024
2 parents 2710290 + b14b8be commit 181307b
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@
import com.atlan.model.assets.Connection;
import com.atlan.model.core.AssetMutationResponse;
import com.atlan.model.enums.AtlanConnectorType;
import com.atlan.model.enums.AtlanWorkflowPhase;
import com.atlan.model.packages.ConnectionDelete;
import com.atlan.model.workflow.Workflow;
import com.atlan.model.workflow.WorkflowResponse;
import com.atlan.net.HttpClient;
import com.atlan.model.enums.AtlanDeleteType;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.testng.annotations.Test;

/**
Expand All @@ -33,8 +29,6 @@ public class ConnectionTest extends AtlanLiveTest {

private static final String PREFIX = makeUnique("CONN");

private static final AtomicInteger retryCount = new AtomicInteger(0);

/**
* Create a new connection with a unique name.
*
Expand Down Expand Up @@ -89,49 +83,40 @@ public static void deleteConnection(String qualifiedName, Logger log) throws Atl
}

/**
* Run the connection delete package for the specified connection, and block until it
* completes successfully.
* Delete all assets in the specified connection, then the connection itself,
* and block until it completes successfully.
*
* @param client connectivity to the Atlan tenant from which to purge the connection
* @param qualifiedName of the connection to delete
* @param log into which to write status information
* @throws AtlanException on any errors deleting the connection
* @throws AtlanException on any errors deleting the connection and its assets
* @throws InterruptedException if the busy-wait loop for monitoring is interuppted
*/
public static void deleteConnection(AtlanClient client, String qualifiedName, Logger log)
throws AtlanException, InterruptedException {
try {
Workflow deleteWorkflow =
ConnectionDelete.creator(qualifiedName, true).build().toWorkflow();
WorkflowResponse response = deleteWorkflow.run(client);
assertNotNull(response);
// If we get here we've succeeded in running, so we'll reset our retry counter
retryCount.set(0);
AtlanWorkflowPhase state = response.monitorStatus(log, Level.INFO, 420L);
assertNotNull(state);
if (state == AtlanWorkflowPhase.RUNNING) {
// If still running after 7 minutes, stop it (so it can then be archived)
log.warn("Stopping hung workflow...");
response = response.stop();
String workflowTemplateName =
response.getSpec().getWorkflowTemplateRef().get("name");
state = response.monitorStatus(log, Level.INFO, 90L);
assertEquals(state, AtlanWorkflowPhase.FAILED); // Status for stop is FAILED
client.workflows.archive(workflowTemplateName);
List<String> guids =
client.assets.select().where(Asset.QUALIFIED_NAME.startsWith(qualifiedName)).pageSize(50).stream()
.map(Asset::getGuid)
.collect(Collectors.toList());
if (!guids.isEmpty()) {
int totalToDelete = guids.size();
log.info(" --- Purging {} assets from {}... ---", totalToDelete, qualifiedName);
if (totalToDelete < 20) {
client.assets.delete(guids, AtlanDeleteType.PURGE);
} else {
assertEquals(state, AtlanWorkflowPhase.SUCCESS);
client.workflows.archive(response.getMetadata().getName());
}
} catch (InvalidRequestException e) {
// Can happen if two deletion workflows are run at the same time,
// in which case we should wait a few seconds and try again
int attempt = retryCount.incrementAndGet();
log.warn("Race condition on parallel deletion, waiting to retry ({})...", attempt);
Thread.sleep(HttpClient.waitTime(attempt).toMillis());
if (attempt < Atlan.getMaxNetworkRetries()) {
deleteConnection(client, qualifiedName, log);
for (int i = 0; i < totalToDelete; i += 20) {
log.info(" ... next batch of 20 ({}%)", Math.round((i * 100.0) / totalToDelete));
List<String> sublist = guids.subList(i, Math.min(i + 20, totalToDelete));
client.assets.delete(sublist, AtlanDeleteType.PURGE);
}
}
}
// Purge the connection itself, now that all assets are purged
Optional<Asset> found = Connection.select().where(Connection.QUALIFIED_NAME.eq(qualifiedName)).stream()
.findFirst();
if (found.isPresent()) {
client.assets.delete(found.get().getGuid(), AtlanDeleteType.PURGE).block();
}
}

@Test(groups = {"invalid.connection"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package com.atlan.pkg

import com.atlan.Atlan
import com.atlan.AtlanClient
import com.atlan.exception.ConflictException
import com.atlan.model.assets.Asset
import com.atlan.model.assets.Connection
import com.atlan.model.assets.Glossary
Expand All @@ -12,6 +13,7 @@ import com.atlan.model.enums.AtlanConnectorType
import com.atlan.model.enums.AtlanDeleteType
import com.atlan.model.search.IndexSearchRequest
import com.atlan.model.search.IndexSearchResponse
import com.atlan.model.typedefs.AtlanTagDef
import com.atlan.net.HttpClient
import com.atlan.serde.Serde
import com.aventrix.jnanoid.jnanoid.NanoIdUtils
Expand Down Expand Up @@ -245,7 +247,27 @@ abstract class PackageTest {
}
}
// Purge the connection itself, now that all assets are purged
client.assets.delete(it.guid, AtlanDeleteType.PURGE)
client.assets.delete(it.guid, AtlanDeleteType.PURGE).block()
}
}
}

/**
* Remove the specified tag.
*
* @param displayName human-readable display name of the tag to remove
* @throws ConflictException if the tag cannot be removed because there are still references to it
*/
@Throws(ConflictException::class)
fun removeTag(displayName: String, retryCount: Int = 0) {
try {
AtlanTagDef.purge(displayName)
} catch (e: ConflictException) {
if (retryCount < client.maxNetworkRetries) {
Thread.sleep(HttpClient.waitTime(retryCount).toMillis())
removeTag(displayName, retryCount + 1)
} else {
throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ class ImportGlossariesTest : PackageTest() {
fun afterClass(context: ITestContext) {
removeGlossary(glossary1)
removeGlossary(glossary2)
AtlanTagDef.purge(tag1)
AtlanTagDef.purge(tag2)
removeTag(tag1)
removeTag(tag2)
teardown(context.failedTests.size() > 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ class LinkTermsTest : PackageTest() {
fun afterClass(context: ITestContext) {
removeConnection(connectionName, connectorType)
removeGlossary(glossaryName)
AtlanTagDef.purge(tag1)
AtlanTagDef.purge(tag2)
removeTag(tag1)
removeTag(tag2)
teardown(context.failedTests.size() > 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ class CreateThenUpsertRABTest : PackageTest() {
@AfterClass(alwaysRun = true)
fun afterClass(context: ITestContext) {
removeConnection(conn1, conn1Type)
AtlanTagDef.purge(tag1)
AtlanTagDef.purge(tag2)
removeTag(tag1)
removeTag(tag2)
teardown(context.failedTests.size() > 0)
}
}
2 changes: 1 addition & 1 deletion sdk/src/main/java/com/atlan/Atlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public abstract class Atlan {
public static final int DEFAULT_CONNECT_TIMEOUT = 30 * 1000;
public static final int DEFAULT_READ_TIMEOUT = 120 * 1000;
public static final int DEFAULT_NETWORK_RETRIES = 3;
public static final int DEFAULT_NETWORK_RETRIES = 5;

public static final String VERSION = "1.10.5-SNAPSHOT";

Expand Down

0 comments on commit 181307b

Please sign in to comment.