Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,10 @@ public void setWrapper(final String name, final Object instance) {
this.wrappers.put(name, instance);
}

public QueryEngineManager getQueryEngineManager() {
return queryEngineManager;
}

public void saveConfiguration() throws IOException {
FileUtils.writeFile(configurationFile, configuration.toJSON());
}
Expand Down
46 changes: 32 additions & 14 deletions engine/src/main/java/com/arcadedb/database/TransactionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.arcadedb.index.Index;
import com.arcadedb.index.lsm.LSMTreeIndexAbstract;
import com.arcadedb.log.LogManager;
import com.arcadedb.utility.Pair;

import java.io.*;
import java.util.*;
Expand Down Expand Up @@ -70,6 +69,16 @@ public class TransactionContext implements Transaction {

public enum STATUS {INACTIVE, BEGUN, COMMIT_1ST_PHASE, COMMIT_2ND_PHASE}

public class TransactionPhase1 {
public final Binary result;
public final List<MutablePage> modifiedPages;

public TransactionPhase1(final Binary result, final List<MutablePage> modifiedPages) {
this.result = result;
this.modifiedPages = modifiedPages;
}
}

public TransactionContext(final DatabaseInternal database) {
this.database = database;
this.walFlush = WALFile.getWALFlushType(database.getConfiguration().getValueAsInteger(GlobalConfiguration.TX_WAL_FLUSH));
Expand Down Expand Up @@ -99,17 +108,17 @@ public Binary commit() {
if (status != STATUS.BEGUN)
throw new TransactionException("Transaction already in commit phase");

final Pair<Binary, List<MutablePage>> changes = commit1stPhase(true);
final TransactionPhase1 phase1 = commit1stPhase(true);

if (changes != null)
commit2ndPhase(changes);
if (phase1 != null)
commit2ndPhase(phase1);
else
reset();

if (database.getSchema().getEmbedded().isDirty())
database.getSchema().getEmbedded().saveConfiguration();

return changes != null ? changes.getFirst() : null;
return phase1 != null ? phase1.result : null;
}

public Record getRecordFromCache(final RID rid) {
Expand Down Expand Up @@ -377,6 +386,9 @@ public void commitFromReplica(final WALFile.WALTransaction buffer,
indexChanges.setKeys(keysTx);
indexChanges.addFilesToLock(modifiedFiles);

final int dictionaryFileId = database.getSchema().getDictionary().getId();
boolean dictionaryModified = false;

for (WALFile.WALPage p : buffer.pages) {
final PaginatedFile file = database.getFileManager().getFile(p.fileId);
final int pageSize = file.getPageSize();
Expand All @@ -396,10 +408,16 @@ public void commitFromReplica(final WALFile.WALTransaction buffer,
newPageCounters.put(pageId.getFileId(), pageId.getPageNumber() + 1);
} else
modifiedPages.put(pageId, page);

if (!dictionaryModified && dictionaryFileId == pageId.getFileId())
dictionaryModified = true;
}

database.commit();

if (dictionaryModified)
database.getSchema().getDictionary().reload();

} catch (ConcurrentModificationException e) {
rollback();
throw e;
Expand All @@ -412,7 +430,7 @@ public void commitFromReplica(final WALFile.WALTransaction buffer,
/**
* Locks the files in order, then checks all the pre-conditions.
*/
public Pair<Binary, List<MutablePage>> commit1stPhase(final boolean isLeader) {
public TransactionPhase1 commit1stPhase(final boolean isLeader) {
if (status == STATUS.INACTIVE)
throw new TransactionException("Transaction not started");

Expand Down Expand Up @@ -453,7 +471,6 @@ public Pair<Binary, List<MutablePage>> commit1stPhase(final boolean isLeader) {

// CHECK THE VERSIONS FIRST
final List<MutablePage> pages = new ArrayList<>();

final PageManager pageManager = database.getPageManager();

for (final Iterator<MutablePage> it = modifiedPages.values().iterator(); it.hasNext(); ) {
Expand Down Expand Up @@ -481,13 +498,11 @@ public Pair<Binary, List<MutablePage>> commit1stPhase(final boolean isLeader) {

if (useWAL) {
txId = database.getTransactionManager().getNextTransactionId();

LogManager.instance().log(this, Level.FINE, "Creating buffer for TX %d (threadId=%d)", null, txId, Thread.currentThread().getId());

//LogManager.instance().log(this, Level.FINE, "Creating buffer for TX %d (threadId=%d)", null, txId, Thread.currentThread().getId());
result = database.getTransactionManager().createTransactionBuffer(txId, pages);
}

return new Pair<>(result, pages);
return new TransactionPhase1(result, pages);

} catch (DuplicatedKeyException | ConcurrentModificationException e) {
rollback();
Expand All @@ -499,7 +514,10 @@ public Pair<Binary, List<MutablePage>> commit1stPhase(final boolean isLeader) {
}
}

public void commit2ndPhase(final Pair<Binary, List<MutablePage>> changes) {
public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) {
if (changes == null)
return;

if (database.getMode() == PaginatedFile.MODE.READ_ONLY)
throw new TransactionException("Cannot commit changes because the database is open in read-only mode");

Expand All @@ -511,9 +529,9 @@ public void commit2ndPhase(final Pair<Binary, List<MutablePage>> changes) {
final PageManager pageManager = database.getPageManager();

try {
if (changes.getFirst() != null)
if (changes.result != null)
// WRITE TO THE WAL FIRST
database.getTransactionManager().writeTransactionToWAL(changes.getSecond(), walFlush, txId, changes.getFirst());
database.getTransactionManager().writeTransactionToWAL(changes.modifiedPages, walFlush, txId, changes.result);

// AT THIS POINT, LOCK + VERSION CHECK, THERE IS NO NEED TO MANAGE ROLLBACK BECAUSE THERE CANNOT BE CONCURRENT TX THAT UPDATE THE SAME PAGE CONCURRENTLY
// UPDATE PAGE COUNTER FIRST
Expand Down
3 changes: 1 addition & 2 deletions engine/src/main/java/com/arcadedb/index/IndexFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.arcadedb.index.lsm.LSMTreeIndexAbstract;
import com.arcadedb.schema.Type;

import java.io.*;
import java.util.*;

public class IndexFactory {
Expand All @@ -32,7 +31,7 @@ public void register(final String type, final IndexFactoryHandler handler) {

public IndexInternal createIndex(final String indexType, final DatabaseInternal database, final String indexName, final boolean unique, final String filePath,
final PaginatedFile.MODE mode, final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy,
final Index.BuildIndexCallback callback) throws IOException {
final Index.BuildIndexCallback callback) {
final IndexFactoryHandler handler = map.get(indexType);

if (handler == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.arcadedb.index.lsm.LSMTreeIndexAbstract;
import com.arcadedb.schema.Type;

import java.io.*;

public interface IndexFactoryHandler {
IndexInternal create(DatabaseInternal database, String name, boolean unique, String filePath, PaginatedFile.MODE mode, Type[] keyTypes, int pageSize,
LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback) throws IOException;
LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, Index.BuildIndexCallback callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public class LSMTreeFullTextIndex implements Index, IndexInternal {
public static class IndexFactoryHandler implements com.arcadedb.index.IndexFactoryHandler {
@Override
public IndexInternal create(final DatabaseInternal database, final String name, final boolean unique, final String filePath, final PaginatedFile.MODE mode,
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback)
throws IOException {
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) {
return new LSMTreeFullTextIndex(database, name, filePath, mode, pageSize, callback);
}
}
Expand All @@ -85,12 +84,8 @@ public PaginatedComponent createOnLoad(final DatabaseInternal database, final St
*/
public LSMTreeFullTextIndex(final DatabaseInternal database, final String name, final String filePath, final PaginatedFile.MODE mode, final int pageSize,
final BuildIndexCallback callback) {
try {
analyzer = new StandardAnalyzer();
underlyingIndex = new LSMTreeIndex(database, name, false, filePath, mode, new Type[] { Type.STRING }, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR);
} catch (IOException e) {
throw new IndexException("Cannot create search engine (error=" + e + ")", e);
}
analyzer = new StandardAnalyzer();
underlyingIndex = new LSMTreeIndex(database, name, false, filePath, mode, new Type[] { Type.STRING }, pageSize, LSMTreeIndexAbstract.NULL_STRATEGY.ERROR);
}

/**
Expand Down
13 changes: 8 additions & 5 deletions engine/src/main/java/com/arcadedb/index/lsm/LSMTreeIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public class LSMTreeIndex implements RangeIndex, IndexInternal {
public static class IndexFactoryHandler implements com.arcadedb.index.IndexFactoryHandler {
@Override
public IndexInternal create(final DatabaseInternal database, final String name, final boolean unique, final String filePath, final PaginatedFile.MODE mode,
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback)
throws IOException {
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy, final BuildIndexCallback callback) {
return new LSMTreeIndex(database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy);
}
}
Expand Down Expand Up @@ -99,9 +98,13 @@ public PaginatedComponent createOnLoad(final DatabaseInternal database, final St
* Called at creation time.
*/
public LSMTreeIndex(final DatabaseInternal database, final String name, final boolean unique, String filePath, final PaginatedFile.MODE mode,
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) throws IOException {
this.name = name;
this.mutable = new LSMTreeIndexMutable(this, database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy);
final Type[] keyTypes, final int pageSize, final LSMTreeIndexAbstract.NULL_STRATEGY nullStrategy) {
try {
this.name = name;
this.mutable = new LSMTreeIndexMutable(this, database, name, unique, filePath, mode, keyTypes, pageSize, nullStrategy);
} catch (IOException e) {
throw new IndexException("Error on creating index '" + name + "'", e);
}
}

/**
Expand Down
4 changes: 3 additions & 1 deletion engine/src/main/java/com/arcadedb/log/DefaultLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ public void installCustomFormatter() {
}
}

public void log(final Object requester, final Level level, String message, final Throwable exception, final String context, final Object arg1,
public void log(final Object requester, Level level, String message, final Throwable exception, final String context, final Object arg1,
final Object arg2, final Object arg3, final Object arg4, final Object arg5, final Object arg6, final Object arg7, final Object arg8, final Object arg9,
final Object arg10, final Object arg11, final Object arg12, final Object arg13, final Object arg14, final Object arg15, final Object arg16,
final Object arg17) {
if (message == null)
return;

//level = Level.SEVERE;

final String requesterName;
if (requester instanceof String)
requesterName = (String) requester;
Expand Down
8 changes: 8 additions & 0 deletions engine/src/main/java/com/arcadedb/query/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
import java.util.*;

public interface QueryEngine {
interface AnalyzedQuery {
boolean isIdempotent();

boolean isDDL();
}

interface QueryEngineFactory {
boolean isAvailable();

Expand All @@ -29,6 +35,8 @@ interface QueryEngineFactory {
QueryEngine getInstance(DatabaseInternal database);
}

AnalyzedQuery analyze(String query);

ResultSet query(String query, Map<String, Object> parameters);

ResultSet query(String query, Object... parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,18 @@
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultInternal;
import com.arcadedb.query.sql.executor.ResultSet;
import java.lang.reflect.InvocationTargetException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.lang.reflect.*;
import java.util.*;
import java.util.logging.*;
import java.util.stream.*;

public class CypherQueryEngine implements QueryEngine {
private static final String ENGINE_NAME = "cypher-engine";
private final Object arcadeGraph;
private final Object arcadeGraph;

public static class CypherQueryEngineFactory implements QueryEngineFactory {
private static Boolean available = null;
private static Boolean available = null;
private static Class<?> arcadeGraphClass;
private static Class<?> arcadeCypherClass;

Expand Down Expand Up @@ -84,6 +81,21 @@ protected CypherQueryEngine(final Object arcadeGraph) {
this.arcadeGraph = arcadeGraph;
}

@Override
public AnalyzedQuery analyze(String query) {
return new AnalyzedQuery() {
@Override
public boolean isIdempotent() {
return false;
}

@Override
public boolean isDDL() {
return false;
}
};
}

@Override
public ResultSet query(final String query, final Map<String, Object> parameters) {
return command(query, parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,20 @@ public ResultSet command(final String query, final Object... parameters) {
map.put((String) parameters[i], parameters[i + 1]);
return command(query, map);
}

@Override
public AnalyzedQuery analyze(String query) {
return new AnalyzedQuery() {
@Override
public boolean isIdempotent() {
return false;
}

@Override
public boolean isDDL() {
return false;
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ protected MongoQueryEngine(final Object mongoDBWrapper) {
this.mongoDBWrapper = mongoDBWrapper;
}

@Override
public AnalyzedQuery analyze(String query) {
return new AnalyzedQuery() {
@Override
public boolean isIdempotent() {
return false;
}

@Override
public boolean isDDL() {
return false;
}
};
}

@Override
public ResultSet query(final String query, final Map<String, Object> parameters) {
try {
Expand Down
16 changes: 16 additions & 0 deletions engine/src/main/java/com/arcadedb/query/sql/SQLQueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,20 @@ public ResultSet command(String query, Object... parameters) {

return statement.execute(database, parameters);
}

@Override
public AnalyzedQuery analyze(final String query) {
final Statement statement = SQLEngine.parse(query, database);
return new AnalyzedQuery() {
@Override
public boolean isIdempotent() {
return statement.isIdempotent();
}

@Override
public boolean isDDL() {
return statement.isDDL();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public void reset() {
this.next = 0;
}

@Override
public long estimateSize() {
return content.size();
}

public InternalResultSet copy() {
final InternalResultSet copy = new InternalResultSet();
copy.content = this.content;
Expand Down
Loading