Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,12 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
const val REPLICATION_EXECUTOR_NAME_FOLLOWER = "replication_follower"
val REPLICATED_INDEX_SETTING: Setting<String> = Setting.simpleString("index.plugins.replication.follower.leader_index",
Setting.Property.InternalIndex, Setting.Property.IndexScope)
// Cluster-level batch size setting (used when no index-level override is set)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Cluster level

val REPLICATION_FOLLOWER_OPS_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.follower.index.ops_batch_size", 50000, 16,
Setting.Property.Dynamic, Setting.Property.NodeScope)
// Index-level batch size setting: default 50000, minimum 16, dynamically updatable, index-scoped.
// BatchSizeSettings reads this key first and falls back to the cluster-level value when the key
// is not present in the index settings.
val REPLICATION_FOLLOWER_OPS_BATCH_SIZE_INDEX: Setting<Int> = Setting.intSetting("index.plugins.replication.follower.ops_batch_size", 50000, 16,
Setting.Property.Dynamic, Setting.Property.IndexScope)
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.thread_pool.size", 0, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.thread_pool.queue_size", 1000, 0,
Expand Down Expand Up @@ -358,7 +362,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_FOLLOWER_OPS_BATCH_SIZE_INDEX, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.task.shard

import org.opensearch.index.IndexSettings
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.ReplicationSettings

/**
* Helper class to manage batch size settings with fallback from index-level to cluster-level
*/
class BatchSizeSettings(
private val indexSettings: IndexSettings,
private val replicationSettings: ReplicationSettings
) {

/**
 * Resolves the configured batch size for this index, ignoring any dynamic reduction.
 *
 * @return the index-level `ops_batch_size` value when the index settings carry one,
 * otherwise `replicationSettings.batchSize`.
 */
fun getBatchSize(): Int {
    // No index-level override present: use the value held by ReplicationSettings.
    if (!hasIndexLevelSetting()) {
        return replicationSettings.batchSize
    }
    return ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE_INDEX.get(indexSettings.settings)
}

/**
 * Reports whether the index settings contain a value for the index-level batch size key.
 *
 * @return `true` when the key of `REPLICATION_FOLLOWER_OPS_BATCH_SIZE_INDEX` has a value
 * in `indexSettings.settings`, `false` otherwise.
 */
fun hasIndexLevelSetting(): Boolean {
    val indexLevelKey = ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE_INDEX.key
    return indexSettings.settings.hasValue(indexLevelKey)
}

/**
 * Names where the configured batch size comes from, for use in log messages.
 *
 * @return `"index-level"` when an index-level value is set, otherwise `"cluster-level"`.
 */
fun getBatchSizeSource(): String = when {
    hasIndexLevelSetting() -> "index-level"
    else -> "cluster-level"
}

// For dynamic batch size adjustment (2GB fix).
// Holds the reduced batch size while a reduction is in effect; null means no reduction,
// in which case getEffectiveBatchSize() falls back to getBatchSize().
// Written by reduceBatchSize() and cleared by resetBatchSize().
@Volatile
private var dynamicBatchSize: Int? = null
// Lower bound applied by reduceBatchSize(). The value 16 equals the minimum passed to
// Setting.intSetting for both ops_batch_size settings in ReplicationPlugin.
private val minBatchSize = 16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's define this as a constant and reuse the value both here and in the settings definitions in ReplicationPlugin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have added a constant in ReplicationPlugin class


/**
 * Returns the batch size to use for the next fetch.
 *
 * @return the dynamically reduced size when one is set, otherwise the configured
 * size from [getBatchSize].
 */
fun getEffectiveBatchSize(): Int {
    val reduced = dynamicBatchSize
    return reduced ?: getBatchSize()
}

/**
 * Halves the effective batch size for 2GB limit handling.
 *
 * The result is stored as the dynamic batch size and never drops below `minBatchSize`.
 */
fun reduceBatchSize() {
    val halved = getEffectiveBatchSize() / 2
    dynamicBatchSize = halved.coerceAtLeast(minBatchSize)
}

/**
 * Clears any dynamic reduction so that [getEffectiveBatchSize] returns the configured
 * batch size again.
 */
fun resetBatchSize() {
    this.dynamicBatchSize = null
}

/**
 * Reports whether a dynamic batch size is currently set.
 *
 * @return `true` while a dynamic batch size is stored, `false` after a reset or
 * before any reduction.
 */
fun isDynamicallyReduced(): Boolean = dynamicBatchSize != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class ShardReplicationChangesTracker(indexShard: IndexShard, private val replica
private val missingBatches = Collections.synchronizedList(ArrayList<Pair<Long, Long>>())
private val observedSeqNoAtLeader = AtomicLong(indexShard.localCheckpoint)
private val seqNoAlreadyRequested = AtomicLong(indexShard.localCheckpoint)
private val batchSize = replicationSettings.batchSize
private val batchSizeSettings = BatchSizeSettings(indexShard.indexSettings(), replicationSettings)


/**
* Provides a range of operations to be fetched next.
Expand Down Expand Up @@ -67,14 +68,43 @@ class ShardReplicationChangesTracker(indexShard: IndexShard, private val replica
missingBatches.removeAt(0)
} else {
// return the next batch to fetch and update seqNoAlreadyRequested.
val fromSeq = seqNoAlreadyRequested.getAndAdd(batchSize.toLong()) + 1
val toSeq = fromSeq + batchSize - 1
logDebug("Fetching the batch $fromSeq-$toSeq")
val currentBatchSize = batchSizeSettings.getEffectiveBatchSize()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we gate this dynamic batch size feature behind a cluster/index setting? We can enable it by default based on testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think another setting is needed as the test coverage is good.

val fromSeq = seqNoAlreadyRequested.getAndAdd(currentBatchSize.toLong()) + 1
val toSeq = fromSeq + currentBatchSize - 1
val sizeInfo = if (batchSizeSettings.isDynamicallyReduced()) {
"reduced to $currentBatchSize"
} else {
"$currentBatchSize from ${batchSizeSettings.getBatchSizeSource()}"
}
logDebug("Fetching the batch $fromSeq-$toSeq (batch size: $sizeInfo)")
Pair(fromSeq, toSeq)
}
}
}

/**
* Reduce the effective batch size by delegating to [BatchSizeSettings.reduceBatchSize], then log the new value.
*/
fun reduceBatchSize() {
batchSizeSettings.reduceBatchSize()
logDebug("Batch size reduced to ${batchSizeSettings.getEffectiveBatchSize()}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log as INFO

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

/**
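* Clear any dynamic batch size reduction by delegating to [BatchSizeSettings.resetBatchSize], then log the restored value.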
*/
fun resetBatchSize() {
batchSizeSettings.resetBatchSize()
logDebug("Batch size reset to ${batchSizeSettings.getEffectiveBatchSize()}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log as INFO

}

/**
 * Exposes the [BatchSizeSettings] instance held by this tracker.
 *
 * @return the tracker's batch size settings helper.
 */
fun batchSizeSettings(): BatchSizeSettings = batchSizeSettings

/**
* Ensures that we've successfully fetched a particular range of operations.
* In case of any failure(or we didn't get complete batch), we make sure that we're fetching the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,6 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.changes.GetChangesRequest
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.task.CrossClusterReplicationTask
import org.opensearch.replication.task.ReplicationState
import org.opensearch.replication.util.indicesService
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.replication.util.suspendExecuteWithRetries
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
Expand All @@ -39,21 +25,34 @@ import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchTimeoutException
import org.opensearch.transport.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.logging.Loggers
import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.tasks.TaskId
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.persistent.PersistentTaskState
import org.opensearch.persistent.PersistentTasksNodeService
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.tasks.TaskId
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.changes.GetChangesRequest
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.task.CrossClusterReplicationTask
import org.opensearch.replication.task.ReplicationState
import org.opensearch.replication.util.indicesService
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.replication.util.suspendExecuteWithRetries
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.NodeNotConnectedException
import org.opensearch.transport.client.Client
import java.time.Duration


Expand Down Expand Up @@ -255,6 +254,18 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
} catch (e: Exception) {
followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1)
logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}")

// Handle 2GB limit exception specifically
if (e is IllegalArgumentException &&
e.message?.equals("ReleasableBytesStreamOutput cannot hold more than 2GB of data") == true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just check for the prefix "ReleasableBytesStreamOutput cannot hold more than"? If the 2GB limit changes in the future, this exact-match check will stop matching and the original failure will recur.

logError("Hit 2GB limit with current batch size ${changeTracker.batchSizeSettings().getEffectiveBatchSize()}. Reducing batch size.")
changeTracker.reduceBatchSize()
logError("Reduced batch size to ${changeTracker.batchSizeSettings().getEffectiveBatchSize()}. Retrying immediately.")
changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1, -1)
// No delay for 2GB limit - retry immediately with smaller batch
return@launch
}

changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1)
// Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable
val range4xx = 400.rangeTo(499)
Expand Down
Loading