From 94e028dfea26ea44f327508148531d56292d3d3e Mon Sep 17 00:00:00 2001 From: Louise Berglund Date: Tue, 9 Dec 2025 14:36:35 +0100 Subject: [PATCH 1/3] Solve race condition when several calls to apoc.trigger.install are made concurrently. Also update some licenses after new Neo4j release. --- LICENSES.txt | 26 ++++++++-------- NOTICE.txt | 26 ++++++++-------- .../java/apoc/cypher/CypherInitializer.java | 12 +++++++ .../trigger/TriggerHandlerNewProcedures.java | 11 +++++-- .../trigger/TriggerNewProceduresTest.java | 31 +++++++++++++++++++ 5 files changed, 78 insertions(+), 28 deletions(-) diff --git a/LICENSES.txt b/LICENSES.txt index 5f0d76a4c..3b800bed9 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -50,9 +50,9 @@ Apache-2.0 curator-client-5.2.0.jar curator-framework-5.2.0.jar curator-recipes-5.2.0.jar - docker-java-api-3.4.0.jar - docker-java-transport-3.4.0.jar - docker-java-transport-zerodep-3.4.0.jar + docker-java-api-3.7.0.jar + docker-java-transport-3.7.0.jar + docker-java-transport-zerodep-3.7.0.jar ehcache-3.8.2.jar error_prone_annotations-2.36.0.jar failureaccess-1.0.3.jar @@ -2705,28 +2705,28 @@ MIT bcpkix-jdk18on-1.82.jar bcprov-jdk18on-1.82.jar bcutil-jdk18on-1.82.jar - cassandra-1.20.2.jar - couchbase-1.20.2.jar - database-commons-1.20.2.jar duct-tape-1.0.8.jar - elasticsearch-1.20.2.jar java-jwt-4.5.0.jar - jdbc-1.20.2.jar jersey-client-2.46.jar jersey-container-servlet-2.46.jar jersey-container-servlet-core-2.46.jar jersey-hk2-2.46.jar jnr-x86asm-1.0.2.jar - localstack-1.20.2.jar mockito-core-5.20.0.jar mssql-jdbc-6.2.1.jre7.jar - mysql-1.20.2.jar - neo4j-1.20.2.jar - postgresql-1.20.2.jar reactive-streams-1.0.4.jar slf4j-api-2.0.11.jar slf4j-api-2.0.17.jar - testcontainers-1.20.2.jar + testcontainers-2.0.2.jar + testcontainers-cassandra-2.0.2.jar + testcontainers-couchbase-2.0.2.jar + testcontainers-database-commons-2.0.2.jar + testcontainers-elasticsearch-2.0.2.jar + testcontainers-jdbc-2.0.2.jar + testcontainers-localstack-2.0.2.jar + testcontainers-mysql-2.0.2.jar + testcontainers-neo4j-2.0.2.jar + testcontainers-postgresql-2.0.2.jar ------------------------------------------------------------------------------ The MIT License diff --git a/NOTICE.txt b/NOTICE.txt index 8dce3ab1c..6dbc79258 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -80,9 +80,9 @@ Apache-2.0 curator-client-5.2.0.jar curator-framework-5.2.0.jar curator-recipes-5.2.0.jar - docker-java-api-3.4.0.jar - docker-java-transport-3.4.0.jar - docker-java-transport-zerodep-3.4.0.jar + docker-java-api-3.7.0.jar + docker-java-transport-3.7.0.jar + docker-java-transport-zerodep-3.7.0.jar ehcache-3.8.2.jar error_prone_annotations-2.36.0.jar failureaccess-1.0.3.jar @@ -468,28 +468,28 @@ MIT bcpkix-jdk18on-1.82.jar bcprov-jdk18on-1.82.jar bcutil-jdk18on-1.82.jar - cassandra-1.20.2.jar - couchbase-1.20.2.jar - database-commons-1.20.2.jar duct-tape-1.0.8.jar - elasticsearch-1.20.2.jar java-jwt-4.5.0.jar - jdbc-1.20.2.jar jersey-client-2.46.jar jersey-container-servlet-2.46.jar jersey-container-servlet-core-2.46.jar jersey-hk2-2.46.jar jnr-x86asm-1.0.2.jar - localstack-1.20.2.jar mockito-core-5.20.0.jar mssql-jdbc-6.2.1.jre7.jar - mysql-1.20.2.jar - neo4j-1.20.2.jar - postgresql-1.20.2.jar reactive-streams-1.0.4.jar slf4j-api-2.0.11.jar slf4j-api-2.0.17.jar - testcontainers-1.20.2.jar + testcontainers-2.0.2.jar + testcontainers-cassandra-2.0.2.jar + testcontainers-couchbase-2.0.2.jar + testcontainers-database-commons-2.0.2.jar + testcontainers-elasticsearch-2.0.2.jar + testcontainers-jdbc-2.0.2.jar + testcontainers-localstack-2.0.2.jar + testcontainers-mysql-2.0.2.jar + testcontainers-neo4j-2.0.2.jar + testcontainers-postgresql-2.0.2.jar MPL 1.1 javassist-3.30.2-GA.jar diff --git a/core/src/main/java/apoc/cypher/CypherInitializer.java b/core/src/main/java/apoc/cypher/CypherInitializer.java index 8c3ed7f37..478b0f47b 100644 --- a/core/src/main/java/apoc/cypher/CypherInitializer.java +++ b/core/src/main/java/apoc/cypher/CypherInitializer.java @@ -22,6 +22,7 @@ import apoc.ApocConfig; import apoc.SystemLabels; +import apoc.SystemPropertyKeys; import apoc.util.LogsUtil; import apoc.util.Util; import apoc.util.collection.Iterators; @@ -43,6 +44,7 @@ import org.neo4j.graphdb.event.DatabaseEventContext; import org.neo4j.graphdb.event.DatabaseEventListener; import org.neo4j.kernel.availability.AvailabilityListener; +import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.DatabaseEventListeners; import org.neo4j.logging.Log; @@ -113,6 +115,16 @@ public void available() { apocVersion, neo4jVersion); } } + // Create a uniqueness constraint on system db to avoid race conditions when installing + // triggers + try (InternalTransaction tx = db.beginTransaction()) { + tx.schema() + .constraintFor(SystemLabels.ApocTriggerMeta) + .assertPropertyIsUnique(SystemPropertyKeys.database.name()) + .create(); + tx.commit(); + } + databaseEventListeners.registerDatabaseEventListener(new SystemFunctionalityListener()); } diff --git a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java index ae3142709..04751ae24 100644 --- a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; +import org.neo4j.graphdb.ConstraintViolationException; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.Transaction; @@ -138,8 +139,14 @@ public static ResourceIterator getTriggerNodes(String databaseName, Transa public static void setLastUpdate(String databaseName, Transaction tx) { Node node = tx.findNode(SystemLabels.ApocTriggerMeta, SystemPropertyKeys.database.name(), databaseName); if (node == null) { - node = tx.createNode(SystemLabels.ApocTriggerMeta); - node.setProperty(SystemPropertyKeys.database.name(), databaseName); + try { + node = tx.createNode(SystemLabels.ApocTriggerMeta); + node.setProperty(SystemPropertyKeys.database.name(), databaseName); + } catch (ConstraintViolationException e) { + // This can happen if two threads try to create the same node concurrently, + // after both having passed the null check. In this case we can ignore the failing tx. + return; + } } final long value = System.currentTimeMillis(); node.setProperty(SystemPropertyKeys.lastUpdated.name(), value); diff --git a/core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java b/core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java index 6f92c5323..d198b51d9 100644 --- a/core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java +++ b/core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java @@ -43,6 +43,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -203,6 +207,33 @@ public void testOverwriteTrigger() { }); } + @Test + void testConcurrentTriggersShouldNotFail() throws ExecutionException, InterruptedException { + var query = "CALL apoc.trigger.install('neo4j', 'myTrigger', 'RETURN 1', {phase: 'before'})"; + + try (ExecutorService executor = Executors.newFixedThreadPool(2)) { + CountDownLatch latch = new CountDownLatch(1); + + Callable task = () -> { + latch.await(); // wait until both threads are ready + testCall(sysDb, query, map(), r -> {}); + return null; + }; + + Future f1 = executor.submit(task); + Future f2 = executor.submit(task); + + // release both threads + latch.countDown(); + + // Will throw if anything inside the tasks failed + f1.get(); + f2.get(); + + executor.shutdown(); + } + } + @Test public void testIssue2247() { db.executeTransactionally("CREATE (n:ToBeDeleted)"); From 2dad37e367794fbd5f7ae5021f4db505e5b7644a Mon Sep 17 00:00:00 2001 From: Louise Berglund Date: Thu, 11 Dec 2025 09:17:04 +0100 Subject: [PATCH 2/3] Add one retry if race condition happens. --- .../trigger/TriggerHandlerNewProcedures.java | 32 +++++++++++++------ .../apoc/trigger/TriggerNewProcedures.java | 13 ++++---- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java index 04751ae24..feccccaa5 100644 --- a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java @@ -34,6 +34,7 @@ import org.neo4j.graphdb.Node; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.internal.GraphDatabaseAPI; public class TriggerHandlerNewProcedures { public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." @@ -50,6 +51,7 @@ public static void checkEnabled() { } public static TriggerInfo install( + GraphDatabaseAPI db, String databaseName, String triggerName, String statement, @@ -73,12 +75,12 @@ public static TriggerInfo install( // we'll return current trigger info result = fromNode(node, true); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return result; } - public static TriggerInfo drop(String databaseName, String triggerName, Transaction tx) { + public static TriggerInfo drop(GraphDatabaseAPI db, String databaseName, String triggerName, Transaction tx) { final TriggerInfo[] previous = new TriggerInfo[1]; getTriggerNodes(databaseName, tx, triggerName).forEachRemaining(node -> { @@ -86,12 +88,13 @@ public static TriggerInfo drop(String databaseName, String triggerName, Transact node.delete(); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return previous[0]; } - public static TriggerInfo updatePaused(String databaseName, String name, boolean paused, Transaction tx) { + public static TriggerInfo updatePaused( + GraphDatabaseAPI db, String databaseName, String name, boolean paused, Transaction tx) { final TriggerInfo[] result = new TriggerInfo[1]; getTriggerNodes(databaseName, tx, name).forEachRemaining(node -> { @@ -101,12 +104,12 @@ public static TriggerInfo updatePaused(String databaseName, String name, boolean result[0] = fromNode(node, true); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return result[0]; } - public static List dropAll(String databaseName, Transaction tx) { + public static List dropAll(GraphDatabaseAPI db, String databaseName, Transaction tx) { final List previous = new ArrayList<>(); getTriggerNodes(databaseName, tx).forEachRemaining(node -> { @@ -114,7 +117,7 @@ public static List dropAll(String databaseName, Transaction tx) { previous.add(fromNode(node, false)); node.delete(); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return previous; } @@ -136,7 +139,11 @@ public static ResourceIterator getTriggerNodes(String databaseName, Transa return tx.findNodes(label, dbNameKey, databaseName, SystemPropertyKeys.name.name(), name); } - public static void setLastUpdate(String databaseName, Transaction tx) { + public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx) { + setLastUpdate(db, databaseName, tx, 0); + } + + public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx, int retryNumber) { Node node = tx.findNode(SystemLabels.ApocTriggerMeta, SystemPropertyKeys.database.name(), databaseName); if (node == null) { try { @@ -144,7 +151,14 @@ public static void setLastUpdate(String databaseName, Transaction tx) { node.setProperty(SystemPropertyKeys.database.name(), databaseName); } catch (ConstraintViolationException e) { // This can happen if two threads try to create the same node concurrently, - // after both having passed the null check. In this case we can ignore the failing tx. + // after both having passed the null check. + // In this case we retry once or otherwise ignore the failing tx. + if (retryNumber < 1) { + try (final var newTx = db.beginTx()) { + TriggerHandlerNewProcedures.setLastUpdate(db, databaseName, newTx, retryNumber + 1); + newTx.commit(); + } + } return; } } diff --git a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java index 1fa51d910..e1448793a 100644 --- a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java @@ -109,7 +109,8 @@ public Stream install( var query = Util.prefixQueryWithCheck(procedureCallContext, statement); return withUpdatingTransaction( databaseName, - tx -> Stream.of(TriggerHandlerNewProcedures.install(databaseName, name, query, selector, params, tx))); + tx -> Stream.of( + TriggerHandlerNewProcedures.install(db, databaseName, name, query, selector, params, tx))); } // TODO - change with @SystemOnlyProcedure @@ -124,7 +125,7 @@ public Stream drop( checkInSystemWriter(); return withUpdatingTransaction( - databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(databaseName, name, tx))); + databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(db, databaseName, name, tx))); } // TODO - change with @SystemOnlyProcedure @@ -138,7 +139,7 @@ public Stream dropAll( checkInSystemWriter(); return withUpdatingTransaction( - databaseName, tx -> TriggerHandlerNewProcedures.dropAll(databaseName, tx).stream() + databaseName, tx -> TriggerHandlerNewProcedures.dropAll(db, databaseName, tx).stream() .sorted(Comparator.comparing(i -> i.name))); } @@ -154,7 +155,7 @@ public Stream stop( checkInSystemWriter(); return withUpdatingTransaction(databaseName, tx -> { - final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, true, tx); + final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, true, tx); return Stream.ofNullable(triggerInfo); }); } @@ -171,7 +172,7 @@ public Stream start( checkInSystemWriter(); return withUpdatingTransaction(databaseName, tx -> { - final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, false, tx); + final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, false, tx); return Stream.ofNullable(triggerInfo); }); } @@ -198,7 +199,7 @@ public T withUpdatingTransaction(String databaseName, Function Date: Thu, 11 Dec 2025 13:15:13 +0100 Subject: [PATCH 3/3] Make sure we only create constraint once. --- .../java/apoc/cypher/CypherInitializer.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/apoc/cypher/CypherInitializer.java b/core/src/main/java/apoc/cypher/CypherInitializer.java index 478b0f47b..32ea97c94 100644 --- a/core/src/main/java/apoc/cypher/CypherInitializer.java +++ b/core/src/main/java/apoc/cypher/CypherInitializer.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.stream.StreamSupport; import org.apache.commons.configuration2.Configuration; import org.apache.commons.lang3.StringUtils; import org.neo4j.common.DependencyResolver; @@ -118,11 +119,22 @@ public void available() { // Create a uniqueness constraint on system db to avoid race conditions when installing // triggers try (InternalTransaction tx = db.beginTransaction()) { - tx.schema() - .constraintFor(SystemLabels.ApocTriggerMeta) - .assertPropertyIsUnique(SystemPropertyKeys.database.name()) - .create(); - tx.commit(); + var constraintName = "triggerConstraint"; + var maybeConstraint = StreamSupport.stream( + tx.schema() + .getConstraints(SystemLabels.ApocTriggerMeta) + .spliterator(), + false) + .filter(x -> x.getName().equals(constraintName)) + .toList(); + if (maybeConstraint.isEmpty()) { + tx.schema() + .constraintFor(SystemLabels.ApocTriggerMeta) + .withName(constraintName) + .assertPropertyIsUnique(SystemPropertyKeys.database.name()) + .create(); + tx.commit(); + } } databaseEventListeners.registerDatabaseEventListener(new SystemFunctionalityListener());