diff --git a/network/src/main/java/com/arcadedb/remote/RemoteDatabase.java b/network/src/main/java/com/arcadedb/remote/RemoteDatabase.java index 1ee5b0992a..b5cdeba40d 100644 --- a/network/src/main/java/com/arcadedb/remote/RemoteDatabase.java +++ b/network/src/main/java/com/arcadedb/remote/RemoteDatabase.java @@ -160,14 +160,6 @@ public Object call(final HttpURLConnection connection, final JSONObject response }); } - public void begin() { - command("SQL", "begin"); - } - - public void commit() { - command("SQL", "commit"); - } - public void rollback() { command("SQL", "rollback"); } @@ -248,7 +240,7 @@ private Object httpCommand(final String extendedURL, final String operation, fin String detail; String reason; String exception; - String exceptionArg; + String exceptionArgs; String responsePayload = null; try { responsePayload = FileUtils.readStreamAsString(connection.getErrorStream(), charset); @@ -256,7 +248,7 @@ private Object httpCommand(final String extendedURL, final String operation, fin reason = response.getString("error"); detail = response.has("detail") ? response.getString("detail") : null; exception = response.has("exception") ? response.getString("exception") : null; - exceptionArg = response.has("exceptionArg") ? response.getString("exceptionArg") : null; + exceptionArgs = response.has("exceptionArgs") ? response.getString("exceptionArgs") : null; } catch (Exception e) { lastException = e; // TODO CHECK IF THE COMMAND NEEDS TO BE RE-EXECUTED OR NOT @@ -271,13 +263,13 @@ private Object httpCommand(final String extendedURL, final String operation, fin if (exception != null) { if (exception.equals(ServerIsNotTheLeaderException.class.getName())) { - throw new ServerIsNotTheLeaderException(detail.substring(0, detail.lastIndexOf('.')), exceptionArg); + throw new ServerIsNotTheLeaderException(detail.substring(0, detail.lastIndexOf('.')), exceptionArgs); } else if (exception.equals(QuorumNotReachedException.class.getName())) { lastException = new QuorumNotReachedException(detail); continue; } else if (exception.equals(DuplicatedKeyException.class.getName())) { - final String[] exceptionArgs = exceptionArg.split("\\|"); - throw new DuplicatedKeyException(exceptionArgs[0], exceptionArgs[1], new RID(null, exceptionArgs[2])); + final String[] exceptionArgsParts = exceptionArgs.split("\\|"); + throw new DuplicatedKeyException(exceptionArgsParts[0], exceptionArgsParts[1], new RID(null, exceptionArgsParts[2])); } else if (exception.equals(ConcurrentModificationException.class.getName())) { throw new ConcurrentModificationException(detail); } else if (exception.equals(TransactionException.class.getName())) { diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java index aa61ac47e4..6dea1c118f 100644 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java @@ -632,13 +632,18 @@ protected Object recordFileChanges(final Callable callback) { final AtomicReference result = new AtomicReference<>(); // ACQUIRE A DATABASE WRITE LOCK. THE LOCK IS REENTRANT, SO THE ACQUISITION DOWN THE LINE IS GOING TO PASS BECAUSE ALREADY ACQUIRED HERE - final DatabaseChangeStructureRequest command = (DatabaseChangeStructureRequest) proxied.executeInWriteLock(() -> { - if (!ha.isLeader()) - return callback.call(); + final DatabaseChangeStructureRequest command = proxied.executeInWriteLock(() -> { + if (!ha.isLeader()) { + // NOT THE LEADER< NOT RESPONSIBLE TO SEND CHANGES TO OTHER SERVERS + result.set(callback.call()); + return null; + } - if (!proxied.getFileManager().startRecordingChanges()) + if (!proxied.getFileManager().startRecordingChanges()) { // ALREADY RECORDING - return callback.call(); + result.set(callback.call()); + return null; + } try { result.set(callback.call()); @@ -663,9 +668,11 @@ protected Object recordFileChanges(final Callable callback) { } }); - // SEND THE COMMAND OUTSIDE THE EXCLUSIVE LOCK - final int quorum = ha.getConfiguredServers() - 1; - ha.sendCommandToReplicasWithQuorum(command, quorum, timeout); + if (command != null) { + // SEND THE COMMAND OUTSIDE THE EXCLUSIVE LOCK + final int quorum = ha.getConfiguredServers() - 1; + ha.sendCommandToReplicasWithQuorum(command, quorum, timeout); + } return result.get(); } diff --git a/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java b/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java index d3e6f2c8b2..61aa9a3424 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java @@ -122,8 +122,6 @@ public void run() { final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - db.begin(); - LogManager.instance().log(this, Level.INFO, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; @@ -134,8 +132,6 @@ public void run() { for (int retry = 0; retry < getMaxRetry(); ++retry) { try { - db.begin(); - for (int i = 0; i < getVerticesPerTx(); ++i) { ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, "distributed-test"); @@ -151,8 +147,6 @@ public void run() { Assertions.assertEquals("distributed-test", result.getProperty("name")); } - db.commit(); - if (delay > 0) { try { Thread.sleep(delay); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java index b6f5c1f1ac..a6aab30c34 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java @@ -32,11 +32,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.logging.*; public class ReplicationServerLeaderChanges3TimesIT extends ReplicationServerIT { private final AtomicInteger messagesInTotal = new AtomicInteger(); @@ -57,9 +56,8 @@ public void testReplication() { final String server1Address = getServer(0).getHttpServer().getListeningAddress(); final String[] server1AddressParts = server1Address.split(":"); - final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - - db.begin(); + final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); @@ -83,8 +81,6 @@ public void testReplication() { Assertions.assertEquals("distributed-test", result.getProperty("name")); } - db.commit(); - } catch (DuplicatedKeyException | NeedRetryException | TimeoutException | TransactionException e) { // IGNORE IT LogManager.instance().log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry); @@ -109,8 +105,6 @@ public void testReplication() { if (isPrintingConfigurationAtEveryStep()) getLeaderServer().getHA().printClusterConfiguration(); } - - db.begin(); } LogManager.instance().log(this, Level.INFO, "Done"); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java index 0d3a4780a7..ff88074f0c 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java @@ -27,10 +27,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.logging.*; public class ReplicationServerLeaderDownIT extends ReplicationServerIT { private final AtomicInteger messages = new AtomicInteger(); @@ -54,8 +54,6 @@ public void testReplication() { final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - db.begin(); - LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; @@ -90,15 +88,11 @@ public void testReplication() { } } - db.commit(); - if (counter % 1000 == 0) { LogManager.instance().log(this, Level.INFO, "- Progress %d/%d", null, counter, (getTxs() * getVerticesPerTx())); if (isPrintingConfigurationAtEveryStep()) getLeaderServer().getHA().printClusterConfiguration(); } - - db.begin(); } LogManager.instance().log(this, Level.INFO, "Done"); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java index 811d94db8f..06bece8924 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java @@ -27,10 +27,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.logging.*; public class ReplicationServerLeaderDownNoTransactionsToForwardIT extends ReplicationServerIT { private final AtomicInteger messages = new AtomicInteger(); @@ -51,8 +51,6 @@ public void testReplication() { final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - db.begin(); - LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; @@ -87,15 +85,11 @@ public void testReplication() { } } - db.commit(); - if (counter % 1000 == 0) { LogManager.instance().log(this, Level.INFO, "- Progress %d/%d", null, counter, (getTxs() * getVerticesPerTx())); if (isPrintingConfigurationAtEveryStep()) getLeaderServer().getHA().printClusterConfiguration(); } - - db.begin(); } LogManager.instance().log(this, Level.INFO, "Done");