Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityCore;
Expand Down Expand Up @@ -69,7 +68,7 @@ public String buildKey(T value) {
* @param key key for that value
*/
public T read(String key) {
TreeMapMetaStore.this.ensureReadTr();
ensureReadTr();
T value = this.slice.getOrDefault(key, null);
return (value != null) ? this.copyRecord.apply(value) : null;
}
Expand All @@ -80,7 +79,7 @@ public T read(String key) {
* @param prefix key prefix
*/
public List<T> readRange(String prefix) {
TreeMapMetaStore.this.ensureReadTr();
ensureReadTr();
if (prefix.isEmpty()) {
return new ArrayList<>(this.slice.values());
}
Expand All @@ -100,7 +99,7 @@ public List<T> readRange(String prefix) {
* @param value value to write
*/
public void write(T value) {
TreeMapMetaStore.this.ensureReadWriteTr();
ensureReadWriteTr();
T valueToWrite = (value != null) ? this.copyRecord.apply(value) : null;
String key = this.buildKey(valueToWrite);
// write undo if needs be
Expand All @@ -116,7 +115,7 @@ public void write(T value) {
* @param key key for the record to remove
*/
public void delete(String key) {
TreeMapMetaStore.this.ensureReadWriteTr();
ensureReadWriteTr();
if (slice.containsKey(key)) {
// write undo if needs be
if (!this.undoSlice.containsKey(key)) {
Expand All @@ -132,15 +131,15 @@ public void delete(String key) {
* @param prefix key prefix for the record to remove
*/
public void deleteRange(String prefix) {
TreeMapMetaStore.this.ensureReadWriteTr();
ensureReadWriteTr();
List<T> elements = this.readRange(prefix);
for (T element : elements) {
this.delete(element);
}
}

void deleteAll() {
TreeMapMetaStore.this.ensureReadWriteTr();
ensureReadWriteTr();
slice.clear();
undoSlice.clear();
}
Expand All @@ -156,7 +155,7 @@ public void delete(T value) {

/** Rollback all changes made to this slice since transaction started */
private void rollback() {
TreeMapMetaStore.this.ensureReadWriteTr();
ensureReadWriteTr();
undoSlice.forEach(
(key, value) -> {
if (value == null) {
Expand Down Expand Up @@ -194,6 +193,7 @@ public boolean isWrite() {
private Transaction tr;

// diagnostic services
private final PolarisDiagnostics initialDiagnosticServices;
private PolarisDiagnostics diagnosticServices;

// all entities
Expand Down Expand Up @@ -301,6 +301,7 @@ public TreeMapMetaStore(@Nonnull PolarisDiagnostics diagnostics) {
policyMappingRecord.getTargetId()),
PolarisPolicyMappingRecord::new);

this.initialDiagnosticServices = diagnostics;
// no transaction open yet
this.diagnosticServices = diagnostics;
this.tr = null;
Expand Down Expand Up @@ -397,7 +398,7 @@ void rollback() {
}

/** Ensure that a read/write FDB transaction has been started */
public void ensureReadWriteTr() {
private void ensureReadWriteTr() {
this.diagnosticServices.check(
this.tr != null && this.tr.isWrite(), "no_write_transaction_started");
}
Expand All @@ -410,18 +411,16 @@ private void ensureReadTr() {
/**
* Run inside a read/write transaction
*
* @param callCtx call context to use
* @param transactionCode transaction code
* @return the result of the execution
*/
public <T> T runInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {
@Nonnull PolarisDiagnostics diagnostics, @Nonnull Supplier<T> transactionCode) {

synchronized (lock) {
// execute transaction
try {
// init diagnostic services
this.diagnosticServices = callCtx.getDiagServices();
this.diagnosticServices = diagnostics;
this.startWriteTransaction();
return transactionCode.get();
} catch (Throwable e) {
Expand All @@ -431,26 +430,21 @@ public <T> T runInTransaction(
throw e;
} finally {
this.tr = null;
this.diagnosticServices = null;
this.diagnosticServices = this.initialDiagnosticServices;
}
}
}

/**
* Run inside a read/write transaction
*
* @param callCtx call context to use
* @param transactionCode transaction code
*/
/** Run inside a read/write transaction */
public void runActionInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {
@Nonnull PolarisDiagnostics diagnostics, @Nonnull Runnable transactionCode) {

synchronized (lock) {

// execute transaction
try {
// init diagnostic services
this.diagnosticServices = callCtx.getDiagServices();
this.diagnosticServices = diagnostics;
this.startWriteTransaction();
transactionCode.run();
} catch (Throwable e) {
Expand All @@ -460,54 +454,47 @@ public void runActionInTransaction(
throw e;
} finally {
this.tr = null;
this.diagnosticServices = null;
this.diagnosticServices = this.initialDiagnosticServices;
}
}
}

/**
* Run inside a read only transaction
*
* @param callCtx call context to use
* @param transactionCode transaction code
* @return the result of the execution
*/
public <T> T runInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {
@Nonnull PolarisDiagnostics diagnostics, @Nonnull Supplier<T> transactionCode) {
synchronized (lock) {

// execute transaction
try {
// init diagnostic services
this.diagnosticServices = callCtx.getDiagServices();
this.diagnosticServices = diagnostics;
this.startReadTransaction();
return transactionCode.get();
} finally {
this.tr = null;
this.diagnosticServices = null;
this.diagnosticServices = this.initialDiagnosticServices;
Comment on lines -487 to +479
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This aspect is a behavior change, but it seems to be a pretty reasonable one

}
}
}

/**
* Run inside a read only transaction
*
* @param callCtx call context to use
* @param transactionCode transaction code
*/
/** Run inside a read only transaction */
public void runActionInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {
@Nonnull PolarisDiagnostics diagnostics, @Nonnull Runnable transactionCode) {
synchronized (lock) {

// execute transaction
try {
// init diagnostic services
this.diagnosticServices = callCtx.getDiagServices();
this.diagnosticServices = diagnostics;
this.startReadTransaction();
transactionCode.run();
} finally {
this.tr = null;
this.diagnosticServices = null;
this.diagnosticServices = this.initialDiagnosticServices;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public <T> T runInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {

// run transaction on our underlying store
return store.runInTransaction(callCtx, transactionCode);
return store.runInTransaction(callCtx.getDiagServices(), transactionCode);
}

/** {@inheritDoc} */
Expand All @@ -88,15 +88,15 @@ public void runActionInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {

// run transaction on our underlying store
store.runActionInTransaction(callCtx, transactionCode);
store.runActionInTransaction(callCtx.getDiagServices(), transactionCode);
}

/** {@inheritDoc} */
@Override
public <T> T runInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {
// run transaction on our underlying store
return store.runInReadTransaction(callCtx, transactionCode);
return store.runInReadTransaction(callCtx.getDiagServices(), transactionCode);
}

/** {@inheritDoc} */
Expand All @@ -105,7 +105,7 @@ public void runActionInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {

// run transaction on our underlying store
store.runActionInReadTransaction(callCtx, transactionCode);
store.runActionInReadTransaction(callCtx.getDiagServices(), transactionCode);
}

/**
Expand Down