Skip to content

Commit

Permalink
Merge pull request #1095 from atlanhq/FT-839
Browse files Browse the repository at this point in the history
Fixes bug that attempted to process non-existent input files
  • Loading branch information
cmgrote authored Dec 5, 2024
2 parents 1ee9e09 + e5f8100 commit d9152f9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 35 deletions.
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ testng = "7.10.2"
log4j = "2.24.2"
wiremock = "3.10.0"
jnanoid = "2.0.0"
awssdk = "2.29.27"
gcs = "26.50.0"
awssdk = "2.29.29"
gcs = "26.51.0"
system-stubs = "2.1.7"
fastcsv = "3.4.0"
poi = "5.3.0"
Expand Down
22 changes: 22 additions & 0 deletions package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ object Utils {
*
* @param client connectivity to the Atlan tenant
* @param guid of the asset for which to produce a link
* @return a URL that links directly to an asset in Atlan
*/
fun getAssetLink(
client: AtlanClient,
Expand All @@ -532,6 +533,7 @@ object Utils {
*
* @param client connectivity to the Atlan tenant
* @param guid of the asset for which to produce a link
* @return a URL that links directly to a data product or data domain in Atlan
*/
fun getProductLink(
client: AtlanClient,
Expand Down Expand Up @@ -715,6 +717,7 @@ object Utils {
* @param syncer object storage syncer
* @param outputDirectory local directory into which to download the file
* @param prefix object prefix in object storage
* @return a list of paths of the input files
*/
fun getInputFiles(
syncer: ObjectStorageSyncer,
Expand All @@ -731,6 +734,7 @@ object Utils {
* @param syncer object storage syncer
* @param outputDirectory local directory into which to download the file
* @param remote object key in object storage
* @return the path of the input file
*/
fun getInputFile(
syncer: ObjectStorageSyncer,
Expand All @@ -743,6 +747,24 @@ object Utils {
return path
}

/**
* Determines whether a particular input file has been provided or not.
*
* @param importType details of the type of import expected (direct vs object store-based)
* @param directFile details of the directly-provided file
* @param objectStoreKey details of the object store-provided file
* @return true if the file has been provided through either means, otherwise false
*/
fun isFileProvided(
importType: String,
directFile: String,
objectStoreKey: String,
): Boolean {
val directUpload = importType == "DIRECT"
return (directUpload && directFile.isNotBlank() && !directFile.endsWith(DEFAULT_FILE)) ||
(!directUpload && objectStoreKey.isNotBlank())
}

/**
* Upload the provided output file to the object store defined by the credentials available.
* Note: if no credentials are provided, the default (in-tenant) object store will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ object Importer {
ctx: PackageContext<AssetImportCfg>,
outputDirectory: String = "tmp",
): ImportResults? {
val directUpload = ctx.config.importType == "DIRECT"
val assetsFileProvided = (directUpload && ctx.config.assetsFile.isNotBlank() && !ctx.config.assetsFile.endsWith(Utils.DEFAULT_FILE)) || (!directUpload && ctx.config.assetsKey.isNotBlank())
val glossariesFileProvided = (directUpload && ctx.config.glossariesFile.isNotBlank() && !ctx.config.glossariesFile.endsWith(Utils.DEFAULT_FILE)) || (!directUpload && ctx.config.glossariesKey.isNotBlank())
val dataProductsFileProvided = (directUpload && ctx.config.dataProductsFile.isNotBlank() && !ctx.config.dataProductsFile.endsWith(Utils.DEFAULT_FILE)) || (!directUpload && ctx.config.dataProductsKey.isNotBlank())
val assetsFileProvided = Utils.isFileProvided(ctx.config.importType, ctx.config.assetsFile, ctx.config.assetsKey)
val glossariesFileProvided = Utils.isFileProvided(ctx.config.importType, ctx.config.glossariesFile, ctx.config.glossariesKey)
val dataProductsFileProvided = Utils.isFileProvided(ctx.config.importType, ctx.config.dataProductsFile, ctx.config.dataProductsKey)
if (!assetsFileProvided && !glossariesFileProvided && !dataProductsFileProvided) {
logger.error { "No input file was provided for either data products, glossaries or assets." }
exitProcess(1)
Expand All @@ -45,7 +44,7 @@ object Importer {
Utils.getInputFile(
ctx.config.glossariesFile,
outputDirectory,
directUpload,
ctx.config.importType == "DIRECT",
ctx.config.glossariesPrefix,
ctx.config.glossariesKey,
)
Expand Down Expand Up @@ -74,7 +73,7 @@ object Importer {
Utils.getInputFile(
ctx.config.assetsFile,
outputDirectory,
directUpload,
ctx.config.importType == "DIRECT",
ctx.config.assetsPrefix,
ctx.config.assetsKey,
)
Expand All @@ -97,7 +96,7 @@ object Importer {
Utils.getInputFile(
ctx.config.dataProductsFile,
outputDirectory,
directUpload,
ctx.config.importType == "DIRECT",
ctx.config.dataProductsPrefix,
ctx.config.dataProductsKey,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,10 @@ object Importer {
fun import(
ctx: PackageContext<CubeAssetsBuilderCfg>,
outputDirectory: String = "tmp",
): String? {
): String {
val fieldSeparator = ctx.config.assetsFieldSeparator[0]
val assetsUpload = ctx.config.assetsImportType == "DIRECT"
val assetsKey = ctx.config.assetsKey
val assetsFilename = ctx.config.assetsFile

val assetsFileProvided = (assetsUpload && assetsFilename.isNotBlank()) || (!assetsUpload && assetsKey.isNotBlank())
val assetsFileProvided = Utils.isFileProvided(ctx.config.assetsImportType, ctx.config.assetsFile, ctx.config.assetsKey)
if (!assetsFileProvided) {
logger.error { "No input file was provided for assets." }
exitProcess(1)
Expand All @@ -65,11 +62,11 @@ object Importer {
// to allow subsequent out-of-order parallel processing
val assetsInput =
Utils.getInputFile(
assetsFilename,
ctx.config.assetsFile,
outputDirectory,
assetsUpload,
ctx.config.assetsImportType == "DIRECT",
ctx.config.assetsPrefix,
assetsKey,
ctx.config.assetsKey,
)
val preprocessedDetails = Preprocessor(assetsInput, fieldSeparator).preprocess<Results>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,19 @@ object Loader {
ctx: PackageContext<LineageBuilderCfg>,
outputDirectory: String = "tmp",
) {
val lineageUpload = ctx.config.lineageImportType == "DIRECT"
val lineageFilename = ctx.config.lineageFile
val lineageKey = ctx.config.lineageKey

val lineageFileProvided = (lineageUpload && lineageFilename.isNotBlank()) || (!lineageUpload && lineageKey.isNotBlank())
val lineageFileProvided = Utils.isFileProvided(ctx.config.lineageImportType, ctx.config.lineageFile, ctx.config.lineageKey)
if (!lineageFileProvided) {
logger.error { "No input file was provided for lineage." }
exitProcess(1)
}

val lineageInput =
Utils.getInputFile(
lineageFilename,
ctx.config.lineageFile,
outputDirectory,
lineageUpload,
ctx.config.lineageImportType == "DIRECT",
ctx.config.lineagePrefix,
lineageKey,
ctx.config.lineageKey,
)
if (lineageInput.isNotBlank()) {
FieldSerde.FAIL_ON_ERRORS.set(ctx.config.lineageFailOnErrors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ object OpenAPISpecLoader {
fun main(args: Array<String>) {
Utils.initializeContext<OpenAPISpecLoaderCfg>().use { ctx ->
val outputDirectory = if (args.isEmpty()) "tmp" else args[0]
val importType = ctx.config.importType
val specUrl = ctx.config.specUrl
val specFilename = ctx.config.specFile
val specKey = ctx.config.specKey
val batchSize = 20

val inputQN =
Expand All @@ -39,7 +35,7 @@ object OpenAPISpecLoader {
val connectionQN =
Utils.createOrReuseConnection(ctx.client, ctx.config.connectionUsage, inputQN, ctx.config.connection)

val specFileProvided = (importType == "DIRECT" && specFilename.isNotBlank()) || (importType == "CLOUD" && specKey.isNotBlank()) || (importType == "URL" && specUrl.isNotBlank())
val specFileProvided = Utils.isFileProvided(ctx.config.importType, ctx.config.specFile, ctx.config.specKey)
if (!specFileProvided) {
logger.error { "No input file was provided for the OpenAPI spec." }
exitProcess(1)
Expand All @@ -51,19 +47,19 @@ object OpenAPISpecLoader {
}

val sourceUrl =
when (importType) {
when (ctx.config.importType) {
"CLOUD" -> {
Utils.getInputFile(
specFilename,
ctx.config.specFile,
outputDirectory,
false,
ctx.config.specPrefix,
specKey,
ctx.config.specKey,
)
}

"DIRECT" -> specFilename
else -> specUrl
"DIRECT" -> ctx.config.specFile
else -> ctx.config.specUrl
}

logger.info { "Loading OpenAPI specification from $sourceUrl into: $connectionQN" }
Expand Down

0 comments on commit d9152f9

Please sign in to comment.