Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
final Map<String, CheckpointState> checkpoints;

/**
* A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on
* the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances
* due to an update from the primary.
*/
private final LongConsumer onGlobalCheckpointUpdated;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
Expand Down Expand Up @@ -391,7 +399,8 @@ public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint) {
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -400,6 +409,7 @@ public ReplicationTracker(
this.appliedClusterStateVersion = -1L;
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand Down Expand Up @@ -487,6 +497,7 @@ private void updateGlobalCheckpoint(final String allocationId, final long global
if (cps != null && globalCheckpoint > cps.globalCheckpoint) {
ifUpdated.accept(cps.globalCheckpoint);
cps.globalCheckpoint = globalCheckpoint;
onGlobalCheckpointUpdated.accept(globalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doubting whether this should be called out of lock. I'm tending to say yes. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can you clarify why the consumer is called when updating the primary's knowledge of the gcp knowledge of a replica? (this method is used there too)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. That was not intended. I pushed a477ff4.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding not invoking the notification under lock, I was tending to avoid complicating these methods (I like that we have synchronized as a method modifier) and avoiding dealing with the fact that some of these updates can occur under nested invocations of synchronized methods. This would mean returning booleans and dropping the synchronized from the method modifiers. In the POC that I have, the callback is "cheap" because it forks invocation of the listeners to another thread (practically to the listener thread pool):

    private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
        assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
        if (listeners != null) {
            final List<GlobalCheckpointListener> currentListeners;
            synchronized (this) {
                currentListeners = listeners;
                listeners = null;
            }
            if (currentListeners != null) {
                executor.execute(() -> {
                    for (final GlobalCheckpointListener listener : currentListeners) {
                        try {
                            listener.accept(globalCheckpoint, e);
                        } catch (final Exception caught) {
                            if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
                                logger.warn(
                                        new ParameterizedMessage(
                                                "error notifying global checkpoint listener of updated global checkpoint [{}]",
                                                globalCheckpoint),
                                        caught);
                            } else {
                                logger.warn("error notifying global checkpoint listener of closed shard", caught);
                            }
                        }
                    }
                });
            }
        }
    }

I think this is okay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine if you intend to spawn off to other threads in the outer layers. Indeed the simplicity of the current solution is what was making me doubt towards having it as you did.

}
}

Expand Down Expand Up @@ -739,6 +750,7 @@ private synchronized void updateGlobalCheckpointOnPrimary() {
if (globalCheckpoint != computedGlobalCheckpoint) {
logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint);
cps.globalCheckpoint = computedGlobalCheckpoint;
onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ public IndexShard(

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings,
SequenceNumbers.UNASSIGNED_SEQ_NO);
final String aId = shardRouting.allocationId().getId();
this.replicationTracker =
new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {});
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
Expand Down
Loading