Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions LICENSES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ Apache-2.0
curator-client-5.2.0.jar
curator-framework-5.2.0.jar
curator-recipes-5.2.0.jar
docker-java-api-3.4.0.jar
docker-java-transport-3.4.0.jar
docker-java-transport-zerodep-3.4.0.jar
docker-java-api-3.7.0.jar
docker-java-transport-3.7.0.jar
docker-java-transport-zerodep-3.7.0.jar
ehcache-3.8.2.jar
error_prone_annotations-2.36.0.jar
failureaccess-1.0.3.jar
Expand Down Expand Up @@ -2705,28 +2705,28 @@ MIT
bcpkix-jdk18on-1.82.jar
bcprov-jdk18on-1.82.jar
bcutil-jdk18on-1.82.jar
cassandra-1.20.2.jar
couchbase-1.20.2.jar
database-commons-1.20.2.jar
duct-tape-1.0.8.jar
elasticsearch-1.20.2.jar
java-jwt-4.5.0.jar
jdbc-1.20.2.jar
jersey-client-2.46.jar
jersey-container-servlet-2.46.jar
jersey-container-servlet-core-2.46.jar
jersey-hk2-2.46.jar
jnr-x86asm-1.0.2.jar
localstack-1.20.2.jar
mockito-core-5.20.0.jar
mssql-jdbc-6.2.1.jre7.jar
mysql-1.20.2.jar
neo4j-1.20.2.jar
postgresql-1.20.2.jar
reactive-streams-1.0.4.jar
slf4j-api-2.0.11.jar
slf4j-api-2.0.17.jar
testcontainers-1.20.2.jar
testcontainers-2.0.2.jar
testcontainers-cassandra-2.0.2.jar
testcontainers-couchbase-2.0.2.jar
testcontainers-database-commons-2.0.2.jar
testcontainers-elasticsearch-2.0.2.jar
testcontainers-jdbc-2.0.2.jar
testcontainers-localstack-2.0.2.jar
testcontainers-mysql-2.0.2.jar
testcontainers-neo4j-2.0.2.jar
testcontainers-postgresql-2.0.2.jar
------------------------------------------------------------------------------

The MIT License
Expand Down
26 changes: 13 additions & 13 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ Apache-2.0
curator-client-5.2.0.jar
curator-framework-5.2.0.jar
curator-recipes-5.2.0.jar
docker-java-api-3.4.0.jar
docker-java-transport-3.4.0.jar
docker-java-transport-zerodep-3.4.0.jar
docker-java-api-3.7.0.jar
docker-java-transport-3.7.0.jar
docker-java-transport-zerodep-3.7.0.jar
ehcache-3.8.2.jar
error_prone_annotations-2.36.0.jar
failureaccess-1.0.3.jar
Expand Down Expand Up @@ -468,28 +468,28 @@ MIT
bcpkix-jdk18on-1.82.jar
bcprov-jdk18on-1.82.jar
bcutil-jdk18on-1.82.jar
cassandra-1.20.2.jar
couchbase-1.20.2.jar
database-commons-1.20.2.jar
duct-tape-1.0.8.jar
elasticsearch-1.20.2.jar
java-jwt-4.5.0.jar
jdbc-1.20.2.jar
jersey-client-2.46.jar
jersey-container-servlet-2.46.jar
jersey-container-servlet-core-2.46.jar
jersey-hk2-2.46.jar
jnr-x86asm-1.0.2.jar
localstack-1.20.2.jar
mockito-core-5.20.0.jar
mssql-jdbc-6.2.1.jre7.jar
mysql-1.20.2.jar
neo4j-1.20.2.jar
postgresql-1.20.2.jar
reactive-streams-1.0.4.jar
slf4j-api-2.0.11.jar
slf4j-api-2.0.17.jar
testcontainers-1.20.2.jar
testcontainers-2.0.2.jar
testcontainers-cassandra-2.0.2.jar
testcontainers-couchbase-2.0.2.jar
testcontainers-database-commons-2.0.2.jar
testcontainers-elasticsearch-2.0.2.jar
testcontainers-jdbc-2.0.2.jar
testcontainers-localstack-2.0.2.jar
testcontainers-mysql-2.0.2.jar
testcontainers-neo4j-2.0.2.jar
testcontainers-postgresql-2.0.2.jar

