diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 7bdc40545c..7527029468 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -61,6 +61,10 @@ dependencies { api(project(":polaris-persistence-nosql-inmemory")) api(project(":polaris-persistence-nosql-mongodb")) + api(project(":polaris-persistence-nosql-maintenance-api")) + api(project(":polaris-persistence-nosql-maintenance-cel")) + api(project(":polaris-persistence-nosql-maintenance-spi")) + api(project(":polaris-config-docs-annotations")) api(project(":polaris-config-docs-generator")) diff --git a/codestyle/checkstyle.xml b/codestyle/checkstyle.xml index d3986dc3e7..feda236cdd 100644 --- a/codestyle/checkstyle.xml +++ b/codestyle/checkstyle.xml @@ -43,7 +43,7 @@ - + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 83438c876b..b3e112a78e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -48,6 +48,7 @@ awssdk-bom = { module = "software.amazon.awssdk:bom", version = "2.38.2" } awaitility = { module = "org.awaitility:awaitility", version = "4.3.0" } azuresdk-bom = { module = "com.azure:azure-sdk-bom", version = "1.3.2" } caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version = "3.2.3" } +cel-bom = { module = "org.projectnessie.cel:cel-bom", version = "0.5.3" } commons-lang3 = { module = "org.apache.commons:commons-lang3", version = "3.19.0" } commons-text = { module = "org.apache.commons:commons-text", version = "1.14.0" } errorprone = { module = "com.google.errorprone:error_prone_core", version = "2.44.0" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 1f7bd19cc5..84a584ae9f 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -73,6 +73,10 @@ polaris-persistence-nosql-benchmark=persistence/nosql/persistence/benchmark polaris-persistence-nosql-standalone=persistence/nosql/persistence/standalone polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextension polaris-persistence-nosql-varint=persistence/nosql/persistence/varint +# persistence / maintenance +polaris-persistence-nosql-maintenance-api=persistence/nosql/persistence/maintenance/api +polaris-persistence-nosql-maintenance-cel=persistence/nosql/persistence/maintenance/retain-cel +polaris-persistence-nosql-maintenance-spi=persistence/nosql/persistence/maintenance/spi # persistence / database specific implementations polaris-persistence-nosql-inmemory=persistence/nosql/persistence/db/inmemory polaris-persistence-nosql-mongodb=persistence/nosql/persistence/db/mongodb diff --git a/persistence/nosql/persistence/maintenance/README.md b/persistence/nosql/persistence/maintenance/README.md new file mode 100644 index 0000000000..d16031b635 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/README.md @@ -0,0 +1,22 @@ + + +Maintenance service, +see [API package javadoc](api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java) +for information. \ No newline at end of file diff --git a/persistence/nosql/persistence/maintenance/api/build.gradle.kts b/persistence/nosql/persistence/maintenance/api/build.gradle.kts new file mode 100644 index 0000000000..27b2d24416 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/build.gradle.kts @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris NoSQL persistence maintenance - service interfaces" + +dependencies { + implementation(project(":polaris-persistence-nosql-api")) + compileOnly(project(":polaris-persistence-nosql-realms-api")) + compileOnly(project(":polaris-persistence-nosql-maintenance-spi")) + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(libs.guava) + + compileOnly(libs.smallrye.config.core) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + compileOnly("com.fasterxml.jackson.core:jackson-databind") +} diff --git a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java new file mode 100644 index 0000000000..8fb0a493f3 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.api; + +import static com.google.common.base.Preconditions.checkState; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import java.time.Duration; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.polaris.immutables.PolarisImmutable; +import org.immutables.value.Value; + +/** Maintenance service configuration. */ +@ConfigMapping(prefix = "polaris.persistence.maintenance") +@PolarisImmutable +@JsonSerialize(as = ImmutableMaintenanceConfig.class) +@JsonDeserialize(as = ImmutableMaintenanceConfig.class) +public interface MaintenanceConfig { + + long DEFAULT_EXPECTED_REFERENCE_COUNT = 100; + + /** + * Provides the expected number of references in all realms to retain, defaults to {@value + * #DEFAULT_EXPECTED_REFERENCE_COUNT}, must be at least {@code 100}. This value is used as the + * default if no information of a previous maintenance run is present, it is also the minimum + * number of expected references. + */ + @WithDefault("" + DEFAULT_EXPECTED_REFERENCE_COUNT) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalLong expectedReferenceCount(); + + long DEFAULT_EXPECTED_OBJ_COUNT = 100_000; + + /** + * Provides the expected number of objects in all realms to retain, defaults to {@value + * #DEFAULT_EXPECTED_OBJ_COUNT}, must be at least {@code 100000}. This value is used as the + * default if no information of a previous maintenance run is present, it is also the minimum + * number of expected objects. + */ + @WithDefault("" + DEFAULT_EXPECTED_OBJ_COUNT) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalLong expectedObjCount(); + + double DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER = 1.1; + + /** + * Maintenance service sizes the bloom-filters used to hold the identified references and objects + * according to the expression {@code lastRun.numberOfIdentified * countFromLastRunMultiplier}. + * The default is to add 10% to the number of identified items. + */ + @WithDefault("" + DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalDouble countFromLastRunMultiplier(); + + double DEFAULT_INITIALIZED_FPP = 0.00001; + + /** + * False-positive-probability (FPP) used to initialize the bloom-filters for identified references + * and objects. + */ + @WithDefault("" + DEFAULT_INITIALIZED_FPP) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalDouble filterInitializedFpp(); + + double DEFAULT_MAX_ACCEPTABLE_FPP = 0.00005; + + /** + * Expected maximum false-positive-probability (FPP) used to check the bloom-filters for + * identified references and objects. + * + *

If the FPP of a bloom filter exceeds this value, no individual references or objects will be + * purged. + */ + @WithDefault("" + DEFAULT_MAX_ACCEPTABLE_FPP) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalDouble maxAcceptableFilterFpp(); + + int DEFAULT_RETAINED_RUNS = 50; + + /** + * Number of retained {@linkplain MaintenanceRunInformation maintenance run objects}, must be at + * least {@code 2}. + */ + @WithDefault("" + DEFAULT_RETAINED_RUNS) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalInt retainedRuns(); + + String DEFAULT_CREATED_AT_GRACE_TIME_STRING = "PT3H"; + Duration DEFAULT_CREATED_AT_GRACE_TIME = Duration.parse(DEFAULT_CREATED_AT_GRACE_TIME_STRING); + + /** + * Objects and references that have been created after a maintenance run has started are + * never purged. This option defines an additional grace time to when the maintenance run has + * started. + * + *

This value is a safety net for two reasons: + * + *

+ */ + @WithDefault(DEFAULT_CREATED_AT_GRACE_TIME_STRING) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @JsonFormat(shape = JsonFormat.Shape.STRING) + Optional createdAtGraceTime(); + + /** + * Optionally limit the number of objects scanned per second. Default is to not throttle object + * scanning. + */ + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalInt objectScanRateLimitPerSecond(); + + /** + * Optionally limit the number of references scanned per second. + * + *

Default is to not throttle reference scanning. + */ + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalInt referenceScanRateLimitPerSecond(); + + int DEFAULT_DELETE_BATCH_SIZE = 10; + + /** Size of the delete-batches when purging objects. */ + @WithDefault("" + DEFAULT_DELETE_BATCH_SIZE) + @JsonInclude(JsonInclude.Include.NON_ABSENT) + OptionalInt deleteBatchSize(); + + static ImmutableMaintenanceConfig.Builder builder() { + return ImmutableMaintenanceConfig.builder(); + } + + @Value.Check + default void check() { + expectedReferenceCount() + .ifPresent(v -> checkState(v > 0, "expectedReferenceCount must be positive")); + expectedObjCount().ifPresent(v -> checkState(v > 0, "expectedObjCount must be positive")); + countFromLastRunMultiplier() + .ifPresent(v -> checkState(v > 1d, "countFromLastRunMultiplier must be greater than 1.0d")); + filterInitializedFpp() + .ifPresent( + v -> checkState(v > 0d && v <= 1d, "filterInitializedFpp must be > 0.0d and <= 1.0d")); + maxAcceptableFilterFpp() + .ifPresent( + v -> + checkState(v > 0d && v <= 1d, "maxAcceptableFilterFpp must be > 0.0d and <= 1.0d")); + retainedRuns().ifPresent(v -> checkState(v >= 2, "retainedRuns must 2 or greater")); + createdAtGraceTime() + .ifPresent(v -> checkState(!v.isNegative(), "createdAtGraceTime must not be negative")); + objectScanRateLimitPerSecond() + .ifPresent(v -> checkState(v >= 0, "objectScanRateLimitPerSecond must not be negative")); + referenceScanRateLimitPerSecond() + .ifPresent(v -> checkState(v >= 0, "referenceScanRateLimitPerSecond must not be negative")); + deleteBatchSize().ifPresent(v -> checkState(v > 0, "deleteBatchSize must be positive")); + } +} diff --git a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java new file mode 100644 index 0000000000..834ec8d081 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.api; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.backend.Backend; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier; +import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector; +import org.immutables.value.Value; + +@PolarisImmutable +@JsonSerialize(as = ImmutableMaintenanceRunInformation.class) +@JsonDeserialize(as = ImmutableMaintenanceRunInformation.class) +public interface MaintenanceRunInformation { + + Instant started(); + + @JsonFormat(shape = JsonFormat.Shape.STRING) + Optional finished(); + + @Value.Default + default boolean success() { + return false; + } + + /** Human-readable status message. */ + @JsonInclude(JsonInclude.Include.NON_ABSENT) + Optional statusMessage(); + + /** Human-readable detailed information, possibly including technical error information. */ + @JsonInclude(JsonInclude.Include.NON_ABSENT) + Optional detailedInformation(); + + Optional referenceStats(); + + Optional objStats(); + + Map perRealmReferenceStats(); + + Map> perRealmPerObjTypeStats(); + + /** + * Number of realms that were directly purges, if the {@linkplain Backend backend} {@linkplain + * Backend#supportsRealmDeletion() supports} this. + */ + OptionalInt purgedRealms(); + + /** Number of invocations of {@link RetainedCollector#retainObject(ObjRef)}. */ + OptionalLong identifiedObjs(); + + /** Number of invocations of {@link RetainedCollector#retainReference(String)}. */ + OptionalLong identifiedReferences(); + + static ImmutableMaintenanceRunInformation.Builder builder() { + return ImmutableMaintenanceRunInformation.builder(); + } + + @PolarisImmutable + @JsonSerialize(as = ImmutableMaintenanceStats.class) + @JsonDeserialize(as = ImmutableMaintenanceStats.class) + interface MaintenanceStats { + + static ImmutableMaintenanceStats.Builder builder() { + return ImmutableMaintenanceStats.builder(); + } + + /** + * Number of scanned items. + * + *

If a persisted object has been persisted using multiple parts, each part is counted. + */ + OptionalLong scanned(); + + /** + * Number of scanned items that were retained, because those were {@linkplain + * RetainedCollector#retainObject(ObjRef) indicated} to be retained by a {@linkplain + * PerRealmRetainedIdentifier realm identifier} or {@linkplain ObjTypeRetainedIdentifier + * obj-type identifier}. + * + *

If a persisted object has been persisted using multiple parts, each part is counted. + */ + OptionalLong retained(); + + /** + * Number of items that were written after the {@linkplain + * MaintenanceConfig#createdAtGraceTime() calculated grace time}. + * + *

If a persisted object has been persisted using multiple parts, each part is counted. + */ + OptionalLong newer(); + + /** + * Number of scanned items that have been purged. + * + *

If a persisted object has been persisted using multiple parts, each part is counted. + */ + OptionalLong purged(); + } +} diff --git a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java new file mode 100644 index 0000000000..307f353ffd --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.api; + +import java.util.Set; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.nosql.api.Realms; +import org.immutables.value.Value; + +/** + * Configures a maintenance run. + * + *

Must specify both the realms to purge and the realms to retain. The two sets are + * distinct to allow certain database specific and implementation detail optimizations. Existing + * data of realms that are in neither of the sets {@linkplain #realmsToPurge() to purge} and + * {@linkplain #realmsToProcess() to process} will be ignored, not processed at all. + * + *

Reserved realms, realm IDs that start with {@code ::}, except {@value Realms#SYSTEM_REALM_ID}, + * are considered to be "special" and are not processed, and all references and objects in those + * realms are retained. + */ +@PolarisImmutable +public interface MaintenanceRunSpec { + Set realmsToPurge(); + + Set realmsToProcess(); + + /** Whether to run maintenance on the system realm, defaults to {@code true}. */ + @Value.Default + default boolean includeSystemRealm() { + return true; + } + + static ImmutableMaintenanceRunSpec.Builder builder() { + return ImmutableMaintenanceRunSpec.builder(); + } +} diff --git a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java new file mode 100644 index 0000000000..d1b1ebe435 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.api; + +import jakarta.annotation.Nonnull; +import java.util.List; +import org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus; + +public interface MaintenanceService { + /** + * Generates a maintenance service run-specification containing realms in states {@link + * RealmStatus#ACTIVE ACTIVE} and {@link RealmStatus#INACTIVE INACTIVE} as "to retain" and realms + * in state {@link RealmStatus#PURGING PURGING} as "to purge". + */ + @Nonnull + MaintenanceRunSpec buildMaintenanceRunSpec(); + + /** + * Perform maintenance. + * + * @param maintenanceRunSpec define the mandatory run-specification, see {@link + * #buildMaintenanceRunSpec()} + * @return information about the maintenance run + */ + @Nonnull + MaintenanceRunInformation performMaintenance(@Nonnull MaintenanceRunSpec maintenanceRunSpec); + + /** + * Retrieve information about recent maintenance runs. The number of available elements is + * configured via {@link MaintenanceConfig#retainedRuns()}. + */ + @Nonnull + List maintenanceRunLog(); +} diff --git a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java new file mode 100644 index 0000000000..7d6088e132 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Maintenance operations include a bunch of tasks that are regularly executed against a backend + * database. + * + *

