diff --git a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java index a623ff426f..100871c86c 100644 --- a/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java +++ b/engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java @@ -1444,6 +1444,10 @@ public void setWrapper(final String name, final Object instance) { this.wrappers.put(name, instance); } + public QueryEngineManager getQueryEngineManager() { + return queryEngineManager; + } + public void saveConfiguration() throws IOException { FileUtils.writeFile(configurationFile, configuration.toJSON()); } diff --git a/engine/src/main/java/com/arcadedb/database/TransactionContext.java b/engine/src/main/java/com/arcadedb/database/TransactionContext.java index a04aa11649..a8bd983cb3 100644 --- a/engine/src/main/java/com/arcadedb/database/TransactionContext.java +++ b/engine/src/main/java/com/arcadedb/database/TransactionContext.java @@ -33,7 +33,6 @@ import com.arcadedb.index.Index; import com.arcadedb.index.lsm.LSMTreeIndexAbstract; import com.arcadedb.log.LogManager; -import com.arcadedb.utility.Pair; import java.io.*; import java.util.*; @@ -70,6 +69,16 @@ public class TransactionContext implements Transaction { public enum STATUS {INACTIVE, BEGUN, COMMIT_1ST_PHASE, COMMIT_2ND_PHASE} + public class TransactionPhase1 { + public final Binary result; + public final List modifiedPages; + + public TransactionPhase1(final Binary result, final List modifiedPages) { + this.result = result; + this.modifiedPages = modifiedPages; + } + } + public TransactionContext(final DatabaseInternal database) { this.database = database; this.walFlush = WALFile.getWALFlushType(database.getConfiguration().getValueAsInteger(GlobalConfiguration.TX_WAL_FLUSH)); @@ -99,17 +108,17 @@ public Binary commit() { if (status != STATUS.BEGUN) throw new TransactionException("Transaction already in commit phase"); - final Pair> changes = commit1stPhase(true); + final TransactionPhase1 phase1 = commit1stPhase(true); - if (changes != null) - commit2ndPhase(changes); + if (phase1 != null) + commit2ndPhase(phase1); else reset(); if (database.getSchema().getEmbedded().isDirty()) database.getSchema().getEmbedded().saveConfiguration(); - return changes != null ? changes.getFirst() : null; + return phase1 != null ? phase1.result : null; } public Record getRecordFromCache(final RID rid) { @@ -377,6 +386,9 @@ public void commitFromReplica(final WALFile.WALTransaction buffer, indexChanges.setKeys(keysTx); indexChanges.addFilesToLock(modifiedFiles); + final int dictionaryFileId = database.getSchema().getDictionary().getId(); + boolean dictionaryModified = false; + for (WALFile.WALPage p : buffer.pages) { final PaginatedFile file = database.getFileManager().getFile(p.fileId); final int pageSize = file.getPageSize(); @@ -396,10 +408,16 @@ public void commitFromReplica(final WALFile.WALTransaction buffer, newPageCounters.put(pageId.getFileId(), pageId.getPageNumber() + 1); } else modifiedPages.put(pageId, page); + + if (!dictionaryModified && dictionaryFileId == pageId.getFileId()) + dictionaryModified = true; } database.commit(); + if (dictionaryModified) + database.getSchema().getDictionary().reload(); + } catch (ConcurrentModificationException e) { rollback(); throw e; @@ -412,7 +430,7 @@ public void commitFromReplica(final WALFile.WALTransaction buffer, /** * Locks the files in order, then checks all the pre-conditions. */ - public Pair> commit1stPhase(final boolean isLeader) { + public TransactionPhase1 commit1stPhase(final boolean isLeader) { if (status == STATUS.INACTIVE) throw new TransactionException("Transaction not started"); @@ -453,7 +471,6 @@ public Pair> commit1stPhase(final boolean isLeader) { // CHECK THE VERSIONS FIRST final List pages = new ArrayList<>(); - final PageManager pageManager = database.getPageManager(); for (final Iterator it = modifiedPages.values().iterator(); it.hasNext(); ) { @@ -481,13 +498,11 @@ public Pair> commit1stPhase(final boolean isLeader) { if (useWAL) { txId = database.getTransactionManager().getNextTransactionId(); - - LogManager.instance().log(this, Level.FINE, "Creating buffer for TX %d (threadId=%d)", null, txId, Thread.currentThread().getId()); - + //LogManager.instance().log(this, Level.FINE, "Creating buffer for TX %d (threadId=%d)", null, txId, Thread.currentThread().getId()); result = database.getTransactionManager().createTransactionBuffer(txId, pages); } - return new Pair<>(result, pages); + return new TransactionPhase1(result, pages); } catch (DuplicatedKeyException | ConcurrentModificationException e) { rollback(); @@ -499,7 +514,10 @@ public Pair> commit1stPhase(final boolean isLeader) { } } - public void commit2ndPhase(final Pair> changes) { + public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) { + if (changes == null) + return; + if (database.getMode() == PaginatedFile.MODE.READ_ONLY) throw new TransactionException("Cannot commit changes because the database is open in read-only mode"); @@ -511,9 +529,9 @@ public void commit2ndPhase(final Pair> changes) { final PageManager pageManager = database.getPageManager(); try { - if (changes.getFirst() != null) + if (changes.result != null) // WRITE TO THE WAL FIRST - database.getTransactionManager().writeTransactionToWAL(changes.getSecond(), walFlush, txId, changes.getFirst()); + database.getTransactionManager().writeTransactionToWAL(changes.modifiedPages, walFlush, txId, changes.result); // AT THIS POINT, LOCK + VERSION CHECK, THERE IS NO NEED TO MANAGE ROLLBACK BECAUSE THERE CANNOT BE CONCURRENT TX THAT UPDATE THE SAME PAGE CONCURRENTLY // UPDATE PAGE COUNTER FIRST diff --git a/engine/src/main/java/com/arcadedb/index/IndexFactory.java b/engine/src/main/java/com/arcadedb/index/IndexFactory.java index 7d1aba4c92..d9ddfd0ddd 100644 --- a/engine/src/main/java/com/arcadedb/index/IndexFactory.java +++ b/engine/src/main/java/com/arcadedb/index/IndexFactory.java @@ -20,7 +20,6 @@ import com.arcadedb.index.lsm.LSMTreeIndexAbstract; import com.arcadedb.schema.Type; -import java.io.*; import java.util.*; public class IndexFactory { @@ -32,7 +31,7 @@ public void register(final String type, final IndexFactoryHandler handler) { public IndexInternal createIndex(final String indexType, final DatabaseInternal database, final String indexName, final boolean unique, final String filePath, final PaginatedFile.MODE mode, final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, - final Index.BuildIndexCallback callback) throws IOException { + final Index.BuildIndexCallback callback) { final IndexFactoryHandler handler = map.get(indexType); if (handler == null) diff --git a/engine/src/main/java/com/arcadedb/index/IndexFactoryHandler.java b/engine/src/main/java/com/arcadedb/index/IndexFactoryHandler.java index dedba0d537..c660015058 100644 --- a/engine/src/main/java/com/arcadedb/index/IndexFactoryHandler.java +++ b/engine/src/main/java/com/arcadedb/index/IndexFactoryHandler.java @@ -20,9 +20,7 @@ import com.arcadedb.index.lsm.LSMTreeIndexAbstract; import com.arcadedb.schema.Type; -import java.io.*; - public interface IndexFactoryHandler { IndexInternal create(DatabaseInternal database, String name, boolean unique, String filePath, PaginatedFile.MODE mode, Type[] keyTypes, int pageSize, - LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) throws IOException; + LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback); } diff --git a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeFullTextIndex.java b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeFullTextIndex.java index a29dbe025a..e153d5217a 100644 --- a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeFullTextIndex.java +++ b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeFullTextIndex.java @@ -65,8 +65,7 @@ public class LSMTreeFullTextIndex implements Index, IndexInternal { public static class IndexFactoryHandler implements com.arcadedb.index.IndexFactoryHandler { @Override public IndexInternal create(final DatabaseInternal database, final String name, final boolean unique, final String filePath, final PaginatedFile.MODE mode, - final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) - throws IOException { + final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) { return new LSMTreeFullTextIndex(database, name, filePath, mode, pageSize, callback); } } @@ -85,12 +84,8 @@ public PaginatedComponent createOnLoad(final DatabaseInternal database, final St */ public LSMTreeFullTextIndex(final DatabaseInternal database, final String name, final String filePath, final PaginatedFile.MODE mode, final int pageSize, final BuildIndexCallback callback) { - try { - analyzer = new StandardAnalyzer(); - underlyingIndex = new LSMTreeIndex(database, name, false, filePath, mode, new Type[] { Type.STRING }, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR); - } catch (IOException e) { - throw new IndexException("Cannot create search engine (error=" + e + ")", e); - } + analyzer = new StandardAnalyzer(); + underlyingIndex = new LSMTreeIndex(database, name, false, filePath, mode, new Type[] { Type.STRING }, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR); } /** diff --git a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndex.java b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndex.java index 0ab7740654..74b751c8df 100644 --- a/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndex.java +++ b/engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndex.java @@ -67,8 +67,7 @@ public class LSMTreeIndex implements RangeIndex, IndexInternal { public static class IndexFactoryHandler implements com.arcadedb.index.IndexFactoryHandler { @Override public IndexInternal create(final DatabaseInternal database, final String name, final boolean unique, final String filePath, final PaginatedFile.MODE mode, - final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) - throws IOException { + final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) { return new LSMTreeIndex(database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy); } } @@ -99,9 +98,13 @@ public PaginatedComponent createOnLoad(final DatabaseInternal database, final St * Called at creation time. */ public LSMTreeIndex(final DatabaseInternal database, final String name, final boolean unique, String filePath, final PaginatedFile.MODE mode, - final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) throws IOException { - this.name = name; - this.mutable = new LSMTreeIndexMutable(this, database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy); + final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) { + try { + this.name = name; + this.mutable = new LSMTreeIndexMutable(this, database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy); + } catch (IOException e) { + throw new IndexException("Error on creating index '" + name + "'", e); + } } /** diff --git a/engine/src/main/java/com/arcadedb/log/DefaultLogger.java b/engine/src/main/java/com/arcadedb/log/DefaultLogger.java index 692b729238..3cb97cca7f 100644 --- a/engine/src/main/java/com/arcadedb/log/DefaultLogger.java +++ b/engine/src/main/java/com/arcadedb/log/DefaultLogger.java @@ -69,13 +69,15 @@ public void installCustomFormatter() { } } - public void log(final Object requester, final Level level, String message, final Throwable exception, final String context, final Object arg1, + public void log(final Object requester, Level level, String message, final Throwable exception, final String context, final Object arg1, final Object arg2, final Object arg3, final Object arg4, final Object arg5, final Object arg6, final Object arg7, final Object arg8, final Object arg9, final Object arg10, final Object arg11, final Object arg12, final Object arg13, final Object arg14, final Object arg15, final Object arg16, final Object arg17) { if (message == null) return; + //level = Level.SEVERE; + final String requesterName; if (requester instanceof String) requesterName = (String) requester; diff --git a/engine/src/main/java/com/arcadedb/query/QueryEngine.java b/engine/src/main/java/com/arcadedb/query/QueryEngine.java index 7fd3f9d61f..305e987686 100644 --- a/engine/src/main/java/com/arcadedb/query/QueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/QueryEngine.java @@ -21,6 +21,12 @@ import java.util.*; public interface QueryEngine { + interface AnalyzedQuery { + boolean isIdempotent(); + + boolean isDDL(); + } + interface QueryEngineFactory { boolean isAvailable(); @@ -29,6 +35,8 @@ interface QueryEngineFactory { QueryEngine getInstance(DatabaseInternal database); } + AnalyzedQuery analyze(String query); + ResultSet query(String query, Map parameters); ResultSet query(String query, Object... parameters); diff --git a/engine/src/main/java/com/arcadedb/query/cypher/CypherQueryEngine.java b/engine/src/main/java/com/arcadedb/query/cypher/CypherQueryEngine.java index c25d07b88c..7c4845eb1d 100644 --- a/engine/src/main/java/com/arcadedb/query/cypher/CypherQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/cypher/CypherQueryEngine.java @@ -25,21 +25,18 @@ import com.arcadedb.query.sql.executor.Result; import com.arcadedb.query.sql.executor.ResultInternal; import com.arcadedb.query.sql.executor.ResultSet; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.stream.Collectors; +import java.lang.reflect.*; +import java.util.*; +import java.util.logging.*; +import java.util.stream.*; public class CypherQueryEngine implements QueryEngine { private static final String ENGINE_NAME = "cypher-engine"; - private final Object arcadeGraph; + private final Object arcadeGraph; public static class CypherQueryEngineFactory implements QueryEngineFactory { - private static Boolean available = null; + private static Boolean available = null; private static Class arcadeGraphClass; private static Class arcadeCypherClass; @@ -84,6 +81,21 @@ protected CypherQueryEngine(final Object arcadeGraph) { this.arcadeGraph = arcadeGraph; } + @Override + public AnalyzedQuery analyze(String query) { + return new AnalyzedQuery() { + @Override + public boolean isIdempotent() { + return false; + } + + @Override + public boolean isDDL() { + return false; + } + }; + } + @Override public ResultSet query(final String query, final Map parameters) { return command(query, parameters); diff --git a/engine/src/main/java/com/arcadedb/query/gremlin/GremlinQueryEngine.java b/engine/src/main/java/com/arcadedb/query/gremlin/GremlinQueryEngine.java index 03738785f3..f034dedb1a 100644 --- a/engine/src/main/java/com/arcadedb/query/gremlin/GremlinQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/gremlin/GremlinQueryEngine.java @@ -107,4 +107,20 @@ public ResultSet command(final String query, final Object... parameters) { map.put((String) parameters[i], parameters[i + 1]); return command(query, map); } + + @Override + public AnalyzedQuery analyze(String query) { + return new AnalyzedQuery() { + @Override + public boolean isIdempotent() { + return false; + } + + @Override + public boolean isDDL() { + return false; + } + }; + } + } diff --git a/engine/src/main/java/com/arcadedb/query/mongo/MongoQueryEngine.java b/engine/src/main/java/com/arcadedb/query/mongo/MongoQueryEngine.java index dd1681ab94..67739c623e 100644 --- a/engine/src/main/java/com/arcadedb/query/mongo/MongoQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/mongo/MongoQueryEngine.java @@ -73,6 +73,21 @@ protected MongoQueryEngine(final Object mongoDBWrapper) { this.mongoDBWrapper = mongoDBWrapper; } + @Override + public AnalyzedQuery analyze(String query) { + return new AnalyzedQuery() { + @Override + public boolean isIdempotent() { + return false; + } + + @Override + public boolean isDDL() { + return false; + } + }; + } + @Override public ResultSet query(final String query, final Map parameters) { try { diff --git a/engine/src/main/java/com/arcadedb/query/sql/SQLQueryEngine.java b/engine/src/main/java/com/arcadedb/query/sql/SQLQueryEngine.java index 6d21aa0407..925f325059 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/SQLQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/sql/SQLQueryEngine.java @@ -89,4 +89,20 @@ public ResultSet command(String query, Object... parameters) { return statement.execute(database, parameters); } + + @Override + public AnalyzedQuery analyze(final String query) { + final Statement statement = SQLEngine.parse(query, database); + return new AnalyzedQuery() { + @Override + public boolean isIdempotent() { + return statement.isIdempotent(); + } + + @Override + public boolean isDDL() { + return statement.isDDL(); + } + }; + } } diff --git a/engine/src/main/java/com/arcadedb/query/sql/executor/InternalResultSet.java b/engine/src/main/java/com/arcadedb/query/sql/executor/InternalResultSet.java index 4fd0074d91..cfb5619ee9 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/executor/InternalResultSet.java +++ b/engine/src/main/java/com/arcadedb/query/sql/executor/InternalResultSet.java @@ -69,6 +69,11 @@ public void reset() { this.next = 0; } + @Override + public long estimateSize() { + return content.size(); + } + public InternalResultSet copy() { final InternalResultSet copy = new InternalResultSet(); copy.content = this.content; diff --git a/engine/src/main/java/com/arcadedb/query/sql/executor/IteratorResultSet.java b/engine/src/main/java/com/arcadedb/query/sql/executor/IteratorResultSet.java index 719ad7bd38..8050a98786 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/executor/IteratorResultSet.java +++ b/engine/src/main/java/com/arcadedb/query/sql/executor/IteratorResultSet.java @@ -17,10 +17,7 @@ import com.arcadedb.database.Document; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; /** * Created by luigidellaquila on 07/07/16. @@ -55,7 +52,6 @@ public Result next() { @Override public void close() { - } @Override diff --git a/engine/src/main/java/com/arcadedb/query/sql/parser/Statement.java b/engine/src/main/java/com/arcadedb/query/sql/parser/Statement.java index 606802157a..db5002f648 100755 --- a/engine/src/main/java/com/arcadedb/query/sql/parser/Statement.java +++ b/engine/src/main/java/com/arcadedb/query/sql/parser/Statement.java @@ -137,6 +137,10 @@ public boolean isIdempotent() { return false; } + public boolean isDDL() { + return this instanceof DDLStatement; + } + public static Statement deserializeFromOResult(final Result doc) { try { Statement result = (Statement) Class.forName(doc.getProperty("__class")).getConstructor(Integer.class).newInstance(-1); diff --git a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java index 9f6874d2db..c2b73c8649 100644 --- a/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java +++ b/engine/src/main/java/com/arcadedb/schema/EmbeddedSchema.java @@ -611,20 +611,16 @@ public Index createManualIndex(final INDEX_TYPE indexType, final boolean unique, final AtomicReference result = new AtomicReference<>(); database.transaction(() -> { - try { - final IndexInternal index = indexFactory.createIndex(indexType.name(), database, FileUtils.encode(indexName, ENCODING), unique, - databasePath + "/" + indexName, PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, null); + final IndexInternal index = indexFactory.createIndex(indexType.name(), database, FileUtils.encode(indexName, ENCODING), unique, + databasePath + "/" + indexName, PaginatedFile.MODE.READ_WRITE, keyTypes, pageSize, nullStrategy, null); - result.set(index); + result.set(index); - if (index instanceof PaginatedComponent) - registerFile((PaginatedComponent) index); + if (index instanceof PaginatedComponent) + registerFile((PaginatedComponent) index); - indexMap.put(indexName, index); + indexMap.put(indexName, index); - } catch (IOException e) { - throw new SchemaException("Cannot create index '" + indexName + "' (error=" + e + ")", e); - } }, false, 1, null, (error) -> { final IndexInternal indexToRemove = result.get(); if (indexToRemove != null) { diff --git a/server/src/main/java/com/arcadedb/server/ha/HAServer.java b/server/src/main/java/com/arcadedb/server/ha/HAServer.java index 245d2d6c3a..1101eb91d5 100644 --- a/server/src/main/java/com/arcadedb/server/ha/HAServer.java +++ b/server/src/main/java/com/arcadedb/server/ha/HAServer.java @@ -708,8 +708,8 @@ public long sendCommandToReplicasWithQuorum(final HACommand command, final int q checkCurrentNodeIsTheLeader(); - server.log(this, Level.WARNING, "Timeout waiting for quorum to be reached for request " + opNumber); - throw new QuorumNotReachedException("Timeout waiting for quorum to be reached for request " + opNumber); + server.log(this, Level.WARNING, "Timeout waiting for quorum (%d) to be reached for request %d", quorum, opNumber); + throw new QuorumNotReachedException("Timeout waiting for quorum (" + quorum + ") to be reached for request " + opNumber); } } catch (InterruptedException e) { diff --git a/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java b/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java index 48f345ee9a..ff68e4b556 100755 --- a/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java +++ b/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java @@ -19,6 +19,7 @@ import com.arcadedb.ContextConfiguration; import com.arcadedb.GlobalConfiguration; import com.arcadedb.database.Binary; +import com.arcadedb.exception.TimeoutException; import com.arcadedb.log.LogManager; import com.arcadedb.network.binary.ChannelBinaryServer; import com.arcadedb.network.binary.ConnectionException; @@ -161,7 +162,8 @@ public void run() { switch (status) { case ONLINE: - server.getServer().log(this, Level.FINE, "Sending message to replica '%s' (buffered=%d)...", remoteServerName, senderQueue.size()); + server.getServer().log(this, Level.FINE, "Sending message to replica '%s' (msgSize=%d buffered=%d)...", remoteServerName, lastMessage.size(), + senderQueue.size()); sendMessage(lastMessage); lastMessage = null; @@ -230,8 +232,9 @@ public void run() { final Binary buffer = new Binary(8192); while (!shutdownCommunication) { + Pair request = null; try { - final Pair request = server.getMessageFactory().deserializeCommand(buffer, readRequest()); + request = server.getMessageFactory().deserializeCommand(buffer, readRequest()); if (request == null) { channel.clearInput(); @@ -244,6 +247,8 @@ public void run() { else executeMessage(buffer, request); + } catch (TimeoutException e) { + server.getServer().log(this, Level.FINE, "Request %s in timeout (cause=%s)", request, e.getCause()); } catch (IOException e) { server.getServer().log(this, Level.FINE, "IO Error from reading requests (cause=%s)", e.getCause()); server.setReplicaStatus(remoteServerName, false); diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java index 1ef5b28828..6876946035 100644 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java @@ -37,7 +37,6 @@ import com.arcadedb.database.async.ErrorCallback; import com.arcadedb.database.async.OkCallback; import com.arcadedb.engine.FileManager; -import com.arcadedb.engine.MutablePage; import com.arcadedb.engine.PageManager; import com.arcadedb.engine.PaginatedFile; import com.arcadedb.engine.TransactionManager; @@ -45,13 +44,14 @@ import com.arcadedb.engine.WALFileFactory; import com.arcadedb.exception.ConfigurationException; import com.arcadedb.exception.NeedRetryException; -import com.arcadedb.exception.SchemaException; import com.arcadedb.exception.TransactionException; import com.arcadedb.graph.Edge; import com.arcadedb.graph.GraphEngine; import com.arcadedb.graph.MutableVertex; import com.arcadedb.graph.Vertex; import com.arcadedb.index.IndexCursor; +import com.arcadedb.network.binary.ServerIsNotTheLeaderException; +import com.arcadedb.query.QueryEngine; import com.arcadedb.query.sql.executor.ResultSet; import com.arcadedb.query.sql.parser.ExecutionPlanCache; import com.arcadedb.query.sql.parser.StatementCache; @@ -63,7 +63,6 @@ import com.arcadedb.server.ha.message.DatabaseChangeStructureRequest; import com.arcadedb.server.ha.message.TxForwardRequest; import com.arcadedb.server.ha.message.TxRequest; -import com.arcadedb.utility.Pair; import org.json.JSONObject; import java.io.*; @@ -101,14 +100,14 @@ public void commit() { final TransactionContext tx = current.getLastTransaction(); try { - final Pair> changes = tx.commit1stPhase(isLeader); + final TransactionContext.TransactionPhase1 phase1 = tx.commit1stPhase(isLeader); try { - if (changes != null) { - final Binary bufferChanges = changes.getFirst(); + if (phase1 != null) { + final Binary bufferChanges = phase1.result; if (isLeader) - replicateTx(tx, changes, bufferChanges); + replicateTx(tx, phase1, bufferChanges); else { // USE A BIGGER TIMEOUT CONSIDERING THE DOUBLE LATENCY final TxForwardRequest command = new TxForwardRequest(ReplicatedDatabase.this, bufferChanges, tx.getIndexChanges().toMap()); @@ -137,7 +136,7 @@ public void commit() { }); } - public void replicateTx(final TransactionContext tx, final Pair> changes, final Binary bufferChanges) { + public void replicateTx(final TransactionContext tx, final TransactionContext.TransactionPhase1 phase1, final Binary bufferChanges) { final int configuredServers = server.getHA().getConfiguredServers(); final int reqQuorum; @@ -177,7 +176,7 @@ public void replicateTx(final TransactionContext tx, final Pair proxied.command(language, query, args)); @@ -561,9 +564,13 @@ public ResultSet command(final String language, final String query, final Object @Override public ResultSet command(final String language, final String query, final Map args) { if (!server.getHA().isLeader()) { - // USE A BIGGER TIMEOUT CONSIDERING THE DOUBLE LATENCY - final CommandForwardRequest command = new CommandForwardRequest(ReplicatedDatabase.this, language, query, args, null); - return (ResultSet) server.getHA().forwardCommandToLeader(command, timeout * 2); + final QueryEngine.AnalyzedQuery analyzed = proxied.getQueryEngineManager().getInstance(language, this).analyze(query); + if (analyzed.isDDL()) { + // USE A BIGGER TIMEOUT CONSIDERING THE DOUBLE LATENCY + final CommandForwardRequest command = new CommandForwardRequest(ReplicatedDatabase.this, language, query, args, null); + return (ResultSet) server.getHA().forwardCommandToLeader(command, timeout * 2); + } + return proxied.command(language, query, args); } return recordFileChanges(() -> proxied.command(language, query, args)); @@ -654,7 +661,7 @@ public RET recordFileChanges(final Callable callback) { if (!ha.isLeader()) { // NOT THE LEADER: NOT RESPONSIBLE TO SEND CHANGES TO OTHER SERVERS // TODO: Issue #118SchemaException - throw new SchemaException("Changes to the schema must be executed on the leader server"); + throw new ServerIsNotTheLeaderException("Changes to the schema must be executed on the leader server", ha.getLeaderName()); // result.set(callback.call()); // return null; } diff --git a/server/src/main/java/com/arcadedb/server/ha/message/CommandForwardRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/CommandForwardRequest.java index 300aaf260e..4e7b3e4b78 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/CommandForwardRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/CommandForwardRequest.java @@ -23,8 +23,7 @@ import com.arcadedb.server.ha.HAServer; import com.arcadedb.server.ha.ReplicationException; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; /** * Forward a command to the Leader server to be executed. @@ -132,6 +131,6 @@ public HACommand execute(final HAServer server, final String remoteServerName, f @Override public String toString() { - return "command-forward-request(" + databaseName + ")"; + return "command-forward-request(" + databaseName + "," + language + "," + command + ")"; } } diff --git a/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java b/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java index 91b3f41813..90f7d04d16 100644 --- a/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java +++ b/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java @@ -423,9 +423,10 @@ protected void deleteDatabaseFolders() { if (servers != null) for (int i = 0; i < getServerCount(); ++i) - for (String dbName : getServer(i).getDatabaseNames()) - if (getServer(i).existsDatabase(dbName)) - ((DatabaseInternal) getServer(i).getDatabase(dbName)).getWrappedDatabaseInstance().drop(); + if (getServer(i) != null) + for (String dbName : getServer(i).getDatabaseNames()) + if (getServer(i).existsDatabase(dbName)) + ((DatabaseInternal) getServer(i).getDatabase(dbName)).getWrappedDatabaseInstance().drop(); Assertions.assertTrue(DatabaseFactory.getActiveDatabaseInstances().isEmpty(), "Found active databases: " + DatabaseFactory.getActiveDatabaseInstances()); diff --git a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java index c8797d20b7..16b4c2ecb1 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java @@ -105,24 +105,71 @@ public void checkRecordCreate() throws Exception { @Test public void checkDeleteGraphElements() throws Exception { - //testEachServer((serverIndex) -> { - final int serverIndex = 0; + testEachServer((serverIndex) -> { + LogManager.instance().log(this, Level.INFO, "TESTS SERVER " + serverIndex); + + String v1 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}")).getString("result"); + testEachServer((checkServer) -> { + try { + Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + v1)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking for V1 on server " + checkServer); + throw e; + } + }); + String v2 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Elon\",\"surname\":\"Musk\",\"age\":50}")).getString("result"); + testEachServer((checkServer) -> { + try { + Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + v2)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking for V2 on server " + checkServer); + throw e; + } + }); + String e1 = new JSONObject(command(serverIndex, "create edge E1 from " + v1 + " to " + v2)).getJSONArray("result").getJSONObject(0).getString("@rid"); + testEachServer((checkServer) -> { + try { + Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + e1)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking on E1 on server " + checkServer); + throw e; + } + }); + String v3 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Nikola\",\"surname\":\"Tesla\",\"age\":150}")).getString("result"); + testEachServer((checkServer) -> { + try { + Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + v3)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking for V3 on server " + checkServer); + throw e; + } + }); + String e2 = new JSONObject(command(serverIndex, "create edge E2 from " + v2 + " to " + v3)).getJSONArray("result").getJSONObject(0).getString("@rid"); + testEachServer((checkServer) -> { + try { + Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + e2)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking for E2 on server " + checkServer); + throw e; + } + }); command(serverIndex, "delete from " + v1); - testEachServer((checkServer) -> { - Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + v1)).getJSONArray("result").isEmpty()); - Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + v2)).getJSONArray("result").isEmpty()); - Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + v3)).getJSONArray("result").isEmpty()); - Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + e1)).getJSONArray("result").isEmpty()); - Assertions.assertFalse(new JSONObject(command(checkServer, "select from " + e2)).getJSONArray("result").isEmpty()); + try { + Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + v1)).getJSONArray("result").isEmpty(), "server " + serverIndex); + Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + e1)).getJSONArray("result").isEmpty(), "server " + serverIndex); + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "Error on checking for right deletion on server " + checkServer); + throw e; + } }); -// }); + }); } private String createRecord(final int serverIndex, final String payload) throws IOException { diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java index d28061bca1..5e5250cea3 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java @@ -20,6 +20,7 @@ import com.arcadedb.exception.SchemaException; import com.arcadedb.exception.TransactionException; import com.arcadedb.index.Index; +import com.arcadedb.network.binary.ServerIsNotTheLeaderException; import com.arcadedb.schema.Property; import com.arcadedb.schema.Schema; import com.arcadedb.schema.Type; @@ -65,7 +66,7 @@ public void testReplication() throws Exception { try { databases[1].getSchema().createVertexType("RuntimeVertex1"); Assertions.fail(); - } catch (SchemaException e) { + } catch (ServerIsNotTheLeaderException e) { // EXPECTED }