MPL 1.1
javassist-3.30.2-GA.jar
Expand Down
24 changes: 24 additions & 0 deletions core/src/main/java/apoc/cypher/CypherInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import apoc.ApocConfig;
import apoc.SystemLabels;
import apoc.SystemPropertyKeys;
import apoc.util.LogsUtil;
import apoc.util.Util;
import apoc.util.collection.Iterators;
Expand All @@ -30,6 +31,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.StreamSupport;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.common.DependencyResolver;
Expand All @@ -43,6 +45,7 @@
import org.neo4j.graphdb.event.DatabaseEventContext;
import org.neo4j.graphdb.event.DatabaseEventListener;
import org.neo4j.kernel.availability.AvailabilityListener;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.DatabaseEventListeners;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -113,6 +116,27 @@ public void available() {
apocVersion, neo4jVersion);
}
}
// Create a uniqueness constraint on system db to avoid race conditions when installing
// triggers
try (InternalTransaction tx = db.beginTransaction()) {
var constraintName = "triggerConstraint";
var maybeConstraint = StreamSupport.stream(
tx.schema()
.getConstraints(SystemLabels.ApocTriggerMeta)
.spliterator(),
false)
.filter(x -> x.getName().equals(constraintName))
.toList();
if (maybeConstraint.isEmpty()) {
tx.schema()
.constraintFor(SystemLabels.ApocTriggerMeta)
.withName(constraintName)
.assertPropertyIsUnique(SystemPropertyKeys.database.name())
.create();
tx.commit();
}
}

databaseEventListeners.registerDatabaseEventListener(new SystemFunctionalityListener());
}

Expand Down
41 changes: 31 additions & 10 deletions core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import java.util.Map;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.neo4j.graphdb.ConstraintViolationException;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;