Types of maintenance operations include: + * + *

+ * + *

Discussion + * + *

Not all databases offer support to perform "prefix key" deletions, which are, for example, + * necessary to purge a whole realm. Some databases do support "deleting a huge number of rows". + * Some have another API for prefix-key deletions, for example, Google's BigTable {@code + * dropRowRange} on the table-admin-client. Relational databases may require different + * configurations with respect to isolation level to run those maintenance operations in a "better" + * way. Some databases do not support such "prefix-key deletions" at all, for example, Apache + * Cassandra or RocksDb or Amazon's DynamoDb. + * + *

{@link org.apache.polaris.persistence.nosql.api.backend.Backend Backend} implementations + * therefore expose whether it can leverage "prefix-key deletions" when one or more realms are to be + * purged. If a {@code Backend} does not support "prefix-key deletions", the whole repository has to + * be scanned. + * + *

Purging unreferenced data + * + *

The other maintenance operations like purging a catalog or unreferenced objects or references + * a two-step approach that works even for large multi-tenant setups: + * + *

    + *
  1. Memoize the current timestamp, subtract some amount to account for expected wall-clock + * drifts. + *
  2. Identify all objects and references that must be retained, memoize those in a probabilistic + * data structure (bloom filter). See below. + *
  3. Scan the whole database to identify the objects and references that were not identified as + * being referenced in the previous step. + *
  4. Delete the unreferenced objects and references if, and only if, their {@link + * org.apache.polaris.persistence.nosql.api.obj.Obj#createdAtMicros() createdAtMicros()} + * timestamp is less than the timestamp memoized in the first step. + *
+ * + *

Identifying objects and references + * + *

Implementations of {@link jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} + * {@link org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier + * PerRealmRetainedIdentifier} are called to identify the references and objects that have to be + * retained for a realm. + * + *

Implementations of {@link jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} + * {@link org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier + * ObjTypeRetainedIdentifier} are called for each identified object of the requested object type. + * + *

Realm status + * + *

The maintenance service implementation will check the current {@linkplain + * org.apache.polaris.persistence.nosql.realms.api.RealmDefinition#status() status} of the realm to + * retain and to purge, that the status is valid for being retained (valid: {@linkplain + * org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#ACTIVE ACTIVE} and + * {@linkplain org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#INACTIVE + * INACTIVE}) and being purged (valid: {@linkplain + * org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#PURGING PURGING}). + * Realms that have been asked to be purged and for which no data has been encountered will be + * state-transitioned to {@linkplain + * org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#PURGED PURGED}. + * + *

System realm {@value org.apache.polaris.persistence.nosql.api.Realms#SYSTEM_REALM_ID} + * + *

The system realm is maintained like every other realm. + * + *

Future export use cases (TBD/TBC) + * + *

These can be useful in a hosted and multi-tenant SaaS environment, when an export of the data + * for a particular realm is requested. + * + *

+ */ +package org.apache.polaris.persistence.nosql.maintenance.api; diff --git a/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts b/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts new file mode 100644 index 0000000000..f40bd31d34 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + alias(libs.plugins.jmh) + id("polaris-server") +} + +description = "Polaris NoSQL persistence maintenance - reference retain check using CEL" + +dependencies { + implementation(project(":polaris-persistence-nosql-api")) + implementation(platform(libs.cel.bom)) + implementation("org.projectnessie.cel:cel-standalone") + implementation(libs.caffeine) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + testCompileOnly(platform(libs.jackson.bom)) + testCompileOnly("com.fasterxml.jackson.core:jackson-annotations") + + jmhImplementation(libs.jmh.core) + jmhAnnotationProcessor(libs.jmh.generator.annprocess) +} diff --git a/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java b/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java new file mode 100644 index 0000000000..480130066b --- /dev/null +++ b/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.maintenance.cel; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.time.Duration; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@Warmup(iterations = 4, time = 1000, timeUnit = MILLISECONDS) +@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS) +@Fork(1) +@Threads(4) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(NANOSECONDS) +public class CelReferenceContinuePredicateBench { + + @State(Scope.Benchmark) + public static class BenchmarkParam { + BaseCommitObj commitObj; + + CelReferenceContinuePredicate predicateConstantCondition; + CelReferenceContinuePredicate predicateOneCondition; + CelReferenceContinuePredicate predicateTwoConditions; + + @Setup + public void init() { + commitObj = + new BaseCommitObj() { + @Override + public long seq() { + throw new UnsupportedOperationException(); + } + + @Override + public long[] tail() { + throw new UnsupportedOperationException(); + } + + @Override + public ObjType type() { + throw new UnsupportedOperationException(); + } + + @Override + public long id() { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public String versionToken() { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public Obj withCreatedAtMicros(long createdAt) { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public Obj withNumParts(int numParts) { + throw new UnsupportedOperationException(); + } + }; + predicateConstantCondition = + new CelReferenceContinuePredicate<>("refName", o -> Duration.ZERO, "true"); + predicateOneCondition = + new CelReferenceContinuePredicate<>("refName", o -> Duration.ZERO, "ageDays < 100"); + predicateTwoConditions = + new CelReferenceContinuePredicate<>( + "refName", o -> Duration.ZERO, "ageDays < 100 || commits < 100"); + } + } + + @Benchmark + public boolean constantCondition(BenchmarkParam param) { + return param.predicateConstantCondition.test(param.commitObj); + } + + @Benchmark + public boolean oneCondition(BenchmarkParam param) { + return param.predicateOneCondition.test(param.commitObj); + } + + @Benchmark + public boolean twoConditions(BenchmarkParam param) { + return param.predicateTwoConditions.test(param.commitObj); + } +} diff --git a/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java b/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java new file mode 100644 index 0000000000..cb78536e7d --- /dev/null +++ b/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.maintenance.cel; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import jakarta.annotation.Nonnull; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.projectnessie.cel.checker.Decls; +import org.projectnessie.cel.relocated.com.google.api.expr.v1alpha1.Decl; +import org.projectnessie.cel.tools.Script; +import org.projectnessie.cel.tools.ScriptCreateException; +import org.projectnessie.cel.tools.ScriptException; +import org.projectnessie.cel.tools.ScriptHost; +import org.projectnessie.cel.types.jackson.JacksonRegistry; + +/** + * Provides a CEL script based "retain reference commit object" predicate used for {@code + * RetainCollector.refRetain*()} functions. + * + *

The predicate is coded as a CEL script, + * using cel-java. + * + *

Micro benchmarks prove that the CEL scripts execute pretty fast, definitely fast enough to + * justify the flexibility of having scripts in the configuration. + * + *

The scripts have access to the following declared values: + * + *

    + *
  • {@value #VAR_REF} (string) name of the reference + *
  • {@value #VAR_COMMITS} (64-bit int) number of the currently processed commit, starting at + * {@code 1} + *
  • {@value #VAR_AGE_DAYS} (64-bit int) age of currently processed commit in days + *
  • {@value #VAR_AGE_HOURS} (64-bit int) age of currently processed commit in hours + *
  • {@value #VAR_AGE_MINUTES} (64-bit int) age of currently processed commit in minutes + *
+ * + *

Scripts must return a {@code boolean} yielding whether the commit shall be retained. + * Note that maintenance-service implementations can keep the first not-to-be-retained commit. + * + *

Example scripts + * + *

    + *
  • {@code ageDays < 30 || commits <= 10} retains the reference history with at least 10 + * commits and commits that are younger than 30 days + *
  • {@code true} retains the whole reference history + *
  • {@code false} retains the most recent commit + *
+ * + *

A static cache retains up to 100 compiled CEL scripts, each up to 24 hours after its last use. + */ +public class CelReferenceContinuePredicate implements Predicate { + private static final ScriptHost SCRIPT_HOST = + ScriptHost.newBuilder().registry(JacksonRegistry.newRegistry()).build(); + + private static final Cache SCRIPT_CACHE = + Caffeine.newBuilder() + .expireAfterAccess(Duration.ofHours(24)) + .maximumSize(100) + .scheduler(Scheduler.systemScheduler()) + .build(); + + private static final String VAR_REF = "ref"; + private static final String VAR_COMMITS = "commits"; + private static final String VAR_AGE_MINUTES = "ageMinutes"; + private static final String VAR_AGE_HOURS = "ageHours"; + private static final String VAR_AGE_DAYS = "ageDays"; + + private static final List DECLS = + List.of( + Decls.newVar(VAR_REF, Decls.String), + // Decls.Int == 64 bit (aka Java long) + Decls.newVar(VAR_COMMITS, Decls.Int), + Decls.newVar(VAR_AGE_MINUTES, Decls.Int), + Decls.newVar(VAR_AGE_HOURS, Decls.Int), + Decls.newVar(VAR_AGE_DAYS, Decls.Int)); + + static Script createScript(String source) { + try { + return SCRIPT_HOST.buildScript(source).withDeclarations(DECLS).build(); + } catch (ScriptCreateException e) { + throw new RuntimeException(e); + } + } + + static Script getScript(String source) { + return SCRIPT_CACHE.get(source, CelReferenceContinuePredicate::createScript); + } + + private final String refName; + private final Function objAge; + private final Script script; + private long numCommit; + + public CelReferenceContinuePredicate( + @Nonnull String refName, @Nonnull Persistence persistence, @Nonnull String script) { + this(refName, persistence::objAge, script); + } + + public CelReferenceContinuePredicate( + @Nonnull String refName, @Nonnull Function objAge, @Nonnull String script) { + this.refName = refName; + this.objAge = objAge; + this.script = getScript(script); + } + + @Override + public boolean test(O o) { + var age = objAge.apply(o); + var args = + Map.of( + VAR_REF, refName, + VAR_COMMITS, ++numCommit, + VAR_AGE_MINUTES, age.toMinutes(), + VAR_AGE_HOURS, age.toHours(), + VAR_AGE_DAYS, age.toDays()); + try { + return script.execute(Boolean.class, args); + } catch (ScriptException e) { + throw new RuntimeException(e); + } + } +} diff --git a/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java b/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java new file mode 100644 index 0000000000..3e3086f4db --- /dev/null +++ b/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.maintenance.cel; + +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Stream; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.projectnessie.cel.tools.ScriptCreateException; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestCelReferenceContinuePredicate { + @InjectSoftAssertions protected SoftAssertions soft; + + @Test + public void invalidScript() { + soft.assertThatRuntimeException() + .isThrownBy(() -> CelReferenceContinuePredicate.createScript("invalidScript")) + .withCauseInstanceOf(ScriptCreateException.class); + soft.assertThatRuntimeException() + .isThrownBy(() -> CelReferenceContinuePredicate.getScript("invalidScript")) + .withCauseInstanceOf(ScriptCreateException.class); + } + + @Test + public void cacheWorks() { + var s1 = CelReferenceContinuePredicate.getScript("true"); + var s2 = CelReferenceContinuePredicate.getScript("true"); + soft.assertThat(s1).isSameAs(s2); + soft.assertThat(s1).isNotSameAs(CelReferenceContinuePredicate.getScript("false")); + } + + @ParameterizedTest + @MethodSource + public void scripts(String script, boolean[] expected, Iterable commits) { + var predicate = + new CelReferenceContinuePredicate<>( + "testRefName", + (Function) + o -> Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(o.createdAtMicros())), + script); + var iter = commits.iterator(); + for (int i = 0; i < expected.length; i++) { + var exp = expected[i]; + soft.assertThat(predicate.test(iter.next())) + .describedAs("test at commit %s", i + 1) + .isEqualTo(exp); + } + } + + static Stream scripts() { + var dummy = mock(BaseCommitObj.class); + return Stream.of( + arguments("false", new boolean[] {false, false, false}, List.of(dummy, dummy, dummy)), + arguments( + "ref == 'testRefName'", new boolean[] {true, true, true}, List.of(dummy, dummy, dummy)), + arguments("commits < 3", new boolean[] {true, true, false}, List.of(dummy, dummy, dummy)), + arguments( + "ageMinutes < 100", + new boolean[] {true, true, true, false}, + List.of( + yieldAge(Duration.ofMinutes(0)), + yieldAge(Duration.ofMinutes(1)), + yieldAge(Duration.ofMinutes(99)), + yieldAge(Duration.ofMinutes(100)))), + arguments( + "ageHours < 100", + new boolean[] {true, true, true, false}, + List.of( + yieldAge(Duration.ofHours(0)), + yieldAge(Duration.ofHours(1)), + yieldAge(Duration.ofHours(99)), + yieldAge(Duration.ofHours(100)))), + arguments( + "ageDays < 100", + new boolean[] {true, true, true, false}, + List.of( + yieldAge(Duration.ofDays(0)), + yieldAge(Duration.ofDays(1)), + yieldAge(Duration.ofDays(99)), + yieldAge(Duration.ofDays(100))))); + } + + static BaseCommitObj yieldAge(Duration duration) { + var m = mock(BaseCommitObj.class); + when(m.createdAtMicros()).thenReturn(TimeUnit.MILLISECONDS.toMicros(duration.toMillis())); + return m; + } +} diff --git a/persistence/nosql/persistence/maintenance/spi/build.gradle.kts b/persistence/nosql/persistence/maintenance/spi/build.gradle.kts new file mode 100644 index 0000000000..9e493b58e8 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = + "Polaris NoSQL persistence maintenance - SPI to implement identifiers of references and objects to retain" + +dependencies { + implementation(project(":polaris-persistence-nosql-api")) + compileOnly(project(":polaris-persistence-nosql-maintenance-cel")) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") +} diff --git a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java new file mode 100644 index 0000000000..f32a792ace --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.spi; + +import java.util.function.Predicate; + +/** + * Predicate that yields {@code true} for the number of {@link #test(Object)} invocations given to + * the constructor. + */ +public final class CountDownPredicate implements Predicate { + private int remaining; + + public CountDownPredicate(int remaining) { + if (remaining <= 0) { + throw new IllegalArgumentException("remaining must be > 0"); + } + this.remaining = remaining; + } + + @Override + public boolean test(T t) { + if (remaining <= 0) { + return false; + } + remaining--; + return true; + } +} diff --git a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java new file mode 100644 index 0000000000..3cc6484fc5 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.spi; + +import jakarta.annotation.Nonnull; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.obj.ObjType; +import org.apache.polaris.persistence.nosql.api.ref.Reference; + +/** + * Implementations of this interface are called for objects that have been identified to be + * retained either by a {@link PerRealmRetainedIdentifier} or another {@link + * ObjTypeRetainedIdentifier}. + * + *

Polaris extensions and plugins that persist non-standard {@linkplain Reference references} or + * {@linkplain Obj objects} must provide an implementation of this interface to ensure that the + * required references and objects are not purged. + * + *

Implementation must be annotated as {@link + * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} for CDI usage. + */ +public interface ObjTypeRetainedIdentifier { + /** Human-readable name. */ + String name(); + + /** The object type that the implementation handles. */ + @Nonnull + ObjType handledObjType(); + + /** + * Called for every scanned object with the ID {@code objRef} having the object type yielded by + * {@link #handledObjType()}. + * + *

Any exception thrown from this function aborts the whole maintenance run. Exceptions thrown + * from functionality called by the implementation must be properly handled. + * + * @param collector instance that collects the objects and references to retain + * @param objRef ID of the object that has been scanned + */ + void identifyRelatedObj(@Nonnull RetainedCollector collector, @Nonnull ObjRef objRef); +} diff --git a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java new file mode 100644 index 0000000000..7b56564d8e --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.spi; + +import jakarta.annotation.Nonnull; +import org.apache.polaris.persistence.nosql.api.obj.Obj; +import org.apache.polaris.persistence.nosql.api.ref.Reference; + +/** + * Implementations of this interface are called by the maintenance service for every realm to + * retain. + * + *

Implementation must be annotated as {@link + * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} for CDI-usage. + */ +public interface PerRealmRetainedIdentifier { + /** Human-readable name. */ + String name(); + + /** + * Called to identify "live" references and objects for a realm. + * + *

The given {@linkplain RetainedCollector collector} must be invoked for every {@linkplain + * Reference reference} and {@linkplain Obj object} to retain. The maintenance service is allowed + * to purge references and objects that were not passed to the {@linkplain RetainedCollector + * collector's} {@code retain*()} functions. + * + *

Any exception thrown from this function aborts the whole maintenance run. Exceptions thrown + * from functionality called by the implementation must be properly handled. + * + *

The purpose of the {@code boolean} return value is meant as a safety net in case to not + * accidentally purge a realm. + * + * @param collector consumer of "live" references and objects + * @return {@code true} if this function was able to handle the realm, {@code false} if the + * implementation did not process the realm or wants to defer the decision to another + * implementation. + */ + boolean identifyRetained(@Nonnull RetainedCollector collector); +} diff --git a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java new file mode 100644 index 0000000000..19eaabcf8e --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.maintenance.spi; + +import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_ID; +import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER; + +import jakarta.annotation.Nonnull; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.polaris.maintenance.cel.CelReferenceContinuePredicate; +import org.apache.polaris.persistence.nosql.api.Persistence; +import org.apache.polaris.persistence.nosql.api.commit.Commits; +import org.apache.polaris.persistence.nosql.api.index.IndexContainer; +import org.apache.polaris.persistence.nosql.api.index.IndexStripe; +import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.apache.polaris.persistence.nosql.api.ref.Reference; + +public interface RetainedCollector { + /** ID of the realm being processed. */ + @Nonnull + String realm(); + + default boolean isSystemRealm() { + return SYSTEM_REALM_ID.equals(realm()); + } + + /** + * {@link Persistence Persistence} configured for the current {@linkplain #realm() realm}. + * + *

References and objects that are read or written via this {@link Persistence} are + * automatically retained. + * + *

The returned {@link Persistence Persistence} bypasses the cache to avoid polluting the + * production cache with accesses from the maintenance service. + * + *

If the reference name(s) and {@linkplain ObjRef object IDs} are known in advance, it is more + * efficient to just call the {@link #retainReference(String)}/{@link #retainObject(ObjRef)} + * functions, because those will not access the backend database. + */ + @Nonnull + Persistence realmPersistence(); + + /** + * Instruct the maintenance service to retain the reference with the given name. + * + *

References that are fetched via {@link #realmPersistence()} are automatically marked to be + * retained. + */ + void retainReference(@Nonnull String name); + + /** + * Instruct the maintenance service to retain the reference with the given object ID. + * + *

Objects that are fetched via {@link #realmPersistence()} are automatically marked to be + * retained. + */ + void retainObject(@Nonnull ObjRef objRef); + + default void indexRetain(IndexContainer indexContainer) { + indexContainer.stripes().stream().map(IndexStripe::segment).forEach(this::retainObject); + } + + /** + * Same as {@link #refRetain(String, Class, Predicate, Consumer, ProgressListener)}, without a + * {@link ProgressListener}. + */ + default void refRetain( + String ref, Class clazz, Predicate continuePredicate, Consumer retainedObjConsumer) { + refRetain(ref, clazz, continuePredicate, retainedObjConsumer, new ProgressListener<>() {}); + } + + interface ProgressListener { + default void onCommit(O obj, long commit) {} + + default void onIndexEntry(long inCommit, long total) {} + + default void cut() {} + + default void finished() {} + } + + /** + * Functionality to identify the objects in a {@link Reference} to retain by {@linkplain + * Commits#commitLog(String, OptionalLong, Class) walking} the commit log. + * + *

For flexibility, consider using {@link CelReferenceContinuePredicate}. + * + * @param ref reference name, automatically marked as to-be-retained + * @param clazz type of the {@linkplain Reference#pointer() referenced objects} + * @param continuePredicate predicate to test whether to continue processing the reference + * @param retainedObjConsumer called for every retained object + * @param type of the {@linkplain Reference#pointer() referenced objects} + */ + default void refRetain( + String ref, + Class clazz, + Predicate continuePredicate, + Consumer retainedObjConsumer, + ProgressListener progressListener) { + var persistence = realmPersistence(); + var commits = persistence.commits(); + var numCommits = 0L; + for (var iter = commits.commitLog(ref, OptionalLong.empty(), clazz); iter.hasNext(); ) { + var obj = iter.next(); + retainedObjConsumer.accept(obj); + + progressListener.onCommit(obj, ++numCommits); + + // WARNING! The "stop" predicate must happen AFTER all referenced objects have been retained, + // doing the test before the loop over the above index would lead to the inconsistent state! + if (!continuePredicate.test(obj)) { + progressListener.cut(); + break; + } + } + progressListener.finished(); + } + + /** + * Same as {@link #refRetainIndexToSingleObj(String, Class, Predicate, Function, ProgressListener, + * Consumer)}, but without a {@link ProgressListener}. + */ + default void refRetainIndexToSingleObj( + String ref, + Class clazz, + Predicate continuePredicate, + Function> indexToObjIdFromRetainedObj, + Consumer retainedObjConsumer) { + refRetainIndexToSingleObj( + ref, + clazz, + continuePredicate, + indexToObjIdFromRetainedObj, + new ProgressListener<>() { + @Override + public void onCommit(O obj, long commit) { + retainedObjConsumer.accept(obj); + } + }, + x -> {}); + } + + default void refRetainIndexToSingleObj( + String ref, + Class clazz, + Predicate continuePredicate, + Function> indexToObjIdFromRetainedObj) { + refRetainIndexToSingleObj(ref, clazz, continuePredicate, indexToObjIdFromRetainedObj, x -> {}); + } + + /** + * Similar to {@link #refRetain(String, Class, Predicate, Consumer)}, with convenience to iterate + * over an {@link IndexContainer} having {@link ObjRef} index-element values to mark those as + * to-be-retained. + * + *

For flexibility, consider using {@link CelReferenceContinuePredicate}. + * + * @param ref reference name + * @param clazz type of the {@linkplain Reference#pointer() referenced objects} + * @param continuePredicate predicate to test whether to continue processing the reference + * @param indexToObjIdFromRetainedObj function to extract the {@link IndexContainer} from objects + * @param type of the {@linkplain Reference#pointer() referenced objects} + */ + default void refRetainIndexToSingleObj( + String ref, + Class clazz, + Predicate continuePredicate, + Function> indexToObjIdFromRetainedObj, + ProgressListener progressListener, + Consumer indexedObjRefConsumer) { + var total = new AtomicLong(); + refRetain( + ref, + clazz, + continuePredicate, + obj -> { + var elem = 0L; + var t = total.get(); + + // TODO we can save a lot of time when there is a (long) history to retain to skip + // inspecting already seen index segments (as a performance optimization.), but that + // requires some changes to the index APIs. + + for (var entry : + indexToObjIdFromRetainedObj + .apply(obj) + .indexForRead(realmPersistence(), OBJ_REF_SERIALIZER)) { + ObjRef indexedObjRef = entry.getValue(); + retainObject(indexedObjRef); + // ^ is for persistence.fetch(principalEntry.getValue(), PrincipalObj.class); + ++elem; + progressListener.onIndexEntry(elem, t + elem); + + indexedObjRefConsumer.accept(indexedObjRef); + } + total.addAndGet(elem); + }, + progressListener); + } + + /* + + Consumer retainedObjConsumer, + */ +} diff --git a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java new file mode 100644 index 0000000000..d7f6c4af58 --- /dev/null +++ b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Interfaces needed to provide implementations to identify "live" references and objects for + * maintenance. + */ +package org.apache.polaris.persistence.nosql.maintenance.spi;