Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions network/src/main/java/com/arcadedb/remote/RemoteDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,6 @@ public Object call(final HttpURLConnection connection, final JSONObject response
});
}

public void begin() {
command("SQL", "begin");
}

public void commit() {
command("SQL", "commit");
}

public void rollback() {
command("SQL", "rollback");
}
Expand Down Expand Up @@ -248,15 +240,15 @@ private Object httpCommand(final String extendedURL, final String operation, fin
String detail;
String reason;
String exception;
String exceptionArg;
String exceptionArgs;
String responsePayload = null;
try {
responsePayload = FileUtils.readStreamAsString(connection.getErrorStream(), charset);
final JSONObject response = new JSONObject(responsePayload);
reason = response.getString("error");
detail = response.has("detail") ? response.getString("detail") : null;
exception = response.has("exception") ? response.getString("exception") : null;
exceptionArg = response.has("exceptionArg") ? response.getString("exceptionArg") : null;
exceptionArgs = response.has("exceptionArgs") ? response.getString("exceptionArgs") : null;
} catch (Exception e) {
lastException = e;
// TODO CHECK IF THE COMMAND NEEDS TO BE RE-EXECUTED OR NOT
Expand All @@ -271,13 +263,13 @@ private Object httpCommand(final String extendedURL, final String operation, fin

if (exception != null) {
if (exception.equals(ServerIsNotTheLeaderException.class.getName())) {
throw new ServerIsNotTheLeaderException(detail.substring(0, detail.lastIndexOf('.')), exceptionArg);
throw new ServerIsNotTheLeaderException(detail.substring(0, detail.lastIndexOf('.')), exceptionArgs);
} else if (exception.equals(QuorumNotReachedException.class.getName())) {
lastException = new QuorumNotReachedException(detail);
continue;
} else if (exception.equals(DuplicatedKeyException.class.getName())) {
final String[] exceptionArgs = exceptionArg.split("\\|");
throw new DuplicatedKeyException(exceptionArgs[0], exceptionArgs[1], new RID(null, exceptionArgs[2]));
final String[] exceptionArgsParts = exceptionArgs.split("\\|");
throw new DuplicatedKeyException(exceptionArgsParts[0], exceptionArgsParts[1], new RID(null, exceptionArgsParts[2]));
} else if (exception.equals(ConcurrentModificationException.class.getName())) {
throw new ConcurrentModificationException(detail);
} else if (exception.equals(TransactionException.class.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,18 @@ protected Object recordFileChanges(final Callable<Object> callback) {
final AtomicReference<Object> result = new AtomicReference<>();

// ACQUIRE A DATABASE WRITE LOCK. THE LOCK IS REENTRANT, SO THE ACQUISITION DOWN THE LINE IS GOING TO PASS BECAUSE ALREADY ACQUIRED HERE
final DatabaseChangeStructureRequest command = (DatabaseChangeStructureRequest) proxied.executeInWriteLock(() -> {
if (!ha.isLeader())
return callback.call();
final DatabaseChangeStructureRequest command = proxied.executeInWriteLock(() -> {
if (!ha.isLeader()) {
// NOT THE LEADER< NOT RESPONSIBLE TO SEND CHANGES TO OTHER SERVERS
result.set(callback.call());
return null;
}

if (!proxied.getFileManager().startRecordingChanges())
if (!proxied.getFileManager().startRecordingChanges()) {
// ALREADY RECORDING
return callback.call();
result.set(callback.call());
return null;
}

try {
result.set(callback.call());
Expand All @@ -663,9 +668,11 @@ protected Object recordFileChanges(final Callable<Object> callback) {
}
});

// SEND THE COMMAND OUTSIDE THE EXCLUSIVE LOCK
final int quorum = ha.getConfiguredServers() - 1;
ha.sendCommandToReplicasWithQuorum(command, quorum, timeout);
if (command != null) {
// SEND THE COMMAND OUTSIDE THE EXCLUSIVE LOCK
final int quorum = ha.getConfiguredServers() - 1;
ha.sendCommandToReplicasWithQuorum(command, quorum, timeout);
}

return result.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ public void run() {
final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root",
BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

db.begin();

LogManager.instance().log(this, Level.INFO, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx());

long counter = 0;
Expand All @@ -134,8 +132,6 @@ public void run() {
for (int retry = 0; retry < getMaxRetry(); ++retry) {
try {

db.begin();

for (int i = 0; i < getVerticesPerTx(); ++i) {

ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, "distributed-test");
Expand All @@ -151,8 +147,6 @@ public void run() {
Assertions.assertEquals("distributed-test", result.getProperty("name"));
}

db.commit();

if (delay > 0) {
try {
Thread.sleep(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;

public class ReplicationServerLeaderChanges3TimesIT extends ReplicationServerIT {
private final AtomicInteger messagesInTotal = new AtomicInteger();
Expand All @@ -57,9 +56,8 @@ public void testReplication() {
final String server1Address = getServer(0).getHttpServer().getListeningAddress();
final String[] server1AddressParts = server1Address.split(":");

final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

db.begin();
final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root",
BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx());

Expand All @@ -83,8 +81,6 @@ public void testReplication() {
Assertions.assertEquals("distributed-test", result.getProperty("name"));
}

db.commit();

} catch (DuplicatedKeyException | NeedRetryException | TimeoutException | TransactionException e) {
// IGNORE IT
LogManager.instance().log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry);
Expand All @@ -109,8 +105,6 @@ public void testReplication() {
if (isPrintingConfigurationAtEveryStep())
getLeaderServer().getHA().printClusterConfiguration();
}

db.begin();
}

LogManager.instance().log(this, Level.INFO, "Done");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;

public class ReplicationServerLeaderDownIT extends ReplicationServerIT {
private final AtomicInteger messages = new AtomicInteger();
Expand All @@ -54,8 +54,6 @@ public void testReplication() {
final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root",
BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

db.begin();

LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx());

long counter = 0;
Expand Down Expand Up @@ -90,15 +88,11 @@ public void testReplication() {
}
}

db.commit();

if (counter % 1000 == 0) {
LogManager.instance().log(this, Level.INFO, "- Progress %d/%d", null, counter, (getTxs() * getVerticesPerTx()));
if (isPrintingConfigurationAtEveryStep())
getLeaderServer().getHA().printClusterConfiguration();
}

db.begin();
}

LogManager.instance().log(this, Level.INFO, "Done");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;

public class ReplicationServerLeaderDownNoTransactionsToForwardIT extends ReplicationServerIT {
private final AtomicInteger messages = new AtomicInteger();
Expand All @@ -51,8 +51,6 @@ public void testReplication() {
final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root",
BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

db.begin();

LogManager.instance().log(this, Level.INFO, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx());

long counter = 0;
Expand Down Expand Up @@ -87,15 +85,11 @@ public void testReplication() {
}
}

db.commit();

if (counter % 1000 == 0) {
LogManager.instance().log(this, Level.INFO, "- Progress %d/%d", null, counter, (getTxs() * getVerticesPerTx()));
if (isPrintingConfigurationAtEveryStep())
getLeaderServer().getHA().printClusterConfiguration();
}

db.begin();
}

LogManager.instance().log(this, Level.INFO, "Done");
Expand Down