-
Notifications
You must be signed in to change notification settings - Fork 517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug] StoreEventListener in CachedSystemTransactionV2 and CachedGraphTransaction will be registered and monitored repeatedly #2618
Comments
To resolve the issue of private void listenChanges() {
// Purpose: build a store-event listener and a cache-event listener, then
// register each one unless the matching containsListener() call reports
// that a listener is already present.
//
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
// The listener handles an event (returns true) only when its name is one
// of storeEvents; in that case it calls clearCache(null, true).
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear graph cache on event '{}'",
this.graph(), event.name());
this.clearCache(null, true);
return true;
}
return false;
};
// Check if the storeEventListener is already registered
// NOTE(review): this.storeEventListener was assigned a brand-new lambda a
// few lines above, so the object being looked up cannot have been
// registered by an earlier call. Unless containsListener() compares by
// something other than object identity, this condition is presumably
// always true and the guard prevents nothing - confirm.
// NOTE(review): containsListener(listener) on the store provider is not
// shown anywhere in this snippet - verify that the method exists.
if (!this.store().provider().containsListener(this.storeEventListener)) {
this.store().provider().listen(this.storeEventListener);
}
// Listen cache event: "cache"(invalid cache item)
// The first event argument must be a String action. Two actions are
// handled (ACTION_INVALID and ACTION_CLEAR); any other action returns false.
this.cacheEventListener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
// Expected arguments: (action, HugeType, Id or array of Id)
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
if (type.isVertex()) {
// Invalidate vertex cache
Object arg2 = args[2];
if (arg2 instanceof Id) {
// Single id: invalidate just that entry
Id id = (Id) arg2;
this.verticesCache.invalidate(id);
} else if (arg2 != null && arg2.getClass().isArray()) {
// Array of ids: read reflectively and invalidate one by one
int size = Array.getLength(arg2);
for (int i = 0; i < size; i++) {
Object id = Array.get(arg2, i);
// NOTE(review): id.getClass() is evaluated before checkArgument()
// is invoked, so a null array element raises NullPointerException
// here rather than the intended argument error.
E.checkArgument(id instanceof Id,
"Expect instance of Id in array, " +
"but got '%s'", id.getClass());
this.verticesCache.invalidate((Id) id);
}
} else {
// Neither an Id nor an array (including null): always rejected
E.checkArgument(false,
"Expect Id or Id[], but got: %s",
arg2);
}
} else if (type.isEdge()) {
/*
* Invalidate edge cache via clear instead of invalidate
* because of the cacheKey is QueryId not EdgeId
*/
// this.edgesCache.invalidate(id);
this.edgesCache.clear();
}
// Types that are neither vertex nor edge fall through and are still
// reported as handled.
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
// Expected arguments: (action, HugeType)
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
this.clearCache(type, false);
return true;
}
return false;
};
EventHub graphEventHub = this.params().graphEventHub();
// Check if the cacheEventListener is already registered
// This check is keyed by the event name Events.CACHE, not by the listener
// instance, so registration is skipped whenever the hub reports a listener
// for that event.
if (!graphEventHub.containsListener(Events.CACHE)) {
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
} Additionally, you should implement an private void unlistenChanges() {
// Purpose: remove both listeners held in this instance's fields.
// NOTE(review): the cache listener is removed unconditionally, although
// listenChanges() may have skipped registering it - confirm that
// unlisten() tolerates a listener that was never added.
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
// Unlisten cache event
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
} By adding these checks and ensuring proper unregistration, you ensure that the listeners are only registered once, preventing the issue of them being registered and monitored repeatedly [1][2]. |
@dosu // Check if the storeEventListener is already registered |
To ensure that public final class CachedGraphTransaction extends GraphTransaction {
// Per-thread flag, initially false, set to true at the end of
// listenChanges() and back to false in unlistenChanges().
// NOTE(review): a ThreadLocal gives every thread its own copy of the flag,
// so an instance created on another thread still reads false and registers
// its listeners again. This guard therefore only stops repeated
// registration within a single thread - confirm whether that is enough
// for the duplication being reported.
private static final ThreadLocal<Boolean> listenerRegistered = ThreadLocal.withInitial(() -> false);
// Purpose: create and register the store-event and cache-event listeners,
// at most once per thread according to listenerRegistered.
private void listenChanges() {
// NOTE(review): returning here leaves this.storeEventListener and
// this.cacheEventListener unassigned by this call; unlistenChanges() later
// passes those fields to unlisten() regardless - confirm that is safe.
if (listenerRegistered.get()) {
return;
}
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
// The listener handles an event (returns true) only when its name is one
// of storeEvents; in that case it calls clearCache(null, true).
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear graph cache on event '{}'",
this.graph(), event.name());
this.clearCache(null, true);
return true;
}
return false;
};
// Registered unconditionally once the thread-local guard has passed
this.store().provider().listen(this.storeEventListener);
// Listen cache event: "cache"(invalid cache item)
// The first event argument must be a String action. Two actions are
// handled (ACTION_INVALID and ACTION_CLEAR); any other action returns false.
this.cacheEventListener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
// Expected arguments: (action, HugeType, Id or array of Id)
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
if (type.isVertex()) {
// Invalidate vertex cache
Object arg2 = args[2];
if (arg2 instanceof Id) {
// Single id: invalidate just that entry
Id id = (Id) arg2;
this.verticesCache.invalidate(id);
} else if (arg2 != null && arg2.getClass().isArray()) {
// Array of ids: read reflectively and invalidate one by one
int size = Array.getLength(arg2);
for (int i = 0; i < size; i++) {
Object id = Array.get(arg2, i);
// NOTE(review): id.getClass() is evaluated before checkArgument()
// is invoked, so a null array element raises NullPointerException
// here rather than the intended argument error.
E.checkArgument(id instanceof Id,
"Expect instance of Id in array, " +
"but got '%s'", id.getClass());
this.verticesCache.invalidate((Id) id);
}
} else {
// Neither an Id nor an array (including null): always rejected
E.checkArgument(false,
"Expect Id or Id[], but got: %s",
arg2);
}
} else if (type.isEdge()) {
/*
* Invalidate edge cache via clear instead of invalidate
* because of the cacheKey is QueryId not EdgeId
*/
// this.edgesCache.invalidate(id);
this.edgesCache.clear();
}
// Types that are neither vertex nor edge fall through and are still
// reported as handled.
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
// Expected arguments: (action, HugeType)
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
this.clearCache(type, false);
return true;
}
return false;
};
EventHub graphEventHub = this.params().graphEventHub();
// Keyed by the event name Events.CACHE, not by the listener instance:
// registration is skipped whenever the hub reports a listener for it.
if (!graphEventHub.containsListener(Events.CACHE)) {
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
// Mark the current thread as registered
listenerRegistered.set(true);
}
// Purpose: remove both listeners held in this instance's fields and reset
// the current thread's flag.
// NOTE(review): the flag is cleared and unlisten() is called even when this
// instance returned early from listenChanges() and never registered
// anything - confirm the intended ownership of the registration.
private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
// Unlisten cache event
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
listenerRegistered.set(false);
}
// Other methods...
} This code uses a |
Bug Type (问题类型)
logic (逻辑设计问题)
Before submit
Environment (环境信息)
Expected & Actual behavior (期望与实际表现)
The StoreEventListener in CachedSystemTransactionV2 and CachedGraphTransaction is registered and monitored repeatedly.
Because CachedSystemTransactionV2 and CachedGraphTransaction are both ThreadLocal types in TinkerPopTransaction, each thread holds its own instance. When an instance is initialized, it registers a storeEventListener with the storeEventHub held by AbstractBackendStoreProvider. Since every listener implements the same functionality, the storeEventHub ends up holding a large number of redundant storeEventListeners.
Vertex/Edge example (问题点 / 边数据举例)
No response
Schema [VertexLabel, EdgeLabel, IndexLabel] (元数据结构)
No response
The text was updated successfully, but these errors were encountered: