diff --git a/LICENSES.txt b/LICENSES.txt index 5f0d76a4c..3b800bed9 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -50,9 +50,9 @@ Apache-2.0 curator-client-5.2.0.jar curator-framework-5.2.0.jar curator-recipes-5.2.0.jar - docker-java-api-3.4.0.jar - docker-java-transport-3.4.0.jar - docker-java-transport-zerodep-3.4.0.jar + docker-java-api-3.7.0.jar + docker-java-transport-3.7.0.jar + docker-java-transport-zerodep-3.7.0.jar ehcache-3.8.2.jar error_prone_annotations-2.36.0.jar failureaccess-1.0.3.jar @@ -2705,28 +2705,28 @@ MIT bcpkix-jdk18on-1.82.jar bcprov-jdk18on-1.82.jar bcutil-jdk18on-1.82.jar - cassandra-1.20.2.jar - couchbase-1.20.2.jar - database-commons-1.20.2.jar duct-tape-1.0.8.jar - elasticsearch-1.20.2.jar java-jwt-4.5.0.jar - jdbc-1.20.2.jar jersey-client-2.46.jar jersey-container-servlet-2.46.jar jersey-container-servlet-core-2.46.jar jersey-hk2-2.46.jar jnr-x86asm-1.0.2.jar - localstack-1.20.2.jar mockito-core-5.20.0.jar mssql-jdbc-6.2.1.jre7.jar - mysql-1.20.2.jar - neo4j-1.20.2.jar - postgresql-1.20.2.jar reactive-streams-1.0.4.jar slf4j-api-2.0.11.jar slf4j-api-2.0.17.jar - testcontainers-1.20.2.jar + testcontainers-2.0.2.jar + testcontainers-cassandra-2.0.2.jar + testcontainers-couchbase-2.0.2.jar + testcontainers-database-commons-2.0.2.jar + testcontainers-elasticsearch-2.0.2.jar + testcontainers-jdbc-2.0.2.jar + testcontainers-localstack-2.0.2.jar + testcontainers-mysql-2.0.2.jar + testcontainers-neo4j-2.0.2.jar + testcontainers-postgresql-2.0.2.jar ------------------------------------------------------------------------------ The MIT License diff --git a/NOTICE.txt b/NOTICE.txt index 8dce3ab1c..6dbc79258 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -80,9 +80,9 @@ Apache-2.0 curator-client-5.2.0.jar curator-framework-5.2.0.jar curator-recipes-5.2.0.jar - docker-java-api-3.4.0.jar - docker-java-transport-3.4.0.jar - docker-java-transport-zerodep-3.4.0.jar + docker-java-api-3.7.0.jar + docker-java-transport-3.7.0.jar + docker-java-transport-zerodep-3.7.0.jar ehcache-3.8.2.jar error_prone_annotations-2.36.0.jar failureaccess-1.0.3.jar @@ -468,28 +468,28 @@ MIT bcpkix-jdk18on-1.82.jar bcprov-jdk18on-1.82.jar bcutil-jdk18on-1.82.jar - cassandra-1.20.2.jar - couchbase-1.20.2.jar - database-commons-1.20.2.jar duct-tape-1.0.8.jar - elasticsearch-1.20.2.jar java-jwt-4.5.0.jar - jdbc-1.20.2.jar jersey-client-2.46.jar jersey-container-servlet-2.46.jar jersey-container-servlet-core-2.46.jar jersey-hk2-2.46.jar jnr-x86asm-1.0.2.jar - localstack-1.20.2.jar mockito-core-5.20.0.jar mssql-jdbc-6.2.1.jre7.jar - mysql-1.20.2.jar - neo4j-1.20.2.jar - postgresql-1.20.2.jar reactive-streams-1.0.4.jar slf4j-api-2.0.11.jar slf4j-api-2.0.17.jar - testcontainers-1.20.2.jar + testcontainers-2.0.2.jar + testcontainers-cassandra-2.0.2.jar + testcontainers-couchbase-2.0.2.jar + testcontainers-database-commons-2.0.2.jar + testcontainers-elasticsearch-2.0.2.jar + testcontainers-jdbc-2.0.2.jar + testcontainers-localstack-2.0.2.jar + testcontainers-mysql-2.0.2.jar + testcontainers-neo4j-2.0.2.jar + testcontainers-postgresql-2.0.2.jar MPL 1.1 javassist-3.30.2-GA.jar diff --git a/core/src/main/java/apoc/cypher/CypherInitializer.java b/core/src/main/java/apoc/cypher/CypherInitializer.java index 8c3ed7f37..32ea97c94 100644 --- a/core/src/main/java/apoc/cypher/CypherInitializer.java +++ b/core/src/main/java/apoc/cypher/CypherInitializer.java @@ -22,6 +22,7 @@ import apoc.ApocConfig; import apoc.SystemLabels; +import apoc.SystemPropertyKeys; import apoc.util.LogsUtil; import apoc.util.Util; import apoc.util.collection.Iterators; @@ -30,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.stream.StreamSupport; import org.apache.commons.configuration2.Configuration; import org.apache.commons.lang3.StringUtils; import org.neo4j.common.DependencyResolver; @@ -43,6 +45,7 @@ import org.neo4j.graphdb.event.DatabaseEventContext; import org.neo4j.graphdb.event.DatabaseEventListener; import org.neo4j.kernel.availability.AvailabilityListener; +import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.DatabaseEventListeners; import org.neo4j.logging.Log; @@ -113,6 +116,27 @@ public void available() { apocVersion, neo4jVersion); } } + // Create a uniqueness constraint on system db to avoid race conditions when installing + // triggers + try (InternalTransaction tx = db.beginTransaction()) { + var constraintName = "triggerConstraint"; + var maybeConstraint = StreamSupport.stream( + tx.schema() + .getConstraints(SystemLabels.ApocTriggerMeta) + .spliterator(), + false) + .filter(x -> x.getName().equals(constraintName)) + .toList(); + if (maybeConstraint.isEmpty()) { + tx.schema() + .constraintFor(SystemLabels.ApocTriggerMeta) + .withName(constraintName) + .assertPropertyIsUnique(SystemPropertyKeys.database.name()) + .create(); + tx.commit(); + } + } + databaseEventListeners.registerDatabaseEventListener(new SystemFunctionalityListener()); } diff --git a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java index ae3142709..feccccaa5 100644 --- a/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java @@ -30,9 +30,11 @@ import java.util.Map; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; +import org.neo4j.graphdb.ConstraintViolationException; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.internal.GraphDatabaseAPI; public class TriggerHandlerNewProcedures { public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." @@ -49,6 +51,7 @@ public static void checkEnabled() { } public static TriggerInfo install( + GraphDatabaseAPI db, String databaseName, String triggerName, String statement, @@ -72,12 +75,12 @@ public static TriggerInfo install( // we'll return current trigger info result = fromNode(node, true); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return result; } - public static TriggerInfo drop(String databaseName, String triggerName, Transaction tx) { + public static TriggerInfo drop(GraphDatabaseAPI db, String databaseName, String triggerName, Transaction tx) { final TriggerInfo[] previous = new TriggerInfo[1]; getTriggerNodes(databaseName, tx, triggerName).forEachRemaining(node -> { @@ -85,12 +88,13 @@ public static TriggerInfo drop(String databaseName, String triggerName, Transact node.delete(); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return previous[0]; } - public static TriggerInfo updatePaused(String databaseName, String name, boolean paused, Transaction tx) { + public static TriggerInfo updatePaused( + GraphDatabaseAPI db, String databaseName, String name, boolean paused, Transaction tx) { final TriggerInfo[] result = new TriggerInfo[1]; getTriggerNodes(databaseName, tx, name).forEachRemaining(node -> { @@ -100,12 +104,12 @@ public static TriggerInfo updatePaused(String databaseName, String name, boolean result[0] = fromNode(node, true); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return result[0]; } - public static List dropAll(String databaseName, Transaction tx) { + public static List dropAll(GraphDatabaseAPI db, String databaseName, Transaction tx) { final List previous = new ArrayList<>(); getTriggerNodes(databaseName, tx).forEachRemaining(node -> { @@ -113,7 +117,7 @@ public static List dropAll(String databaseName, Transaction tx) { previous.add(fromNode(node, false)); node.delete(); }); - setLastUpdate(databaseName, tx); + setLastUpdate(db, databaseName, tx); return previous; } @@ -135,11 +139,28 @@ public static ResourceIterator getTriggerNodes(String databaseName, Transa return tx.findNodes(label, dbNameKey, databaseName, SystemPropertyKeys.name.name(), name); } - public static void setLastUpdate(String databaseName, Transaction tx) { + public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx) { + setLastUpdate(db, databaseName, tx, 0); + } + + public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx, int retryNumber) { Node node = tx.findNode(SystemLabels.ApocTriggerMeta, SystemPropertyKeys.database.name(), databaseName); if (node == null) { - node = tx.createNode(SystemLabels.ApocTriggerMeta); - node.setProperty(SystemPropertyKeys.database.name(), databaseName); + try { + node = tx.createNode(SystemLabels.ApocTriggerMeta); + node.setProperty(SystemPropertyKeys.database.name(), databaseName); + } catch (ConstraintViolationException e) { + // This can happen if two threads try to create the same node concurrently, + // after both having passed the null check. + // In this case we retry once or otherwise ignore the failing tx. + if (retryNumber < 1) { + try (final var newTx = db.beginTx()) { + TriggerHandlerNewProcedures.setLastUpdate(db, databaseName, newTx, retryNumber + 1); + newTx.commit(); + } + } + return; + } } final long value = System.currentTimeMillis(); node.setProperty(SystemPropertyKeys.lastUpdated.name(), value); diff --git a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java index 1fa51d910..e1448793a 100644 --- a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java @@ -109,7 +109,8 @@ public Stream install( var query = Util.prefixQueryWithCheck(procedureCallContext, statement); return withUpdatingTransaction( databaseName, - tx -> Stream.of(TriggerHandlerNewProcedures.install(databaseName, name, query, selector, params, tx))); + tx -> Stream.of( + TriggerHandlerNewProcedures.install(db, databaseName, name, query, selector, params, tx))); } // TODO - change with @SystemOnlyProcedure @@ -124,7 +125,7 @@ public Stream drop( checkInSystemWriter(); return withUpdatingTransaction( - databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(databaseName, name, tx))); + databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(db, databaseName, name, tx))); } // TODO - change with @SystemOnlyProcedure @@ -138,7 +139,7 @@ public Stream dropAll( checkInSystemWriter(); return withUpdatingTransaction( - databaseName, tx -> TriggerHandlerNewProcedures.dropAll(databaseName, tx).stream() + databaseName, tx -> TriggerHandlerNewProcedures.dropAll(db, databaseName, tx).stream() .sorted(Comparator.comparing(i -> i.name))); } @@ -154,7 +155,7 @@ public Stream stop( checkInSystemWriter(); return withUpdatingTransaction(databaseName, tx -> { - final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, true, tx); + final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, true, tx); return Stream.ofNullable(triggerInfo); }); } @@ -171,7 +172,7 @@ public Stream start( checkInSystemWriter(); return withUpdatingTransaction(databaseName, tx -> { - final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, false, tx); + final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, false, tx); return Stream.ofNullable(triggerInfo); }); } @@ -198,7 +199,7 @@ public T withUpdatingTransaction(String databaseName, Function task = () -> { + latch.await(); // wait until both threads are ready + testCall(sysDb, query, map(), r -> {}); + return null; + }; + + Future f1 = executor.submit(task); + Future f2 = executor.submit(task); + + // release both threads + latch.countDown(); + + // Will throw if anything inside the tasks failed + f1.get(); + f2.get(); + + executor.shutdown(); + } + } + @Test public void testIssue2247() { db.executeTransactionally("CREATE (n:ToBeDeleted)");