-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: add TableMetadata update builder #2957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,9 +57,14 @@ public class TableMetadata implements Serializable { | |
| static final int INITIAL_SPEC_ID = 0; | ||
| static final int INITIAL_SORT_ORDER_ID = 1; | ||
| static final int INITIAL_SCHEMA_ID = 0; | ||
| static final long INITIAL_SNAPSHOT_ID = -1; | ||
|
|
||
| private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); | ||
|
|
||
| public static TableMetadataUpdateBuilder builderFrom(TableMetadata base) { | ||
| return new TableMetadataUpdateBuilder(base); | ||
| } | ||
|
|
||
| public static TableMetadata newTableMetadata(Schema schema, | ||
| PartitionSpec spec, | ||
| SortOrder sortOrder, | ||
|
|
@@ -343,6 +348,10 @@ public String toString() { | |
| "Invalid table metadata: Cannot find current version"); | ||
| } | ||
|
|
||
| InputFile file() { | ||
| return file; | ||
| } | ||
|
|
||
| public int formatVersion() { | ||
| return formatVersion; | ||
| } | ||
|
|
@@ -459,6 +468,10 @@ public Snapshot currentSnapshot() { | |
| return snapshotsById.get(currentSnapshotId); | ||
| } | ||
|
|
||
| long currentSnapshotId() { | ||
| return currentSnapshotId; | ||
| } | ||
|
|
||
| public List<Snapshot> snapshots() { | ||
| return snapshots; | ||
| } | ||
|
|
@@ -475,10 +488,7 @@ public TableMetadata withUUID() { | |
| if (uuid != null) { | ||
| return this; | ||
| } else { | ||
| return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location, | ||
| lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, | ||
| currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this).generateUUID().build(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -500,11 +510,14 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) { | |
| builder.add(new Schema(newSchemaId, newSchema.columns(), newSchema.identifierFieldIds())); | ||
| } | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, | ||
| newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId, | ||
| defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog, | ||
| addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this have a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous file is added by default from the base metadata during build, unless explicitly overwritten.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes I know that this is being added during |
||
| .refreshLastUpdateMillis() | ||
| .withLastColumnId(newLastColumnId) | ||
| .withSchemas(builder.build()) | ||
| .withCurrentSchemaId(newSchemaId) | ||
| .withSpecs(updatedSpecs) | ||
| .withSortOrders(updatedSortOrders) | ||
| .build(); | ||
| } | ||
|
|
||
| // The caller is responsible to pass a newPartitionSpec with correct partition field IDs | ||
|
|
@@ -538,11 +551,12 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) { | |
| builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); | ||
| } | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId, | ||
| builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()), | ||
| defaultSortOrderId, sortOrders, properties, | ||
| currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withDefaultSpecId(newDefaultSpecId) | ||
| .withSpecs(builder.build()) | ||
| .withLastAssignedPartitionId(Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId())) | ||
| .build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's a bit confusing when some stuff is set via builder methods and other stuff is set when |
||
| } | ||
|
|
||
| public TableMetadata replaceSortOrder(SortOrder newOrder) { | ||
|
|
@@ -577,10 +591,11 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) { | |
| } | ||
| } | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog, | ||
| addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withDefaultSortOrderId(newOrderId) | ||
| .withSortOrders(builder.build()) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata addStagedSnapshot(Snapshot snapshot) { | ||
|
|
@@ -593,11 +608,11 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) { | |
| .add(snapshot) | ||
| .build(); | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, | ||
| currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, | ||
| defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog, | ||
| addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .withLastSequenceNumber(snapshot.sequenceNumber()) | ||
| .withLastUpdatedMillis(snapshot.timestampMillis()) | ||
| .withSnapshots(newSnapshots) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { | ||
|
|
@@ -619,11 +634,13 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { | |
| .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())) | ||
| .build(); | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, | ||
| currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId, | ||
| defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, | ||
| addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .withLastSequenceNumber(snapshot.sequenceNumber()) | ||
| .withLastUpdatedMillis(snapshot.timestampMillis()) | ||
| .withCurrentSnapshotId(snapshot.snapshotId()) | ||
| .withSnapshots(newSnapshots) | ||
| .withSnapshotLog(newSnapshotLog) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) { | ||
|
|
@@ -652,10 +669,11 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) { | |
| } | ||
| } | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered, | ||
| ImmutableList.copyOf(newSnapshotLog), addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withSnapshots(filtered) | ||
| .withSnapshotLog(ImmutableList.copyOf(newSnapshotLog)) | ||
| .build(); | ||
| } | ||
|
|
||
| private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { | ||
|
|
@@ -676,26 +694,22 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { | |
| .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId())) | ||
| .build(); | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots, | ||
| newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this actually need to be equal to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, will update them to match |
||
| .withCurrentSnapshotId(snapshot.snapshotId()) | ||
| .withSnapshotLog(newSnapshotLog) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata replaceProperties(Map<String, String> rawProperties) { | ||
| ValidationException.check(rawProperties != null, "Cannot set properties to null"); | ||
| Map<String, String> newProperties = unreservedProperties(rawProperties); | ||
| TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots, | ||
| snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties)); | ||
|
|
||
| int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); | ||
| if (formatVersion != newFormatVersion) { | ||
| metadata = metadata.upgradeToFormatVersion(newFormatVersion); | ||
| } | ||
|
|
||
| return metadata; | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withFormatVersion(newFormatVersion) | ||
| .withProperties(newProperties) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) { | ||
|
|
@@ -711,10 +725,10 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) { | |
| Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId, | ||
| "Cannot set invalid snapshot log: latest entry is not the current snapshot"); | ||
|
|
||
| return new TableMetadata(null, formatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, | ||
| snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withSnapshotLog(newSnapshotLog) | ||
| .build(); | ||
| } | ||
|
|
||
| private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) { | ||
|
|
@@ -835,49 +849,41 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update | |
| schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), freshSchema.identifierFieldIds())); | ||
| } | ||
|
|
||
| TableMetadata metadata = new TableMetadata(null, formatVersion, uuid, newLocation, | ||
| lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(), | ||
| specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId()), | ||
| orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties), | ||
| -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties)); | ||
|
|
||
| if (formatVersion != newFormatVersion) { | ||
| metadata = metadata.upgradeToFormatVersion(newFormatVersion); | ||
| } | ||
|
|
||
| return metadata; | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withFormatVersion(newFormatVersion) | ||
| .withLocation(newLocation) | ||
| .withLastColumnId(newLastColumnId.get()) | ||
| .withCurrentSchemaId(freshSchemaId) | ||
| .withSchemas(schemasBuilder.build()) | ||
| .withDefaultSpecId(specId) | ||
| .withSpecs(specListBuilder.build()) | ||
| .withLastAssignedPartitionId(Math.max(lastAssignedPartitionId, newSpec.lastAssignedFieldId())) | ||
| .withDefaultSortOrderId(orderId) | ||
| .withSortOrders(sortOrdersBuilder.build()) | ||
| .withProperties(ImmutableMap.copyOf(newProperties)) | ||
| .withCurrentSnapshotId(INITIAL_SNAPSHOT_ID) | ||
| .withSnapshotLog(ImmutableList.of()) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata updateLocation(String newLocation) { | ||
| return new TableMetadata(null, formatVersion, uuid, newLocation, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, | ||
| snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withLocation(newLocation) | ||
| .build(); | ||
| } | ||
|
|
||
| public TableMetadata upgradeToFormatVersion(int newFormatVersion) { | ||
| Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this logic here is moved to the builder. What we are doing here is simple precondition checks and then directly run the constructor logic with the new format version, so the logic in the original PR to call upgradeToFormatVersion is unnecessary, and can be centralized to the build logic. |
||
| "Cannot upgrade table to unsupported format version: v%s (supported: v%s)", | ||
| newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION); | ||
| Preconditions.checkArgument(newFormatVersion >= formatVersion, | ||
| "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion); | ||
|
|
||
| if (newFormatVersion == formatVersion) { | ||
| return this; | ||
| } | ||
|
|
||
| return new TableMetadata(null, newFormatVersion, uuid, location, | ||
| lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs, | ||
| lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, | ||
| snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| } | ||
|
|
||
| private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis) { | ||
| return addPreviousFile(previousFile, timestampMillis, properties); | ||
| return builderFrom(this) | ||
| .refreshLastUpdateMillis() | ||
| .withFormatVersion(newFormatVersion) | ||
| .build(); | ||
| } | ||
|
|
||
| private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis, | ||
| Map<String, String> updatedProperties) { | ||
| static List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis, | ||
| Map<String, String> updatedProperties, | ||
| List<MetadataLogEntry> previousFiles) { | ||
| if (previousFile == null) { | ||
| return previousFiles; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this also have a
.withFile(null)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's null by default, is it worth adding it explicitly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, not worth adding it explicitly. I was confused with
baseFileandfile