Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,24 @@ public class PolarisSynchronizer {

private final boolean haltOnFailure;

private final boolean diffOnly;

public PolarisSynchronizer(
Logger clientLogger,
boolean haltOnFailure,
SynchronizationPlanner synchronizationPlanner,
PolarisService source,
PolarisService target,
ETagManager etagManager) {
ETagManager etagManager,
boolean diffOnly) {
this.clientLogger =
clientLogger == null ? LoggerFactory.getLogger(PolarisSynchronizer.class) : clientLogger;
this.haltOnFailure = haltOnFailure;
this.syncPlanner = synchronizationPlanner;
this.source = source;
this.target = target;
this.etagManager = etagManager;
this.diffOnly = diffOnly;
}

/**
Expand Down Expand Up @@ -1035,18 +1039,25 @@ public void syncNamespaces(
try {
Map<String, String> sourceNamespaceMetadata =
sourceIcebergCatalogService.loadNamespaceMetadata(namespace);
Map<String, String> targetNamespaceMetadata =
targetIcebergCatalogService.loadNamespaceMetadata(namespace);

if (sourceNamespaceMetadata.equals(targetNamespaceMetadata)) {
clientLogger.info(
"Namespace metadata for namespace {} in namespace {} for catalog {} was not modified, skipping. - {}/{}",
namespace,
parentNamespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
continue;
if (this.diffOnly) {
// if only configured to migrate the diff between the source and the target Polaris,
// then we can load the target namespace metadata and perform a comparison to discontinue early
// if we notice the metadata did not change

Map<String, String> targetNamespaceMetadata =
targetIcebergCatalogService.loadNamespaceMetadata(namespace);

if (sourceNamespaceMetadata.equals(targetNamespaceMetadata)) {
clientLogger.info(
"Namespace metadata for namespace {} in namespace {} for catalog {} was not modified, skipping. - {}/{}",
namespace,
parentNamespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
continue;
}
}

targetIcebergCatalogService.setNamespaceProperties(namespace, sourceNamespaceMetadata);
Expand Down Expand Up @@ -1206,7 +1217,7 @@ public void syncTables(
try {
Table table;

if (sourceIcebergCatalogService instanceof PolarisIcebergCatalogService polarisCatalogService) {
if (this.diffOnly && sourceIcebergCatalogService instanceof PolarisIcebergCatalogService polarisCatalogService) {
String etag = etagManager.getETag(catalogName, tableId);
table = polarisCatalogService.loadTable(tableId, etag);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,58 @@
import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;

/**
* Sync planner that attempts to create total parity between the source and target Polaris
* instances. This involves creating new entities, overwriting entities that exist on both source
* and target, and removing entities that exist only on the target.
* Planner that implements the base level strategy that can be applied to synchronize the source and target.
* Can be configured at different levels of modification.
*/
public class SourceParitySynchronizationPlanner implements SynchronizationPlanner {
public class BaseStrategyPlanner implements SynchronizationPlanner {

/**
* Sort entities from the source into create, overwrite, and remove categories
* The strategy to employ when using {@link BaseStrategyPlanner}.
*/
public enum Strategy {

/**
* Only create entities that exist on source but don't already exist on the target
*/
CREATE_ONLY,

/**
* Create entities that do not exist on the target, and overwrite existing ones with same name/identifier
*/
CREATE_AND_OVERWRITE,

/**
* Create entities that exist on the source and not target, update entities that exist on both, remove entities
* from the target that do not exist on the source.
*/
REPLICATE

}

private final Strategy strategy;

public BaseStrategyPlanner(Strategy strategy) {
this.strategy = strategy;
}

/**
* Sort entities from the source into create, overwrite, remove, and skip categories
* on the basis of which identifiers exist on the source and target Polaris.
* Identifiers that are both on the source and target instance will be marked
* for overwrite. Entities that are only on the source instance will be marked for
* creation. Entities that are only on the target instance will be marked for deletion.
* for overwrite if overwriting is enabled. Entities that are only on the source instance
* will be marked for creation. Entities that are only on the target instance will be marked for deletion
* only if the {@link Strategy#REPLICATE} strategy is used.
* @param entitiesOnSource the entities from the source
* @param entitiesOnTarget the entities from the target
* @param supportOverwrites true if "overwriting" the entity is necessary. Most grant record entities do not need overwriting.
* @param requiresOverwrites true if "overwriting" the entity is necessary. Most grant record entities do not need overwriting.
* @param entityIdentifierSupplier consumes an entity and returns an identifying representation of that entity
* @return a {@link SynchronizationPlan} with the entities sorted based on the souce parity strategy
* @param <T> the type of the entity
*/
private <T> SynchronizationPlan<T> sortOnIdentifier(
Collection<T> entitiesOnSource,
Collection<T> entitiesOnTarget,
boolean supportOverwrites,
boolean requiresOverwrites,
Function<T, Object> entityIdentifierSupplier
) {
Set<Object> sourceEntityIdentifiers = entitiesOnSource.stream().map(entityIdentifierSupplier).collect(Collectors.toSet());
Expand All @@ -65,11 +94,28 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(

for (T entityOnSource : entitiesOnSource) {
Object sourceEntityId = entityIdentifierSupplier.apply(entityOnSource);

if (targetEntityIdentifiers.contains(sourceEntityId)) {
if (supportOverwrites) {
// If an entity with this identifier exists on both the source and the target

if (strategy == Strategy.CREATE_ONLY) {
// if the same entity identifier is on the source and target,
// but we only permit creates, skip it
plan.skipEntity(entityOnSource);
} else {
// if the same entity identifier is on the source and the target,
// overwrite the one on the target with the one on the source
plan.overwriteEntity(entityOnSource);

if (requiresOverwrites) {
// If the entity requires a drop-and-recreate to perform an overwrite.
// some grant records can be "created" indefinitely even if they already exists, for example,
// catalog roles can be assigned the same principal role many times
plan.overwriteEntity(entityOnSource);
} else {
// if the entity is not a type that requires "overwriting" in the sense of
// dropping and recreating, then just create it again
plan.createEntity(entityOnSource);
}
}
} else {
// if the entity identifier only exists on the source, that means
Expand All @@ -89,7 +135,15 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(
// or a catalog role was revoked from a principal role, in which case the target
// should reflect this change when the tool is run multiple times, because we don't
// want to take chances with over-extending privileges
plan.removeEntity(entityOnTarget);

if (strategy == Strategy.REPLICATE) {
plan.removeEntity(entityOnTarget);
} else {
// skip children here because if we want to remove the entity
// and then that means it does not exist on the source, so there are no child
// entities to sync
plan.skipEntityAndSkipChildren(entityOnTarget);
}
}
}

Expand All @@ -99,7 +153,7 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(
@Override
public SynchronizationPlan<Principal> planPrincipalSync(
List<Principal> principalsOnSource, List<Principal> principalsOnTarget) {
return sortOnIdentifier(principalsOnSource, principalsOnTarget, /* supportsOverwrites */ true, Principal::getName);
return sortOnIdentifier(principalsOnSource, principalsOnTarget, /* requiresOverwrites */ true, Principal::getName);
}

@Override
Expand All @@ -111,7 +165,7 @@ public SynchronizationPlan<PrincipalRole> planAssignPrincipalsToPrincipalRolesSy
return sortOnIdentifier(
assignedPrincipalRolesOnSource,
assignedPrincipalRolesOnTarget,
/* supportsOverwrites */ false, // do not need to overwrite an assignment of a principal role to a principal
/* requiresOverwrites */ false, // do not need to overwrite an assignment of a principal role to a principal
PrincipalRole::getName
);
}
Expand All @@ -123,15 +177,15 @@ public SynchronizationPlan<PrincipalRole> planPrincipalRoleSync(
return sortOnIdentifier(
principalRolesOnSource,
principalRolesOnTarget,
/* supportsOverwrites */ true,
/* requiresOverwrites */ true,
PrincipalRole::getName
);
}

@Override
public SynchronizationPlan<Catalog> planCatalogSync(
List<Catalog> catalogsOnSource, List<Catalog> catalogsOnTarget) {
return sortOnIdentifier(catalogsOnSource, catalogsOnTarget, /* supportsOverwrites */ true, Catalog::getName);
return sortOnIdentifier(catalogsOnSource, catalogsOnTarget, /* requiresOverwrites */ true, Catalog::getName);
}

@Override
Expand All @@ -140,7 +194,7 @@ public SynchronizationPlan<CatalogRole> planCatalogRoleSync(
List<CatalogRole> catalogRolesOnSource,
List<CatalogRole> catalogRolesOnTarget) {
return sortOnIdentifier(
catalogRolesOnSource, catalogRolesOnTarget, /* supportsOverwrites */ true, CatalogRole::getName);
catalogRolesOnSource, catalogRolesOnTarget, /* requiresOverwrites */ true, CatalogRole::getName);
}

@Override
Expand All @@ -152,7 +206,7 @@ public SynchronizationPlan<GrantResource> planGrantSync(
return sortOnIdentifier(
grantsOnSource,
grantsOnTarget,
/* supportsOverwrites */ false,
/* requiresOverwrites */ false,
grant -> grant // grants can just be compared by the entire generated object
);
}
Expand All @@ -166,7 +220,7 @@ public SynchronizationPlan<PrincipalRole> planAssignPrincipalRolesToCatalogRoles
return sortOnIdentifier(
assignedPrincipalRolesOnSource,
assignedPrincipalRolesOnTarget,
/* supportsOverwrites */ false,
/* requiresOverwrites */ false,
PrincipalRole::getName
);
}
Expand All @@ -177,7 +231,7 @@ public SynchronizationPlan<Namespace> planNamespaceSync(
Namespace namespace,
List<Namespace> namespacesOnSource,
List<Namespace> namespacesOnTarget) {
return sortOnIdentifier(namespacesOnSource, namespacesOnTarget, /* supportsOverwrites */ true, ns -> ns);
return sortOnIdentifier(namespacesOnSource, namespacesOnTarget, /* requiresOverwrites */ true, ns -> ns);
}

@Override
Expand All @@ -187,6 +241,6 @@ public SynchronizationPlan<TableIdentifier> planTableSync(
Set<TableIdentifier> tablesOnSource,
Set<TableIdentifier> tablesOnTarget) {
return sortOnIdentifier(
tablesOnSource, tablesOnTarget, /* supportsOverwrites */ true, tableId -> tableId);
tablesOnSource, tablesOnTarget, /* requiresOverwrites */ true, tableId -> tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface PlannerWrapper {

private final List<PlannerWrapper> plannerWrappers = new ArrayList<>();

private SynchronizationPlannerBuilder(SourceParitySynchronizationPlanner innermost) {
private SynchronizationPlannerBuilder(BaseStrategyPlanner innermost) {
this.innermost = innermost;
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public SynchronizationPlanner build() {
}
}

static SynchronizationPlannerBuilder builder(SourceParitySynchronizationPlanner innermost) {
static SynchronizationPlannerBuilder builder(BaseStrategyPlanner innermost) {
return new SynchronizationPlannerBuilder(innermost);
}

Expand Down
Loading