Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validators for History / Snapshot Retention Policy #259

Merged
merged 21 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.linkedin.openhouse.jobs.util;

import com.linkedin.openhouse.tables.client.model.Retention;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * History config class. This is the app-side representation of /tables policies->history.
 *
 * <p>Lombok generates the builder, the getters, equals/hashCode and toString for this class; all
 * fields are final, so an instance cannot be changed after it is built.
 */
@Builder
@Getter
@EqualsAndHashCode
@ToString
public class HistoryConfig {
// NOTE(review): typed with Retention's granularity enum; presumably the time unit that maxAge is
// expressed in — confirm, and confirm that reusing the Retention enum here is intended.
private final Retention.GranularityEnum granularity;
// Presumably how long snapshot history is kept, counted in units of granularity — TODO confirm.
private final int maxAge;
// Presumably the number of snapshot versions to keep — TODO confirm.
private final int versions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.openhouse.gen.tables.client.api.SnapshotApi;
import com.linkedin.openhouse.gen.tables.client.api.TableApi;
import com.linkedin.openhouse.gen.tables.client.model.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.gen.tables.client.model.History;
import com.linkedin.openhouse.gen.tables.client.model.Policies;
import com.linkedin.openhouse.gen.tables.client.model.PolicyTag;
import com.linkedin.openhouse.gen.tables.client.model.Retention;
Expand Down Expand Up @@ -330,4 +331,62 @@ public void testPoliciesReplicationExistsUpdateExistsForMultiple() {
updatedPolicies.getReplication().getConfig().get(1).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 2);
}

@Test
public void testPoliciesHistoryInMetadataNoUpdate() {
  // Table properties carry only a "policies" entry with a history section; no update entry.
  Map<String, String> tableProperties = new HashMap<>();
  tableProperties.put(
      "policies",
      "{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}}");
  TableMetadata tableMetadata = mock(TableMetadata.class);
  when(tableMetadata.properties()).thenReturn(tableProperties);

  // The operations object is a mock, but buildUpdatedPolicies runs its real implementation.
  OpenHouseTableOperations tableOperations = mock(OpenHouseTableOperations.class);
  when(tableOperations.buildUpdatedPolicies(tableMetadata)).thenCallRealMethod();

  Policies result = tableOperations.buildUpdatedPolicies(tableMetadata);
  Assertions.assertNotNull(result);

  // The history values from the "policies" property must come through unchanged.
  History resultHistory = result.getHistory();
  Assertions.assertEquals(1, resultHistory.getMaxAge());
  Assertions.assertEquals(History.GranularityEnum.DAY, resultHistory.getGranularity());
  Assertions.assertEquals(2, resultHistory.getVersions());
}

@Test
public void testNoPoliciesHistoryExistsButUpdateExists() {
  // Only the "updated.openhouse.policy" entry is present; there is no existing policies entry.
  Map<String, String> tableProperties = new HashMap<>();
  tableProperties.put(
      "updated.openhouse.policy",
      "{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}}");
  TableMetadata tableMetadata = mock(TableMetadata.class);
  when(tableMetadata.properties()).thenReturn(tableProperties);

  // The operations object is a mock, but buildUpdatedPolicies runs its real implementation.
  OpenHouseTableOperations tableOperations = mock(OpenHouseTableOperations.class);
  when(tableOperations.buildUpdatedPolicies(tableMetadata)).thenCallRealMethod();

  Policies result = tableOperations.buildUpdatedPolicies(tableMetadata);
  Assertions.assertNotNull(result);

  // The history section of the update must appear in the resulting policies.
  History resultHistory = result.getHistory();
  Assertions.assertEquals(1, resultHistory.getMaxAge());
  Assertions.assertEquals(History.GranularityEnum.DAY, resultHistory.getGranularity());
  Assertions.assertEquals(2, resultHistory.getVersions());
}

