
Conversation

@wypoon
Contributor

@wypoon wypoon commented Sep 25, 2020

Now that a snapshot has a schema id associated with it, when reading a snapshot, we should use the schema referenced by that schema id. We implement this for Spark 2. The implementation for Spark 3 is deferred pending resolution of #3269. Changes for other engines that support time travel need to be implemented separately. In addition, for snapshots written before Iceberg added a schema id to snapshots, we plan to implement recovering the schema by reading previous metadata files in a future change.

@wypoon
Contributor Author

wypoon commented Sep 25, 2020

This is for #1501.

@rdblue
Contributor

rdblue commented Sep 25, 2020

Thanks for working on this, @wypoon! This was just recently pointed out by another contributor as well. We're currently tracking this in #1029, which has a bit more detail about how this should be built.

While I like that your solution found a way to recover the old schema, we don't want to have to read old metadata files to recover it. Iceberg should track the schema that was used for each snapshot. That means a few steps are needed:

  1. Add a list, schemas, to table metadata that has all of the schemas for known snapshots, like partition-specs
  2. Add a schema-id to each schema so that the schemas can be referenced by id
  3. Add a schema-id to each snapshot to track the schema that was current when it was created

Then we would need to add a Snapshot.schema method to retrieve the schema of a snapshot directly. Would you like to work on a PR for the first one?

@wypoon
Contributor Author

wypoon commented Sep 25, 2020

@rdblue, thanks for the feedback and the pointer to #1029.
About step 1 in the steps you outlined, would the .metadata.json file then contain a schema field with the current schema and a (new) schemas field with a list of all the schemas (including the current)?
Sure, I'd be happy to work on a PR for this step.

@rdblue
Contributor

rdblue commented Sep 25, 2020

Yes, we would add schemas to track recent schemas (referenced by any known snapshot) and keep schema for now. We would also add current-schema-id to track which schema in schemas is the current one. Eventually, we will deprecate schema and just use the one identified by id.
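
A minimal sketch of what that could look like once those fields exist (class, field, and method names here are illustrative, not the final Iceberg API; it assumes Schema gains a schemaId() accessor):

import java.util.List;
import org.apache.iceberg.Schema;

class TableMetadataSketch {
  private final List<Schema> schemas;  // every schema referenced by a known snapshot
  private final int currentSchemaId;   // which entry in schemas is currently in effect

  TableMetadataSketch(List<Schema> schemas, int currentSchemaId) {
    this.schemas = schemas;
    this.currentSchemaId = currentSchemaId;
  }

  Schema schemaById(int schemaId) {
    return schemas.stream()
        .filter(s -> s.schemaId() == schemaId)
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unknown schema id: " + schemaId));
  }

  Schema currentSchema() {
    return schemaById(currentSchemaId);  // eventually replaces the standalone schema field
  }
}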

@electrum
Contributor

Should we read the old metadata files as a fallback for data written before this addition?

@rdblue
Contributor

rdblue commented Sep 28, 2020

Should we read the old metadata files as a fallback for data written before this addition?

We could, but it doesn't seem worth the effort to maintain it to me. If other people disagree, I'm fine adding it.

@electrum
Contributor

The question is what to do when we access a historical snapshot written before this addition. It seems we have a few choices:

  1. Use the current schema, as we do today.
  2. Fail the query.
  3. Fall back to reading old metadata files.

Using the current schema seems like the worst choice, as that results in inconsistent behavior for reasons that are invisible to the user. Failing the query would break behavior that works today.

While I agree that having more code to handle older data is annoying, the complexity here seems low (~20 lines of code) relative to the benefit. We have much higher complexity in other areas, such as supporting non-projected identity columns for converted Hive tables (and that's not even covered by the specification).

@rdblue
Contributor

rdblue commented Sep 29, 2020

While I agree that having more code to handle older data is annoying, the complexity here seems low (~20 lines of code) relative to the benefit.

I agree with that. And since we plan to make schema tracking required in v2, we can easily remove this work-around.

@rdblue
Contributor

rdblue commented Sep 29, 2020

@wypoon, are you interested in working on both? We could update this PR to use the work-around you implemented and to update the Snapshot API (add Snapshot.schema()). Then we can add the new schema tracking separately. What do you think?

*
* @return this table's schema
*/
Schema schemaForSnapshotAsOfTime(long timestampMillis);
Contributor

Instead of changing the table API in this commit, I think we only need to add a schema method to Snapshot. That will be used later when we have better tracking. Not adding too many methods to the public API in this commit will give us more flexibility later and make this simpler to get in.

Contributor Author

I can avoid adding the schemaForSnapshot methods to the Table API. I think only BaseTable needs to have the methods. However, the places where I need to call schemaForSnapshot only have references to Table; I'd have to check if the Table is an instance of BaseTable.
I also suggested in the general comments that Snapshot could simply have a schemaId method in the future. And that would then get used in a revised implementation of schemaForSnapshot in BaseTable.
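
A small illustration of the check described above (the exact name of the BaseTable method may differ from what is shown here):

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

class SnapshotSchemas {
  // schemaForSnapshot is the lookup added to BaseTable in this PR; other Table
  // implementations fall back to the current table schema
  static Schema schemaFor(Table table, long snapshotId) {
    if (table instanceof BaseTable) {
      return ((BaseTable) table).schemaForSnapshot(snapshotId);
    }
    return table.schema();
  }
}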

Contributor

Yeah, I agree with you. We have some flexibility here with how we look up the schema for a given snapshot. The main thing is that we don't want to expose any methods through the Table API right now. And since we don't have a schema id yet, we can't really add that method to Snapshot.

Eventually, I think we would want Snapshot to return its own schema rather than requiring a lookup, which is why I suggested that option. But we might be able to avoid the problem for now.

Like I said, I just don't think we want to add anything to Table.

Contributor Author

I have removed the methods from Table, adding them only to BaseTable. Thus the Table API is unchanged.

this.schema = SparkSchemaUtil.prune(getTableSchema(), requestedSchema, filterExpression(), caseSensitive);
} else {
this.schema = table.schema();
this.schema = getTableSchema();
Contributor

In addition to updates here, we will need to update schema handling in BaseTableScan so that the projection returned by the scan is based on the schema of the Snapshot that will be scanned.

Contributor Author

This is the only point I don't understand. As far as I can tell, BaseTableScan is constructed with the user's requested schema. If the user wants to read from an old snapshot, then they should supply a requested schema compatible with the snapshot's schema. I couldn't see anything I needed to change in BaseTableScan. If I'm missing something here, can you please point out the parts I need to change?

I added a new test case where a requested schema is used, and that revealed a bug in the spark2 IcebergSource#createReader. In that method, there is a call to SparkSchemaUtil.convert(Schema, StructType) whose only purpose seems to be to check that the requested StructType does not contain fields that are not in the Schema (otherwise the call would throw an exception), and here I needed to use the snapshot Schema rather than the table Schema. Other than that, the projection returned by the scan appears to be correct.
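
For reference, a rough sketch of the fix described above; snapshotSchema(...) stands in for whatever helper resolves the snapshot's schema and is not an existing API:

// in the spark2 IcebergSource#createReader path, validate the requested projection
// against the snapshot's schema instead of the table's current schema
Schema snapshotSchema = snapshotSchema(table, options);       // hypothetical helper
SparkSchemaUtil.convert(snapshotSchema, requestedStructType); // throws if requestedStructType names fields not in snapshotSchema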

TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
}

private Schema getTableSchema() {
Contributor

I think this method name could be more descriptive. The verb "get" doesn't add anything so we don't normally use it. And this is also not the table schema, it is a snapshot's schema. How about renaming this to snapshotSchema()?

Contributor Author

Renamed to snapshotSchema as you suggest.


// Build Spark table based on Iceberg table, and return it
return new SparkTable(icebergTable, schema);
return new SparkTable(icebergTable, schema, options);
Contributor

I think that SparkTable should include a snapshotId so that it returns the correct snapshot's schema to the analyzer, when possible. I don't think that it needs to include these options, when no other options are used by the table.

Contributor Author

Ok, I can do that.

Contributor Author

I extracted snapshotId and asOfTimestamp from options here if they appear and pass the values to SparkTable in its constructor instead of having SparkTable do that. I honestly think it's just six of one and half a dozen of the other.
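
Roughly what that extraction looks like (the SparkTable constructor signature shown here illustrates the approach rather than the exact code in this PR):

Long snapshotId = options.containsKey(SparkReadOptions.SNAPSHOT_ID)
    ? Long.parseLong(options.get(SparkReadOptions.SNAPSHOT_ID))
    : null;
Long asOfTimestamp = options.containsKey(SparkReadOptions.AS_OF_TIMESTAMP)
    ? Long.parseLong(options.get(SparkReadOptions.AS_OF_TIMESTAMP))
    : null;
return new SparkTable(icebergTable, snapshotId, asOfTimestamp);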

@edwinchoi

I think you need to check .snapshots() not .snapshotLog(). The latter seems to only retain the history of changes since the table was created/replaced. You can see in TableMetadata.buildReplacement that the prior history entries are not retained in the replacement.

Also, from what I can tell, the metadata is always refreshed after the snapshot has been created (e.g., SnapshotProducer.commit(), BaseTransaction.commitTransaction(), etc... each refresh the metadata as part of the commit protocol). If that is always the case, then the loop should match the Snapshot with the first MetadataLogEntry where MetadataLogEntry.timestampMillis() >= Snapshot.timestampMillis(), and not the last MetadataLogEntry.timestampMillis() <= Snapshot.timestampMillis().

As a sanity check, the code could check the expected snapshot-id against the TableMetadata that is read.
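
A sketch of the matching loop and sanity check described above, assuming the metadata file is always written at or after the snapshot commit (class and method names are hypothetical):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;

class SnapshotSchemaRecovery {
  static Schema recoverSchema(TableMetadata current, Snapshot snapshot, FileIO io) {
    for (MetadataLogEntry entry : current.previousFiles()) {
      // match the first metadata entry written at or after the snapshot, not the last one before it
      if (entry.timestampMillis() >= snapshot.timestampMillis()) {
        TableMetadata candidate = TableMetadataParser.read(io, entry.file());
        // sanity check: the metadata that was read should point at the expected snapshot
        if (candidate.currentSnapshot() != null
            && candidate.currentSnapshot().snapshotId() == snapshot.snapshotId()) {
          return candidate.schema();
        }
        break;
      }
    }
    return current.schema();  // fall back to the current schema if nothing matches
  }
}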

@wypoon
Contributor Author

wypoon commented Sep 29, 2020

Thanks everyone for your interest in this work!
@rdblue, I think it makes sense to get a version of this PR in first, and separately, do the work to associate a schema with each snapshot, while aligning this PR with the other work as much as possible.
To check that we're on the same page, your original suggestion was to add a list schemas to table metadata, add a schema-id to each schema, and add a schema-id to each snapshot. When this is done, Snapshot should have a schema() method.
IIUC, the schema still has to be read from the table metadata file, referenced using the schema-id from the snapshot. Thus an implementation of Snapshot would need a reference to the table metadata, correct?
Right now, to get the schema, I need to start from the table metadata file. There is currently no obvious way to get from snapshot to table metadata. I'd need to change the implementation of Snapshot to have a reference to the current metadata. Does that make sense to you? Or am I missing something?

@rdblue
Contributor

rdblue commented Sep 29, 2020

Good observations, @edwinchoi. I'm not sure why the snapshot log is cleared out when a table is replaced. That log should track the snapshot that was current at a given time, which seems like the right one to use. We use it for time travel queries, for example. But since the log is not there, we can use the metadata file log instead, as you suggested, to find the snapshot that was current at some time.

We should not use the list of snapshots because there is no guarantee that any given snapshot was ever the current version (like a branch in git that is never the main branch).

@rdblue
Contributor

rdblue commented Sep 29, 2020

There is currently no obvious way to get from snapshot to table metadata

Yes, we will need to have a way to get the right schema from table metadata by ID. I think what might make sense for now is to pass a Function<Long, Schema> in that does the lookup by snapshot ID. Then the schemaForSnapshot method could be package-private and passed into the Snapshot through its constructor. All of this would be package-private so we can change it later as we need to.
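
A minimal illustration of that shape (class and field names here are hypothetical; the real change would live in BaseSnapshot and stay package-private):

import java.util.function.Function;
import org.apache.iceberg.Schema;

class SnapshotSketch {
  private final long snapshotId;
  private final Function<Long, Schema> schemaLookup;  // injected by the table metadata layer

  SnapshotSketch(long snapshotId, Function<Long, Schema> schemaLookup) {
    this.snapshotId = snapshotId;
    this.schemaLookup = schemaLookup;
  }

  Schema schema() {
    // resolves this snapshot's schema without exposing anything new on the Table API
    return schemaLookup.apply(snapshotId);
  }
}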

@wypoon
Contributor Author

wypoon commented Sep 29, 2020

There is currently no obvious way to get from snapshot to table metadata

Yes, we will need to have a way to get the right schema from table metadata by ID. I think what might make sense for now is to pass a Function<Long, Schema> in that does the lookup by snapshot ID. Then the schemaForSnapshot method could be package-private and passed into the Snapshot through its constructor. All of this would be package-private so we can change it later as we need to.

It seems to me that it'd be simpler if, in the future, Snapshot had a schemaId() method rather than a schema() method, since the places that need to obtain a schema for a Snapshot already have a reference to TableMetadata at hand, and only need to call Snapshot.schemaId() and then use it to look up the schema in the TableMetadata. So the implementation of getSchemaForSnapshot(long) in BaseTable can stay the way it is now, and change later when Snapshot.schemaId() becomes available.
What do you think?

@rdblue
Contributor

rdblue commented Sep 30, 2020

I posted in a comment above, but I'll echo it here: I don't think we should be adding anything to the Table API that we are going to want to remove later. Since we don't have schema ids, I don't think we can add the API that we eventually want right now. Adding Snapshot.schema seems like something we can do earlier and that would be the most friendly API long term. That's why I suggest it. We can do something else, as long as we are careful and only add non-public methods.

@edwinchoi

@rdblue, how would a snapshot be recorded in the metadata if it weren't at some point current? Intermediate snapshots are removed on commit, so I would imagine the only case where the metadata could refer to snapshots that were never current is if the commit were rejected. In that case, the snapshots wouldn't be reachable from the current commit. What am I missing?

Example:

-- S1
CREATE TABLE test.ns.tbl USING iceberg AS
SELECT * FROM VALUES (1, "Alice"), (2, "Bob") AS (id, fname);

-- S2
CREATE OR REPLACE TABLE test.ns.tbl USING iceberg AS
SELECT * FROM VALUES (1, 5, "alice"), (2, 3, "bob") AS (id, name_len, name) ;

-- S3
INSERT INTO test.ns.tbl VALUES (3, 5, "carol")

The table test.ns.tbl has 3 valid states. So we should be able to read each one individually. Table.history(), after all 3 statements have executed, only returns entries for S2 and S3; while Table.snapshots() returns all 3.

You can rewind to S1, so it seems time-travel is using snapshots not the log. Trying to read the data will fail since it tries to cast the 2nd field to an integer, but it will succeed if you rewrite the snippet to use (id, name, name_len).

@rdblue
Contributor

rdblue commented Sep 30, 2020

There are a few cases where a snapshot might not have been the current state:

  1. For the write-audit-publish (WAP) pattern, there is an option to only stage a commit and not update the table's current-snapshot-id. In this case, the writer updates the table by creating a new snapshot. Then an auditor reads the snapshot and validates it (with row counts, for example), and if the snapshot looks good, commits the snapshot as the current table state. This allows reports to be validated before going live.
  2. Single-table transactions will create individual snapshots, but the table state goes directly to the latest snapshot when the transaction commits.

Also, the table state can be rolled back to a previous snapshot and new commits will form a new history afterwards.

@edwinchoi

Thanks for elaborating. My thoughts...

For the write-audit-publish (WAP) pattern, there is an option to only stage a commit and not update the table's current-snapshot-id. In this case, the writer updates the table by creating a new snapshot. Then an auditor reads the snapshot and validates it (with row counts, for example), and if the snapshot looks good, commits the snapshot as the current table state. This allows reports to be validated before going live.

I don't think WAP is working as expected under RTAS.

spark.sql("""
CREATE TABLE test.ns.tbl
USING iceberg
TBLPROPERTIES ('write.wap.enabled'='true')
AS SELECT * FROM VALUES (1, "Alice"), (2, "Bob") AS (id, fname)
""")
spark.conf.set("spark.wap.id", "12345")
spark.sql("""
CREATE OR REPLACE TABLE test.ns.tbl
USING iceberg
AS SELECT * FROM VALUES (1, 5, "alice"), (2, 3, "bob") AS (id, name_len, name)
""")
spark.conf.unset("spark.wap.id")

After running this, the schema from the staged change is showing up but the data that should exist isn't accessible, i.e., DESC test.ns.tbl shows the new schema and SELECT * FROM test.ns.tbl is coming up empty. Even under a simple schema change ALTER TABLE ... ADD COLUMN ..., the change is taking effect prematurely.

Also, the table state can be rolled back to a previous snapshot and new commits will form a new history afterwards.

I don't believe this changes the notion of what was current at some point in time. If you view the timeline as being from the database's point of view, then rolling back doesn't change the fact that at some point in time, a snapshot, that is now inaccessible, was visible in the database.

@rdblue
Contributor

rdblue commented Oct 1, 2020

I don't think WAP is working as expected under RTAS.

You're right. I think we should disable WAP with RTAS until we figure out what the behavior should be.

I don't believe [a rollback] changes the notion of what was current at some point in time.

That's right, but it is an example of not being able to simply use the list of snapshots. This was a side point that supports the idea that point in time lookup should be done using the snapshot log, or metadata file log if it is not available, and snapshot lookup should be done strictly by id.
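
In code terms, that rule roughly corresponds to the following, using the SnapshotUtil.snapshotIdAsOfTime helper referenced in the commits below:

// point-in-time lookup resolves an id via the snapshot log (or the metadata file log
// when the snapshot log is unavailable); the snapshot itself is then fetched strictly by id
long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
Snapshot snapshot = table.snapshot(snapshotId);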

@wypoon wypoon force-pushed the schema-for-snapshot branch from 2a9bc7a to d34fdbd on October 8, 2020 04:34
@wypoon
Contributor Author

wypoon commented Oct 8, 2020

I think you need to check .snapshots() not .snapshotLog(). The latter seems to only retain the history of changes since the table was created/replaced. You can see in TableMetadata.buildReplacement that the prior history entries are not retained in the replacement.

Also, from what I can tell, the metadata is always refreshed after the snapshot has been created (e.g., SnapshotProducer.commit(), BaseTransaction.commitTransaction(), etc... each refresh the metadata as part of the commit protocol). If that is always the case, then the loop should match the Snapshot with the first MetadataLogEntry where MetadataLogEntry.timestampMillis() >= Snapshot.timestampMillis(), and not the last MetadataLogEntry.timestampMillis() <= Snapshot.timestampMillis().

As a sanity check, the code could check the expected snapshot-id against the TableMetadata that is read.

My understanding from @rdblue's replies is that one should use the snapshot log, so I have stuck with that. Also, from what I see, the metadata timestamp is always the same as the snapshot timestamp when the metadata is written for a new snapshot. I changed the loop to look for the metadata log entry whose timestamp == the snapshot timestamp (and break out of the loop). (I could also use >= as you suggest, but it should make no difference.)

FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
fs.delete(tablePath, true);
catalog.dropTable(currentIdentifier, false);
if (currentIdentifier != null) {
Contributor

Why is this change needed?

Contributor Author

This code used to be in the spark common module before Anton removed it. On rebase, it was moved to v2.4, and I did not scrutinize what happened as there were no conflicts to resolve and tests passed.
This is actually a fix for a latent bug that was exposed when I skipped tests. Every test in TestIcebergSourceTablesBase calls createTable, which in this subclass sets the static currentIdentifier. At the end of each test, this @After method drops the table identified by currentIdentifier. If the test is skipped, this @After method is still run, and will fail because the table was already dropped after the previous test.
My impetus was to skip the tests I added when testing Spark 3. Now that the code is duplicated and the v3.0 version of TestIcebergSourceTablesBase does not have the added tests, test skipping using Assume is not necessary. Nevertheless, I'd argue that this is still a bug, and I propose to keep the fix here and add it to v3.0 as well.
Your thoughts?
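
For clarity, a rough sketch of the guarded @After method being described; it is based on the diff above, and the surrounding test-class fields (tablePath, catalog, spark, currentIdentifier) are assumed:

@After
public void dropTable() throws IOException {
  if (currentIdentifier != null) {  // a skipped test no longer fails on an already-dropped table
    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
    fs.delete(tablePath, true);
    catalog.dropTable(currentIdentifier, false);
    currentIdentifier = null;
  }
}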

Contributor

Thanks, that makes sense.

@rdblue
Contributor

rdblue commented Oct 20, 2021

I think this is almost ready. Just a few minor things to fix.

wypoon and others added 13 commits October 21, 2021 12:22
According to Edwin Choi, in order to get the schema for a snapshot,
the only safe option is to scan the metadata files to find the one
where the current-snapshot-id matches the target snapshot id.
The changes are mostly in spark3. They are necessitated by the catalog
support introduced in apache#1783.
As the spark3 IcebergSource now implements SupportsCatalogOptions,
DataFrameReader#load no longer calls IcebergSource#getTable but calls
SparkCatalog#loadTable directly. In order for the SparkTable returned by
SparkCatalog#loadTable(Identifier) to be aware of the snapshot, the
information about the snapshot needs to be present in the Identifier.
For this reason, we introduce a SnapshotAwareIdentifier interface
extending Identifier.
As SupportsCatalogOptions does not allow a schema to be specified
(requested), SparkTable no longer needs a requestedSchema field, so
some dead code is removed from it.
Rebased on master.
Use constants from SparkReadOptions.
Implement snapshotSchema() in SparkFilesScan as it extends SparkBatchScan.
Avoid introducing new methods to BaseTable.
Add helper methods to SnapshotUtil instead.
Move recovery of the schema from previous metadata files (in the event
that a snapshot does not have an associated schema id) to a new PR.
Remove snapshotSchema method from SparkBatchScan and its subclasses,
as it is not needed.
Adjust schema in BaseTableScan when useSnapshot is called.
Use the existing CatalogAndIdentifier and swap out the Identifier for a
snapshot-aware TableIdentifier if snapshotId or asOfTimestamp is set.
Fix a bug in BaseTableScan#useSnapshot.
Some clean up in SnapshotUtil.
Some streamlining in added unit tests.
Refactor spark2 Reader to configure the TableScan on construction,
and let the TableScan get the schema for the snapshot.
Rename new TableIdentifier to SparkTableIdentifier to avoid confusion
with existing TableIdentifier (in different package).
Add convenience constructor to PathIdentifier to avoid modifying tests
for it.
…ableScan.

Use SnapshotUtil.snapshotIdAsOfTime in BaseTableScan#asOfTime.
Move formatTimestampMillis from BaseTableScan to SnapshotUtil in order to
use it there (BaseTableScan is package-private, not a public class).
Fix some error messages.
@wypoon wypoon force-pushed the schema-for-snapshot branch from 290c4a0 to afec25c on October 21, 2021 19:51
Remove some unused code.
Sync v3.0 with changed code in v2.4.
Comment on lines -1114 to +1115
.option("snapshot-id", String.valueOf(firstCommitId))
.option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
Contributor Author

Keep v3.0 in sync with v2.4 to the extent applicable.

@wypoon
Contributor Author

wypoon commented Oct 21, 2021

@rdblue are we gating changes to spark/v3.0 until v3.2 is added? If so, I can remove the v3.0 changes here. If you really want, I can remove the fix for TestIcebergSourceHiveTables altogether and submit a separate PR for it after the dust settles on the Spark 3 reorg. I'd really like this change to go in sooner rather than later.

@rdblue rdblue merged commit ffee32c into apache:master Oct 21, 2021
@rdblue
Contributor

rdblue commented Oct 21, 2021

Thanks @wypoon! Looks great.
