Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.nio.file.Path;
import java.time.Clock;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.LocalPolarisMetaStoreManagerFactory;
Expand All @@ -45,13 +46,14 @@ public class EclipseLinkPolarisMetaStoreManagerFactory
@Inject EclipseLinkConfiguration eclipseLinkConfiguration;
@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;

@SuppressWarnings("unused") // Required by CDI
protected EclipseLinkPolarisMetaStoreManagerFactory() {
this(null);
this(null, null);
}

@Inject
protected EclipseLinkPolarisMetaStoreManagerFactory(PolarisDiagnostics diagnostics) {
super(diagnostics);
protected EclipseLinkPolarisMetaStoreManagerFactory(Clock clock, PolarisDiagnostics diagnostics) {
super(clock, diagnostics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest;
Expand Down Expand Up @@ -89,14 +87,10 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
PolarisEclipseLinkMetaStoreSessionImpl session =
new PolarisEclipseLinkMetaStoreSessionImpl(
store, Mockito.mock(), realmContext, null, "polaris", RANDOM_SECRETS);
return new PolarisTestMetaStoreManager(
new TransactionalMetaStoreManagerImpl(),
new PolarisCallContext(
realmContext,
session,
diagServices,
new PolarisConfigurationStore() {},
timeSource.withZone(ZoneId.systemDefault())));
TransactionalMetaStoreManagerImpl metaStoreManager =
new TransactionalMetaStoreManagerImpl(clock);
PolarisCallContext callCtx = new PolarisCallContext(realmContext, session, diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.sql.SQLException;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory {
final Map<String, EntityCache> entityCacheMap = new HashMap<>();
final Map<String, Supplier<BasePersistence>> sessionSupplierMap = new HashMap<>();

@Inject Clock clock;
@Inject PolarisDiagnostics diagnostics;
@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
@Inject Instance<DataSource> dataSource;
Expand All @@ -85,7 +87,7 @@ protected PrincipalSecretsGenerator secretsGenerator(
}

protected PolarisMetaStoreManager createNewMetaStoreManager() {
return new AtomicOperationMetaStoreManager();
return new AtomicOperationMetaStoreManager(clock);
}

private void initializeForRealm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@

import java.io.InputStream;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Optional;
import javax.sql.DataSource;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager;
import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest;
Expand Down Expand Up @@ -71,14 +69,10 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
Mockito.mock(),
realmContext.getRealmIdentifier(),
schemaVersion);
return new PolarisTestMetaStoreManager(
new AtomicOperationMetaStoreManager(),
new PolarisCallContext(
realmContext,
basePersistence,
diagServices,
new PolarisConfigurationStore() {},
timeSource.withZone(ZoneId.systemDefault())));
AtomicOperationMetaStoreManager metaStoreManager = new AtomicOperationMetaStoreManager(clock);
PolarisCallContext callCtx =
new PolarisCallContext(realmContext, basePersistence, diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}

private static class H2JdbcConfiguration implements RelationalJdbcConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.polaris.core;

import jakarta.annotation.Nonnull;
import java.time.Clock;
import java.time.ZoneId;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.config.RealmConfigImpl;
Expand All @@ -36,42 +34,28 @@ public class PolarisCallContext implements CallContext {

// meta store which is used to persist Polaris entity metadata
private final BasePersistence metaStore;

// diag services
private final PolarisDiagnostics diagServices;

private final PolarisConfigurationStore configurationStore;

private final Clock clock;

private final RealmContext realmContext;

private final RealmConfig realmConfig;

public PolarisCallContext(
@Nonnull RealmContext realmContext,
@Nonnull BasePersistence metaStore,
@Nonnull PolarisDiagnostics diagServices,
@Nonnull PolarisConfigurationStore configurationStore,
@Nonnull Clock clock) {
@Nonnull PolarisConfigurationStore configurationStore) {
this.realmContext = realmContext;
this.metaStore = metaStore;
this.diagServices = diagServices;
this.configurationStore = configurationStore;
this.clock = clock;
this.realmConfig = new RealmConfigImpl(this.configurationStore, this.realmContext);
}

public PolarisCallContext(
@Nonnull RealmContext realmContext,
@Nonnull BasePersistence metaStore,
@Nonnull PolarisDiagnostics diagServices) {
this(
realmContext,
metaStore,
diagServices,
new PolarisConfigurationStore() {},
Clock.system(ZoneId.systemDefault()));
this(realmContext, metaStore, diagServices, new PolarisConfigurationStore() {});
}

public BasePersistence getMetaStore() {
Expand All @@ -82,10 +66,6 @@ public PolarisDiagnostics getDiagServices() {
return diagServices;
}

public Clock getClock() {
return clock;
}

@Override
public RealmContext getRealmContext() {
return realmContext;
Expand All @@ -111,6 +91,6 @@ public PolarisCallContext copy() {
String realmId = this.realmContext.getRealmIdentifier();
RealmContext realmContext = () -> realmId;
return new PolarisCallContext(
realmContext, this.metaStore, this.diagServices, this.configurationStore, this.clock);
realmContext, this.metaStore, this.diagServices, this.configurationStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -84,6 +85,12 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(AtomicOperationMetaStoreManager.class);

private final Clock clock;

public AtomicOperationMetaStoreManager(Clock clock) {
this.clock = clock;
}

/**
* Persist the specified new entity.
*
Expand Down Expand Up @@ -1234,7 +1241,7 @@ private void revokeGrantRecord(
.name("entityCleanup_" + entityToDrop.getId())
.typeCode(PolarisEntityType.TASK.getCode())
.subTypeCode(PolarisEntitySubType.NULL_SUBTYPE.getCode())
.createTimestamp(callCtx.getClock().millis());
.createTimestamp(clock.millis());
if (cleanupProperties != null) {
taskEntityBuilder.internalProperties(
PolarisObjectMapperUtil.serializeProperties(cleanupProperties));
Expand Down Expand Up @@ -1508,7 +1515,7 @@ private void revokeGrantRecord(
PolarisTaskConstants.TASK_TIMEOUT_MILLIS);
return taskState == null
|| taskState.executor == null
|| callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
|| clock.millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
},
Function.identity(),
pageToken);
Expand All @@ -1524,8 +1531,7 @@ private void revokeGrantRecord(
PolarisObjectMapperUtil.deserializeProperties(task.getProperties());
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId);
properties.put(
PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
String.valueOf(callCtx.getClock().millis()));
PolarisTaskConstants.LAST_ATTEMPT_START_TIME, String.valueOf(clock.millis()));
properties.put(
PolarisTaskConstants.ATTEMPT_COUNT,
String.valueOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -56,9 +57,12 @@ public abstract class LocalPolarisMetaStoreManagerFactory<StoreType>
private static final Logger LOGGER =
LoggerFactory.getLogger(LocalPolarisMetaStoreManagerFactory.class);

private final Clock clock;
private final PolarisDiagnostics diagnostics;

protected LocalPolarisMetaStoreManagerFactory(@Nonnull PolarisDiagnostics diagnostics) {
protected LocalPolarisMetaStoreManagerFactory(
@Nonnull Clock clock, @Nonnull PolarisDiagnostics diagnostics) {
this.clock = clock;
this.diagnostics = diagnostics;
}

Expand All @@ -84,8 +88,8 @@ protected PrincipalSecretsGenerator secretsGenerator(
* Subclasses can override this to inject different implementations of PolarisMetaStoreManager
* into the existing realm-based setup flow.
*/
protected PolarisMetaStoreManager createNewMetaStoreManager() {
return new TransactionalMetaStoreManagerImpl();
protected PolarisMetaStoreManager createNewMetaStoreManager(Clock clock) {
return new TransactionalMetaStoreManagerImpl(clock);
}

private void initializeForRealm(
Expand All @@ -95,7 +99,7 @@ private void initializeForRealm(
realmContext.getRealmIdentifier(),
() -> createMetaStoreSession(backingStore, realmContext, rootCredentialsSet, diagnostics));

PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager();
PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(clock);
metaStoreManagerMap.put(realmContext.getRealmIdentifier(), metaStoreManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -90,6 +91,12 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(TransactionalMetaStoreManagerImpl.class);

private final Clock clock;

public TransactionalMetaStoreManagerImpl(Clock clock) {
this.clock = clock;
}

/**
* A version of BaseMetaStoreManager::persistNewEntity but instead of calling the one-shot
* immediate-peristence APIs of BasePersistence, expects to be run under an outer
Expand Down Expand Up @@ -1437,7 +1444,7 @@ private void bootstrapPolarisService(
.name("entityCleanup_" + entityToDrop.getId())
.typeCode(PolarisEntityType.TASK.getCode())
.subTypeCode(PolarisEntitySubType.NULL_SUBTYPE.getCode())
.createTimestamp(callCtx.getClock().millis())
.createTimestamp(clock.millis())
.properties(PolarisObjectMapperUtil.serializeProperties(properties));
if (cleanupProperties != null) {
taskEntityBuilder.internalProperties(
Expand Down Expand Up @@ -1964,7 +1971,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
PolarisTaskConstants.TASK_TIMEOUT_MILLIS);
return taskState == null
|| taskState.executor == null
|| callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
|| clock.millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
},
Function.identity(),
pageToken);
Expand All @@ -1978,8 +1985,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
PolarisObjectMapperUtil.deserializeProperties(task.getProperties());
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId);
properties.put(
PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
String.valueOf(callCtx.getClock().millis()));
PolarisTaskConstants.LAST_ATTEMPT_START_TIME, String.valueOf(clock.millis()));
properties.put(
PolarisTaskConstants.ATTEMPT_COUNT,
String.valueOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;

import java.time.ZoneId;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl;
import org.mockito.Mockito;
Expand All @@ -35,14 +33,12 @@ public class PolarisTreeMapAtomicOperationMetaStoreManagerTest
public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
AtomicOperationMetaStoreManager metaStoreManager = new AtomicOperationMetaStoreManager(clock);
PolarisCallContext callCtx =
new PolarisCallContext(
() -> "testRealm",
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS),
diagServices,
new PolarisConfigurationStore() {},
timeSource.withZone(ZoneId.systemDefault()));

return new PolarisTestMetaStoreManager(new AtomicOperationMetaStoreManager(), callCtx);
diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;

import java.time.ZoneId;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl;
Expand All @@ -35,14 +33,13 @@ public class PolarisTreeMapMetaStoreManagerTest extends BasePolarisMetaStoreMana
public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
TransactionalMetaStoreManagerImpl metaStoreManager =
new TransactionalMetaStoreManagerImpl(clock);
PolarisCallContext callCtx =
new PolarisCallContext(
() -> "testRealm",
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS),
diagServices,
new PolarisConfigurationStore() {},
timeSource.withZone(ZoneId.systemDefault()));

return new PolarisTestMetaStoreManager(new TransactionalMetaStoreManagerImpl(), callCtx);
diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}
}
Loading