@Test
public void testPoliciesHistoryExistsUpdate() {
  Map<String, String> tableProperties = new HashMap<>();
  // NOTE(review): testPoliciesHistoryInMetadataNoUpdate stores the existing policies under the
  // key "policies", while this test uses "openhouse.policy" — confirm this key is the one
  // buildUpdatedPolicies actually reads, otherwise the "existing" history is never exercised.
  tableProperties.put(
      "openhouse.policy",
      "{\"history\": {\"maxAge\": \"2\", \"granularity\": \"HOUR\", \"versions\": \"3\"}}");
  // The update carries a different history section and also enables sharing.
  tableProperties.put(
      "updated.openhouse.policy",
      "{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}, \"sharingEnabled\": true}");
  TableMetadata tableMetadata = mock(TableMetadata.class);
  when(tableMetadata.properties()).thenReturn(tableProperties);

  // The operations object is a mock, but buildUpdatedPolicies runs its real implementation.
  OpenHouseTableOperations tableOperations = mock(OpenHouseTableOperations.class);
  when(tableOperations.buildUpdatedPolicies(tableMetadata)).thenCallRealMethod();

  Policies result = tableOperations.buildUpdatedPolicies(tableMetadata);
  Assertions.assertNotNull(result);

  // The values from the update must win.
  History resultHistory = result.getHistory();
  Assertions.assertEquals(1, resultHistory.getMaxAge());
  Assertions.assertEquals(History.GranularityEnum.DAY, resultHistory.getGranularity());
  Assertions.assertEquals(2, resultHistory.getVersions());
  Assertions.assertEquals(true, result.getSharingEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
if (patchUpdatedPolicy.getRetention() != null) {
policies.setRetention(patchUpdatedPolicy.getRetention());
}

// Update sharing config
if (patchUpdatedPolicy.getSharingEnabled() != null) {
policies.sharingEnabled(patchUpdatedPolicy.getSharingEnabled());
Expand All @@ -215,6 +216,11 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
if (patchUpdatedPolicy.getReplication() != null) {
policies.replication(patchUpdatedPolicy.getReplication());
}
// Update history config
if (patchUpdatedPolicy.getHistory() != null) {
policies.setHistory(patchUpdatedPolicy.getHistory());
}

return policies;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import javax.validation.Valid;
import javax.validation.constraints.PositiveOrZero;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Snapshot-history policy component of a table request: how long (maxAge, counted in units of
 * granularity) and/or how many versions of snapshot history to keep.
 *
 * <p>Lombok generates the builder (including toBuilder), getters and equals/hashCode. Both
 * constructors are protected, so outside this package and its subclasses instances are built
 * through the builder.
 */
@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class History {
  // The constraint accepts zero as well as positive values. The message names history.maxAge;
  // it previously said retention.maxAge, which pointed callers at the wrong policy.
  @Schema(
      description = "Time period in count <granularity> to keep the snapshot history on the table",
      example = "3,4,5")
  @PositiveOrZero(
      message = "Incorrect count specified. history.maxAge has to be a positive integer")
  @Valid
  int maxAge;

  // Unit in which maxAge is counted; no not-null constraint is declared here.
  @Schema(description = "time period granularity for the snapshot history", example = "hour, day")
  @Valid
  TimeGranularity granularity;

  // The constraint accepts zero as well as positive values.
  @Schema(
      description =
          "Number of snapshots to keep within history for the table after snapshot expiration",
      example = "3,4,5")
  @PositiveOrZero(
      message = "Incorrect count specified. history.versions has to be a positive integer")
  int versions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,11 @@ public class Policies {
example = "{replication:{config:[{destination: clusterA, interval: 12H}]}}")
@Valid
Replication replication;

@Schema(
description =
"Holds the history configuration for snapshots of the table, which can be configured by maxAge by time and/or number of versions",
example = "{history:{maxAge:3, granularity: 'day', versions: 5}}")
@Valid
History history;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class Retention {
example = "hour, day, month, year")
@NotNull(message = "Incorrect granularity specified. retention.granularity cannot be null")
@Valid
TimePartitionSpec.Granularity granularity;
TimeGranularity granularity;

@Valid
@Schema(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import lombok.Getter;

/**
 * Time granularities (hour, day, month, year), each paired with a one-letter string code.
 *
 * <p>The class-level Lombok {@code @Getter} exposes the code through {@code getGranularity()}.
 */
@Getter
public enum TimeGranularity {
HOUR("H"),
DAY("D"),
MONTH("M"),
YEAR("Y");

// One-letter code associated with this constant, as passed to the constructor.
private final String granularity;

// Stores the one-letter code for the constant being constructed.
TimeGranularity(String granularity) {
this.granularity = granularity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,5 @@ public class TimePartitionSpec {

@Schema(description = "Granularity of the time partition.")
@NotNull(message = "granularity cannot be null")
Granularity granularity;

@Getter
public enum Granularity {
HOUR("H"),
DAY("D"),
MONTH("M"),
YEAR("Y");

private final String granularity;

Granularity(String granularity) {
this.granularity = granularity;
}
}
TimeGranularity granularity;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.openhouse.tables.api.validator.impl;

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.History;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimeGranularity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Validates the history (snapshot retention) policy of a table request.
 *
 * <p>A policy may be time based (maxAge together with granularity), count based (versions), or
 * both. A value of 0 for maxAge or versions is treated as "not configured" for that dimension.
 *
 * <p>The outcome of the last failed validation is kept in instance fields and read back through
 * {@link #getMessage()}. NOTE(review): if this component is shared between concurrent requests,
 * the stored message could be overwritten by another request — confirm how callers use it.
 */
@Slf4j
@Component
public class HistoryPolicySpecValidator {

  private String failureMessage = "";
  // Never assigned in this class, so getField() always returns an empty string.
  private String errorField = "";

  /**
   * Validates the given history policy.
   *
   * @param history history policy to validate; {@code null} means no policy and is valid
   * @param tableUri table the policy belongs to, used only in failure messages
   * @return {@code true} if the policy is valid, otherwise {@code false} with the reason
   *     available from {@link #getMessage()}
   */
  protected boolean validate(History history, TableUri tableUri) {
    if (history == null) {
      return true;
    }

    // At least one of the two dimensions has to be configured.
    if (history.getMaxAge() <= 0 && history.getVersions() <= 0) {
      failureMessage =
          String.format(
              "Must define either a time based retention or count based retention for snapshots in table %s",
              tableUri);
      return false;
    }

    // maxAge and granularity only make sense as a pair.
    if (history.getGranularity() == null && history.getMaxAge() > 0
        || history.getGranularity() != null && history.getMaxAge() <= 0) {
      failureMessage =
          String.format(
              "Incorrect maxAge specified. history.maxAge must be defined together with history.granularity for table %s",
              tableUri);
      return false;
    }

    if (!validateHistoryConfigMaxAgeWithinBounds(history)) {
      failureMessage =
          String.format(
              "History for the table [%s] max age must be between 1 to 3 days", tableUri);
      return false;
    }

    if (!validateHistoryConfigVersionsWithinBounds(history)) {
      failureMessage =
          String.format("History for the table [%s] must be between 2 to 100 versions", tableUri);
      return false;
    }

    return true;
  }

  /**
   * Validates that the time to retain the history of table snapshots is between 1 and 3 days,
   * inclusive: 1 to 3 for DAY granularity, 24 to 72 for HOUR granularity. Other granularities are
   * rejected.
   *
   * <p>When no granularity is set, the time based dimension is considered not configured, which
   * is valid only if maxAge is 0.
   *
   * @param history history policy to check
   * @return {@code true} if the time based part of the policy is absent or within bounds
   */
  protected boolean validateHistoryConfigMaxAgeWithinBounds(History history) {
    int maxAge = history.getMaxAge();
    TimeGranularity granularity = history.getGranularity();

    if (granularity == null) {
      // Count-only policy: acceptable as long as no maxAge was supplied either.
      return maxAge == 0;
    }
    if (granularity.equals(TimeGranularity.DAY)) {
      return maxAge >= 1 && maxAge <= 3;
    }
    if (granularity.equals(TimeGranularity.HOUR)) {
      return maxAge >= 24 && maxAge <= 72;
    }
    return false;
  }

  /**
   * Validates that the number of versions to retain is between 2 and 100, inclusive. A value of 0
   * means the count based dimension is not configured and is accepted.
   *
   * @param history history policy to check
   * @return {@code true} if the count based part of the policy is absent or within bounds
   */
  protected boolean validateHistoryConfigVersionsWithinBounds(History history) {
    int versions = history.getVersions();
    if (versions == 0) {
      return true;
    }
    return versions >= 2 && versions <= 100;
  }

  /** Returns the message of the last failed validation, or an empty string if none failed. */
  public String getMessage() {
    return failureMessage;
  }

  /** Returns the error field; always an empty string, since this class never sets it. */
  public String getField() {
    return errorField;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public class OpenHouseTablesApiValidator implements TablesApiValidator {

@Autowired private Validator validator;

@Autowired private PoliciesSpecValidator policiesSpecValidator;
@Autowired private RetentionPolicySpecValidator retentionPolicySpecValidator;

@Autowired private ClusteringSpecValidator clusteringSpecValidator;

@Autowired private ReplicationConfigValidator replicationConfigValidator;

@Autowired private HistoryPolicySpecValidator historyPolicySpecValidator;

@Override
public void validateGetTable(String databaseId, String tableId) {
List<String> validationFailures = new ArrayList<>();
Expand Down Expand Up @@ -132,7 +134,7 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
.clusterId(createUpdateTableRequestBody.getClusterId())
.databaseId(createUpdateTableRequestBody.getDatabaseId())
.build();
if (!policiesSpecValidator.validate(
if (!retentionPolicySpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
tableUri,
Expand All @@ -141,7 +143,8 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
Arrays.asList(
String.format(
"%s : %s",
policiesSpecValidator.getField(), policiesSpecValidator.getMessage())));
retentionPolicySpecValidator.getField(),
retentionPolicySpecValidator.getMessage())));
}
if (createUpdateTableRequestBody.getPolicies() != null
&& createUpdateTableRequestBody.getPolicies().getReplication() != null) {
Expand All @@ -155,6 +158,18 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
replicationConfigValidator.getMessage())));
}
}
if (createUpdateTableRequestBody.getPolicies() != null
&& createUpdateTableRequestBody.getPolicies().getHistory() != null) {
if (!historyPolicySpecValidator.validate(
createUpdateTableRequestBody.getPolicies().getHistory(), tableUri)) {
throw new RequestValidationFailureException(
Arrays.asList(
String.format(
"%s : %s",
historyPolicySpecValidator.getField(),
historyPolicySpecValidator.getMessage())));
}
}
}

@SuppressWarnings("checkstyle:OperatorWrap")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
@Component
@Slf4j
public class PoliciesSpecValidator {
public class RetentionPolicySpecValidator {

private String failureMessage = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.openhouse.common.api.validator.ValidatorConstants;
import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimeGranularity;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Transform;
import com.linkedin.openhouse.tables.model.TableDto;
Expand Down Expand Up @@ -236,21 +237,21 @@ public PartitionSpec toPartitionSpec(TableDto tableDto) {
* @param partitionField partitionField
* @return optional.empty() or optional.of(granularity)
*/
private Optional<TimePartitionSpec.Granularity> toGranularity(PartitionField partitionField) {
private Optional<TimeGranularity> toGranularity(PartitionField partitionField) {
/* String based comparison is necessary as the classes are package-private */
TimePartitionSpec.Granularity granularity = null;
TimeGranularity granularity = null;
switch (partitionField.transform().toString()) {
case "year":
granularity = TimePartitionSpec.Granularity.YEAR;
granularity = TimeGranularity.YEAR;
break;
case "month":
granularity = TimePartitionSpec.Granularity.MONTH;
granularity = TimeGranularity.MONTH;
break;
case "hour":
granularity = TimePartitionSpec.Granularity.HOUR;
granularity = TimeGranularity.HOUR;
break;
case "day":
granularity = TimePartitionSpec.Granularity.DAY;
granularity = TimeGranularity.DAY;
break;
default:
break;
Expand Down
Loading
Loading