From 0a29a2cc54aca9cf36dd88b6132869e6c58597b5 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Wed, 3 Apr 2024 00:54:22 +0530 Subject: [PATCH 1/2] logging for transform Signed-off-by: Sarthak Aggarwal --- .../transform/TransformRunner.kt | 17 +++++++++++++++++ .../transform/TransformSearchService.kt | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 8ac49dce0..a27a9c6a3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -117,6 +117,7 @@ object TransformRunner : val transformLockManager = transformContext.transformLockManager transformLockManager.acquireLockForScheduledJob() try { + logger.debug("Transform Job ${transform.id} Initiating.") do { when { transformLockManager.lock == null -> { @@ -141,6 +142,7 @@ object TransformRunner : // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { // Note the timestamp when we got the shard global checkpoints to the user may know what data is included + logger.debug("Transform job ${transform.id} is populating shards to search for index ${transform.sourceIndex}.") newGlobalCheckpointTime = Instant.now() newGlobalCheckpoints = transformSearchService.getShardsGlobalCheckpoint(transform.sourceIndex) bucketsToTransform = @@ -148,6 +150,11 @@ object TransformRunner : metadata.shardIDToGlobalCheckpoint, newGlobalCheckpoints, ) + logger.debug( + "Transform job {} fetched global checkpoints {}.", + transform.id, + newGlobalCheckpoints + ) } // If there are shards to search do it here if (bucketsToTransform.currentShard != null) { @@ -161,6 +168,13 @@ object TransformRunner : bucketsToTransform.modifiedBuckets.filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet() + + logger.debug( + "Transform job {} recompute to start with modified buckets {}. Processing shard {}.", + transform.id, + modifiedBuckets.size, + bucketsToTransform.currentShard + ) // Recompute modified buckets and update them in targetIndex currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext) // Add processed buckets to 'processed set' so that we don't try to reprocess them again @@ -229,6 +243,7 @@ object TransformRunner : ) } currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets) + logger.debug("Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", transform.id,currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId ,currentShard.from, currentShard.to) val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis + shardLevelModifiedBuckets.searchTimeInMillis @@ -325,7 +340,9 @@ object TransformRunner : } val indexTimeInMillis = withTransformSecurityContext(transform) { + logger.debug("Transform job {} starting to index for target index: {} with documents {}.", transform.id,transform.targetIndex, transformSearchResult.docsToIndex.size) transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) + logger.debug("Transform job {} completed to index for target index: {}.", transform.id, transform.targetIndex) } val stats = transformSearchResult.stats val updatedStats = diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index b9944e425..2036264c7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -151,12 +151,17 @@ class TransformSearchService( return@suspendUntil } val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds) + logger.debug("Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to) search(request, listener) + logger.debug("Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to) } } // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize + logger.debug("Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to) transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) + logger.debug("Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis()) + logger.trace("Transform job {} search response {} for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, searchResponse, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis()) return convertBucketSearchResponse(transform, searchResponse) } catch (e: TransformSearchServiceException) { throw e From 35b4eff998283e6a79a8ed3c2d026587fe9bf9c2 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Mon, 8 Apr 2024 13:05:37 +0530 Subject: [PATCH 2/2] fixing lints and gradle checks Signed-off-by: Sarthak Aggarwal --- .../transform/TransformRunner.kt | 14 ++++++++--- .../transform/TransformSearchService.kt | 25 +++++++++++++++---- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index a27a9c6a3..30949e75c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -153,7 +153,7 @@ object TransformRunner : logger.debug( "Transform job {} fetched global checkpoints {}.", transform.id, - newGlobalCheckpoints + newGlobalCheckpoints, ) } // If there are shards to search do it here @@ -173,7 +173,7 @@ object TransformRunner : "Transform job {} recompute to start with modified buckets {}. Processing shard {}.", transform.id, modifiedBuckets.size, - bucketsToTransform.currentShard + bucketsToTransform.currentShard, ) // Recompute modified buckets and update them in targetIndex currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext) @@ -243,7 +243,10 @@ object TransformRunner : ) } currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets) - logger.debug("Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", transform.id,currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId ,currentShard.from, currentShard.to) + logger.debug( + "Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", + transform.id, currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId, currentShard.from, currentShard.to, + ) val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis + shardLevelModifiedBuckets.searchTimeInMillis @@ -340,7 +343,10 @@ object TransformRunner : } val indexTimeInMillis = withTransformSecurityContext(transform) { - logger.debug("Transform job {} starting to index for target index: {} with documents {}.", transform.id,transform.targetIndex, transformSearchResult.docsToIndex.size) + logger.debug( + "Transform job {} starting to index for target index: {} with documents {}.", + transform.id, transform.targetIndex, transformSearchResult.docsToIndex.size, + ) transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) logger.debug("Transform job {} completed to index for target index: {}.", transform.id, transform.targetIndex) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 2036264c7..c8567cd87 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -151,17 +151,32 @@ class TransformSearchService( return@suspendUntil } val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds) - logger.debug("Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to) + logger.debug( + "Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", + transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to, + ) search(request, listener) - logger.debug("Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to) + logger.debug( + "Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", + transform.id, currentShard.shardId, currentShard.from, currentShard.to, + ) } } // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize - logger.debug("Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to) + logger.debug( + "Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", + transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to, + ) transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) - logger.debug("Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis()) - logger.trace("Transform job {} search response {} for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, searchResponse, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis()) + logger.debug( + "Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", + transform.id, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis(), + ) + logger.trace( + "Transform job {} search response {} for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", + transform.id, searchResponse, currentShard.shardId, currentShard.from, currentShard.to, searchResponse.took.millis(), + ) return convertBucketSearchResponse(transform, searchResponse) } catch (e: TransformSearchServiceException) { throw e