public class TriggerHandlerNewProcedures {
public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled."
Expand All @@ -49,6 +51,7 @@ public static void checkEnabled() {
}

public static TriggerInfo install(
GraphDatabaseAPI db,
String databaseName,
String triggerName,
String statement,
Expand All @@ -72,25 +75,26 @@ public static TriggerInfo install(
// we'll return current trigger info
result = fromNode(node, true);

setLastUpdate(databaseName, tx);
setLastUpdate(db, databaseName, tx);

return result;
}

public static TriggerInfo drop(String databaseName, String triggerName, Transaction tx) {
public static TriggerInfo drop(GraphDatabaseAPI db, String databaseName, String triggerName, Transaction tx) {
final TriggerInfo[] previous = new TriggerInfo[1];

getTriggerNodes(databaseName, tx, triggerName).forEachRemaining(node -> {
previous[0] = fromNode(node, false);
node.delete();
});

setLastUpdate(databaseName, tx);
setLastUpdate(db, databaseName, tx);

return previous[0];
}

public static TriggerInfo updatePaused(String databaseName, String name, boolean paused, Transaction tx) {
public static TriggerInfo updatePaused(
GraphDatabaseAPI db, String databaseName, String name, boolean paused, Transaction tx) {
final TriggerInfo[] result = new TriggerInfo[1];

getTriggerNodes(databaseName, tx, name).forEachRemaining(node -> {
Expand All @@ -100,20 +104,20 @@ public static TriggerInfo updatePaused(String databaseName, String name, boolean
result[0] = fromNode(node, true);
});

setLastUpdate(databaseName, tx);
setLastUpdate(db, databaseName, tx);

return result[0];
}

public static List<TriggerInfo> dropAll(String databaseName, Transaction tx) {
public static List<TriggerInfo> dropAll(GraphDatabaseAPI db, String databaseName, Transaction tx) {
final List<TriggerInfo> previous = new ArrayList<>();

getTriggerNodes(databaseName, tx).forEachRemaining(node -> {
// we'll return previous trigger info
previous.add(fromNode(node, false));
node.delete();
});
setLastUpdate(databaseName, tx);
setLastUpdate(db, databaseName, tx);

return previous;
}
Expand All @@ -135,11 +139,28 @@ public static ResourceIterator<Node> getTriggerNodes(String databaseName, Transa
return tx.findNodes(label, dbNameKey, databaseName, SystemPropertyKeys.name.name(), name);
}

public static void setLastUpdate(String databaseName, Transaction tx) {
public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx) {
setLastUpdate(db, databaseName, tx, 0);
}

public static void setLastUpdate(GraphDatabaseAPI db, String databaseName, Transaction tx, int retryNumber) {
Node node = tx.findNode(SystemLabels.ApocTriggerMeta, SystemPropertyKeys.database.name(), databaseName);
if (node == null) {
node = tx.createNode(SystemLabels.ApocTriggerMeta);
node.setProperty(SystemPropertyKeys.database.name(), databaseName);
try {
node = tx.createNode(SystemLabels.ApocTriggerMeta);
node.setProperty(SystemPropertyKeys.database.name(), databaseName);
} catch (ConstraintViolationException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new transaction or the outer one? because if it is the outer one won't that cause issues for the user later on in the query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I did not consider this. It looks like the tx is created in withUpdatingTransaction() which is called in the beginning of apoc.trigger.install, apoc.trigger.drop, apoc.trigger.dropAll, apoc.trigger.start, apoc.trigger.stop respectively.
The thread does some work before this method, but it is always commited directly after it. Does that sounds OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that should be fine then I think

// This can happen if two threads try to create the same node concurrently,
// after both having passed the null check.
// In this case we retry once or otherwise ignore the failing tx.
if (retryNumber < 1) {
try (final var newTx = db.beginTx()) {
TriggerHandlerNewProcedures.setLastUpdate(db, databaseName, newTx, retryNumber + 1);
newTx.commit();
}
}
return;
}
}
final long value = System.currentTimeMillis();
node.setProperty(SystemPropertyKeys.lastUpdated.name(), value);
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/java/apoc/trigger/TriggerNewProcedures.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public Stream<TriggerInfo> install(
var query = Util.prefixQueryWithCheck(procedureCallContext, statement);
return withUpdatingTransaction(
databaseName,
tx -> Stream.of(TriggerHandlerNewProcedures.install(databaseName, name, query, selector, params, tx)));
tx -> Stream.of(
TriggerHandlerNewProcedures.install(db, databaseName, name, query, selector, params, tx)));
}

// TODO - change with @SystemOnlyProcedure
Expand All @@ -124,7 +125,7 @@ public Stream<TriggerInfo> drop(
checkInSystemWriter();

return withUpdatingTransaction(
databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(databaseName, name, tx)));
databaseName, tx -> Stream.ofNullable(TriggerHandlerNewProcedures.drop(db, databaseName, name, tx)));
}

// TODO - change with @SystemOnlyProcedure
Expand All @@ -138,7 +139,7 @@ public Stream<TriggerInfo> dropAll(
checkInSystemWriter();

return withUpdatingTransaction(
databaseName, tx -> TriggerHandlerNewProcedures.dropAll(databaseName, tx).stream()
databaseName, tx -> TriggerHandlerNewProcedures.dropAll(db, databaseName, tx).stream()
.sorted(Comparator.comparing(i -> i.name)));
}

Expand All @@ -154,7 +155,7 @@ public Stream<TriggerInfo> stop(
checkInSystemWriter();

return withUpdatingTransaction(databaseName, tx -> {
final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, true, tx);
final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, true, tx);
return Stream.ofNullable(triggerInfo);
});
}
Expand All @@ -171,7 +172,7 @@ public Stream<TriggerInfo> start(
checkInSystemWriter();

return withUpdatingTransaction(databaseName, tx -> {
final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(databaseName, name, false, tx);
final TriggerInfo triggerInfo = TriggerHandlerNewProcedures.updatePaused(db, databaseName, name, false, tx);
return Stream.ofNullable(triggerInfo);
});
}
Expand All @@ -198,7 +199,7 @@ public <T> T withUpdatingTransaction(String databaseName, Function<Transaction,

// Last update time needs to be after the installation commit happened to not risk missing updates
try (final var tx = db.beginTx()) {
TriggerHandlerNewProcedures.setLastUpdate(databaseName, tx);
TriggerHandlerNewProcedures.setLastUpdate(db, databaseName, tx);
tx.commit();
}
return result;
Expand Down
31 changes: 31 additions & 0 deletions core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -203,6 +207,33 @@ public void testOverwriteTrigger() {
});
}

@Test
void testConcurrentTriggersShouldNotFail() throws ExecutionException, InterruptedException {
var query = "CALL apoc.trigger.install('neo4j', 'myTrigger', 'RETURN 1', {phase: 'before'})";

try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
CountDownLatch latch = new CountDownLatch(1);

Callable<Void> task = () -> {
latch.await(); // wait until both threads are ready
testCall(sysDb, query, map(), r -> {});
return null;
};

Future<Void> f1 = executor.submit(task);
Future<Void> f2 = executor.submit(task);

// release both threads
latch.countDown();

// Will throw if anything inside the tasks failed
f1.get();
f2.get();

executor.shutdown();
}
}

@Test
public void testIssue2247() {
db.executeTransactionally("CREATE (n:ToBeDeleted)");
Expand Down