Skip to content

Commit 28b7d8b

Browse files
logging for transform
Signed-off-by: Sarthak Aggarwal <[email protected]>
1 parent e5a13ff commit 28b7d8b

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt

+17
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ object TransformRunner :
117117
val transformLockManager = transformContext.transformLockManager
118118
transformLockManager.acquireLockForScheduledJob()
119119
try {
120+
logger.debug("Transform Job ${transform.id} Initiating.")
120121
do {
121122
when {
122123
transformLockManager.lock == null -> {
@@ -141,13 +142,19 @@ object TransformRunner :
141142
// If we have not populated the list of shards to search, do so now
142143
if (bucketsToTransform.shardsToSearch == null) {
143144
// Note the timestamp when we got the shard global checkpoints to the user may know what data is included
145+
logger.debug("Transform job ${transform.id} is populating shards to search for index ${transform.sourceIndex}.")
144146
newGlobalCheckpointTime = Instant.now()
145147
newGlobalCheckpoints = transformSearchService.getShardsGlobalCheckpoint(transform.sourceIndex)
146148
bucketsToTransform =
147149
bucketsToTransform.initializeShardsToSearch(
148150
metadata.shardIDToGlobalCheckpoint,
149151
newGlobalCheckpoints,
150152
)
153+
logger.debug(
154+
"Transform job {} fetched global checkpoints {}.",
155+
transform.id,
156+
newGlobalCheckpoints
157+
)
151158
}
152159
// If there are shards to search do it here
153160
if (bucketsToTransform.currentShard != null) {
@@ -161,6 +168,13 @@ object TransformRunner :
161168
bucketsToTransform.modifiedBuckets.filter {
162169
transformProcessedBucketLog.isNotProcessed(it)
163170
}.toMutableSet()
171+
172+
logger.debug(
173+
"Transform job {} recompute to start with modified buckets {}. Processing shard {}.",
174+
transform.id,
175+
modifiedBuckets.size,
176+
bucketsToTransform.currentShard
177+
)
164178
// Recompute modified buckets and update them in targetIndex
165179
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext)
166180
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
@@ -229,6 +243,7 @@ object TransformRunner :
229243
)
230244
}
231245
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
246+
logger.debug("Transform job {} has current buckets {} to transform. Processing Shard {} with checkpoints from {} to {}.", transform.id,currentBucketsToTransform.modifiedBuckets.size, currentShard.shardId ,currentShard.from, currentShard.to)
232247
val mergedSearchTime =
233248
currentBucketsToTransform.metadata.stats.searchTimeInMillis +
234249
shardLevelModifiedBuckets.searchTimeInMillis
@@ -325,7 +340,9 @@ object TransformRunner :
325340
}
326341
val indexTimeInMillis =
327342
withTransformSecurityContext(transform) {
343+
logger.debug("Transform job {} starting to index for target index: {} with documents {}.", transform.id,transform.targetIndex, transformSearchResult.docsToIndex.size)
328344
transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext)
345+
logger.debug("Transform job {} completed to index for target index: {}.", transform.id, transform.targetIndex)
329346
}
330347
val stats = transformSearchResult.stats
331348
val updatedStats =

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

+4
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,16 @@ class TransformSearchService(
151151
return@suspendUntil
152152
}
153153
val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds)
154+
logger.debug("Transform job {} is starting its search request {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, request.source(), currentShard.from, currentShard.to)
154155
search(request, listener)
156+
logger.debug("Transform job {} has completed search request for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to)
155157
}
156158
}
157159
// If the request was successful, update page size
158160
transformContext.lastSuccessfulPageSize = pageSize
161+
logger.debug("Transform job {} updated page size {} for Shard {} from checkpoint: {} and to checkpoint: {}", transform.id, pageSize, currentShard.shardId, currentShard.from, currentShard.to)
159162
transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart)
163+
logger.debug("Transform job {} is renewing lock for long search for Shard {} from checkpoint: {} and to checkpoint: {}. Time for search {}", transform.id, currentShard.shardId, currentShard.from, currentShard.to, (Instant.now().epochSecond - searchStart))
160164
return convertBucketSearchResponse(transform, searchResponse)
161165
} catch (e: TransformSearchServiceException) {
162166
throw e

0 commit comments

Comments
 (0)