Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ void createSchemaWithDynamicSqlScript() {
""");

Schema schema = database.getSchema();
System.out.println("schema.toString() = " + schema.toString());
assertThat(schema.existsType("V1")).isTrue();
assertThat(schema.existsType("V2")).isTrue();
assertThat(schema.existsType("V3")).isTrue();
Expand Down Expand Up @@ -234,7 +233,7 @@ void multipleInsert() throws Exception {

@Test
@Disabled
void multipleInsertBAtched() throws Exception {
void multipleInsertBatched() throws Exception {
database.command("sqlscript", """
create vertex type `TEXT_EMBEDDING` if not exists;
create property TEXT_EMBEDDING.str if not exists STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.arcadedb.exception.RecordNotFoundException;
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.exception.TransactionException;
import com.arcadedb.log.LogManager;
import com.arcadedb.query.sql.executor.InternalResultSet;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultInternal;
Expand Down Expand Up @@ -93,9 +94,6 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import com.arcadedb.log.LogManager;
import java.util.logging.Level;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand All @@ -120,6 +118,8 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collectors;

/**
* Remote Database implementation using gRPC protocol instead of HTTP. Extends
Expand All @@ -131,7 +131,6 @@
*/
public class RemoteGrpcDatabase extends RemoteDatabase {


private final ArcadeDbServiceGrpc.ArcadeDbServiceBlockingV2Stub blockingStub;
private final ArcadeDbServiceGrpc.ArcadeDbServiceStub asyncStub;
private final RemoteSchema schema;
Expand Down Expand Up @@ -282,7 +281,8 @@ public void commit() {
CommitTransactionResponse response = blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS)
.commitTransaction(request);

LogManager.instance().log(this, Level.FINE, "[After commit] Success: %s Committed: %s", response.getSuccess(), response.getCommitted());
LogManager.instance()
.log(this, Level.FINE, "[After commit] Success: %s Committed: %s", response.getSuccess(), response.getCommitted());

if (!response.getSuccess()) {
throw new TransactionException("Failed to commit transaction: " + response.getMessage());
Expand Down Expand Up @@ -332,7 +332,8 @@ public void rollback() {
RollbackTransactionResponse response = blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS)
.rollbackTransaction(request);

LogManager.instance().log(this, Level.FINE, "[After rollback] Success: %s Committed: %s", response.getSuccess(), response.getRolledBack());
LogManager.instance()
.log(this, Level.FINE, "[After rollback] Success: %s Committed: %s", response.getSuccess(), response.getRolledBack());

if (!response.getSuccess()) {
throw new TransactionException("Failed to rollback transaction: " + response.getMessage());
Expand Down Expand Up @@ -369,7 +370,8 @@ public void deleteRecord(final Record record) {

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT deleteRecord: db=%s, tx=%s, rid=%s", getName(), (transactionId != null), record.getIdentity());
LogManager.instance().log(this, Level.FINE, "CLIENT deleteRecord: db=%s, tx=%s, rid=%s", getName(), (transactionId != null),
record.getIdentity());
}

final DeleteRecordResponse resp = callUnary("DeleteRecord",
Expand Down Expand Up @@ -397,7 +399,9 @@ public boolean deleteRecord(final String rid, final long timeoutMs) {

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT deleteRecord: db=%s, tx=%s, rid=%s, timeoutMs=%s", getName(), (transactionId != null), rid, timeoutMs);
LogManager.instance()
.log(this, Level.FINE, "CLIENT deleteRecord: db=%s, tx=%s, rid=%s, timeoutMs=%s", getName(), (transactionId != null),
rid, timeoutMs);
}

final DeleteRecordResponse res = callUnary("DeleteRecord",
Expand Down Expand Up @@ -452,8 +456,12 @@ public ResultSet command(final String language, final String command, final Map<

private ResultSet commandInternal(final String language, final String command, final Map<String, Object> params) {

ExecuteCommandRequest.Builder requestBuilder = ExecuteCommandRequest.newBuilder().setDatabase(getName()).setCommand(command)
.setLanguage(language).setCredentials(buildCredentials());
ExecuteCommandRequest.Builder requestBuilder = ExecuteCommandRequest.newBuilder()
.setDatabase(getName())
.setCommand(command)
.setLanguage(language)
.setReturnRows(true)
.setCredentials(buildCredentials());

if (transactionId != null) {
requestBuilder.setTransaction(TransactionContext.newBuilder().setTransactionId(transactionId).setDatabase(getName()).build());
Expand All @@ -463,15 +471,12 @@ private ResultSet commandInternal(final String language, final String command, f
requestBuilder.putAllParameters(convertParamsToGrpcValue(params));
}

boolean returnRows = true;

requestBuilder.setReturnRows(returnRows);

try {

if (LogManager.instance().isDebugEnabled())
LogManager.instance().log(this, Level.FINE, "CLIENT executeCommand: db=%s, tx=%s, cmdLen=%s, params=%s", getName(), (transactionId != null),
requestBuilder.getCommand().length(), requestBuilder.getParametersCount());
LogManager.instance()
.log(this, Level.FINE, "CLIENT executeCommand: db=%s, tx=%s, cmdLen=%s, params=%s", getName(), (transactionId != null),
requestBuilder.getCommand().length(), requestBuilder.getParametersCount());

final ExecuteCommandResponse response = callUnary("ExecuteCommand",
() -> blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS).executeCommand(requestBuilder.build()));
Expand All @@ -485,21 +490,7 @@ private ResultSet commandInternal(final String language, final String command, f

ResultSet resultSet;

if (returnRows) {
resultSet = createGrpcResultSet(response);
} else {

resultSet = new InternalResultSet();

if (response.getAffectedRecords() > 0) {

Map<String, Object> result = new HashMap<>();
result.put("affected", response.getAffectedRecords());
result.put("executionTime", response.getExecutionTimeMs());

((InternalResultSet) resultSet).add(new ResultInternal(result));
}
}
resultSet = createGrpcResultSet(response);

return resultSet;
} catch (StatusRuntimeException | StatusException e) {
Expand Down Expand Up @@ -562,8 +553,9 @@ public ResultSet query(final String language, final String query, RemoteGrpcConf
try {

if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT executeQuery: db=%s, tx=%s, queryLen=%s, params=%s", getName(), (transactionId != null),
requestBuilder.getQuery().length(), requestBuilder.getParametersCount());
LogManager.instance()
.log(this, Level.FINE, "CLIENT executeQuery: db=%s, tx=%s, queryLen=%s, params=%s", getName(), (transactionId != null),
requestBuilder.getQuery().length(), requestBuilder.getParametersCount());
}

final ExecuteQueryRequest req = requestBuilder.build();
Expand Down Expand Up @@ -998,9 +990,10 @@ public String createRecord(final String cls, final Map<String, Object> props, fi

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT createRecord: db=%s, txOpen=%s, type=%s, propCount=%s, timeoutMs=%s", getName(),
(transactionId != null),
cls, props.size(), timeoutMs);
LogManager.instance()
.log(this, Level.FINE, "CLIENT createRecord: db=%s, txOpen=%s, type=%s, propCount=%s, timeoutMs=%s", getName(),
(transactionId != null),
cls, props.size(), timeoutMs);
}

final CreateRecordResponse res = callUnary("CreateRecord",
Expand Down Expand Up @@ -1058,9 +1051,11 @@ private boolean updateRecord(final String rid, final PropertiesUpdate partial, f

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT updateRecord(partial): db=%s, txOpen=%s, rid=%s, timeoutMs=%s", getName(), (transactionId != null),
rid,
timeoutMs);
LogManager.instance()
.log(this, Level.FINE, "CLIENT updateRecord(partial): db=%s, txOpen=%s, rid=%s, timeoutMs=%s", getName(),
(transactionId != null),
rid,
timeoutMs);
}

final UpdateRecordResponse res = callUnary("UpdateRecord",
Expand Down Expand Up @@ -1089,7 +1084,8 @@ private boolean updateRecordFull(final String rid, final GrpcRecord record, fina

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT updateRecord(full): db=%s, txOpen=%s, rid=%s, timeoutMs=%s", getName(), (transactionId != null), rid,
LogManager.instance().log(this, Level.FINE, "CLIENT updateRecord(full): db=%s, txOpen=%s, rid=%s, timeoutMs=%s", getName(),
(transactionId != null), rid,
timeoutMs);
}

Expand Down Expand Up @@ -1156,9 +1152,10 @@ public Record lookupByRID(final RID rid, final boolean loadContent) {

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT lookupByRID: db=%s, txOpen=%s, rid=%s, loadContent=%s, timeoutMs=%s", getName(),
(transactionId != null),
rid, loadContent, getTimeout());
LogManager.instance()
.log(this, Level.FINE, "CLIENT lookupByRID: db=%s, txOpen=%s, rid=%s, loadContent=%s, timeoutMs=%s", getName(),
(transactionId != null),
rid, loadContent, getTimeout());
}

final LookupByRidResponse resp = callUnary("LookupByRid",
Expand Down Expand Up @@ -1220,7 +1217,9 @@ public InsertSummary insertBulk(final InsertOptions options, final List<GrpcReco

try {
if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT insertBulk: rows=%s, timeoutMs=%s, tx=%s", req.getRowsCount(), timeoutMs, (transactionId != null));
LogManager.instance()
.log(this, Level.FINE, "CLIENT insertBulk: rows=%s, timeoutMs=%s, tx=%s", req.getRowsCount(), timeoutMs,
(transactionId != null));
}

// use callUnary so tx cross-thread checks + rpcSeq happen in one place
Expand Down Expand Up @@ -1285,8 +1284,10 @@ public void onCompleted() {
};

if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT ingestStream: db=%s, rows=%s, chunkSize=%s, timeoutMs=%s", getName(), protoRows.size(), chunkSize,
timeoutMs);
LogManager.instance()
.log(this, Level.FINE, "CLIENT ingestStream: db=%s, rows=%s, chunkSize=%s, timeoutMs=%s", getName(), protoRows.size(),
chunkSize,
timeoutMs);
}

// Open the client stream via wrapper (adds deadline, tx checks, unified error
Expand Down Expand Up @@ -1404,8 +1405,10 @@ private <T> InsertSummary ingestBidiCore(final List<T> rows, final InsertOptions
final List<GrpcRecord> protoRows = rows.stream().map(mapper).collect(Collectors.toList());

if (LogManager.instance().isDebugEnabled()) {
LogManager.instance().log(this, Level.FINE, "CLIENT ingestBidi start: rows=%s, chunkSize=%s, maxInflight=%s, timeoutMs=%s", protoRows.size(), chunkSize,
maxInflight, timeoutMs);
LogManager.instance()
.log(this, Level.FINE, "CLIENT ingestBidi start: rows=%s, chunkSize=%s, maxInflight=%s, timeoutMs=%s", protoRows.size(),
chunkSize,
maxInflight, timeoutMs);
}

// --- streaming state
Expand Down Expand Up @@ -1837,7 +1840,8 @@ private Map<String, GrpcValue> convertParamsToGrpcValue(Map<String, Object> para
private Object grpcValueToObject(GrpcValue grpcValue) {
Object out = ProtoUtils.fromGrpcValue(grpcValue);
if (LogManager.instance().isDebugEnabled())
LogManager.instance().log(this, Level.FINE, "CLIENT decode grpcValueToObject: %s -> %s", summarize(grpcValue), summarize(out));
LogManager.instance()
.log(this, Level.FINE, "CLIENT decode grpcValueToObject: %s -> %s", summarize(grpcValue), summarize(out));
return out;
}

Expand Down Expand Up @@ -1961,7 +1965,8 @@ private void logTx(String phase, String rpcOp) {
if (debugTx == null || !LogManager.instance().isDebugEnabled())
return;
TxDebug d = debugTx;
LogManager.instance().log(this, Level.FINE, "TXDBG %s db=%s tx#%s label=%s owner=%s now=%s rpcOp=%s rpcSeq=%s beginSent=%s committed=%s rolledBack=%s", phase,
LogManager.instance().log(this, Level.FINE,
"TXDBG %s db=%s tx#%s label=%s owner=%s now=%s rpcOp=%s rpcSeq=%s beginSent=%s committed=%s rolledBack=%s", phase,
d.dbName,
d.id, d.txLabel, tidName(d.ownerThread), tidName(Thread.currentThread()), rpcOp, d.rpcSeq.get(), d.beginRpcSent,
d.committed,
Expand All @@ -1974,8 +1979,10 @@ void checkCrossThreadUse(String where) {
return;
Thread now = Thread.currentThread();
if (now != d.ownerThread) {
LogManager.instance().log(this, Level.WARNING, "TXDBG CROSS-THREAD %s db=%s tx#%s owner=%s now=%s label=%s (begin site follows)", where, d.dbName, d.id,
tidName(d.ownerThread), tidName(now), d.txLabel, d.beginSite);
LogManager.instance()
.log(this, Level.WARNING, "TXDBG CROSS-THREAD %s db=%s tx#%s owner=%s now=%s label=%s (begin site follows)", where,
d.dbName, d.id,
tidName(d.ownerThread), tidName(now), d.txLabel, d.beginSite);
}
}

Expand Down
Loading
Loading