
Conversation

@ajantha-bhat (Member) commented Apr 30, 2022:

Computing ALL_MANIFESTS is a heavy operation when many snapshots exist, since it involves reading the manifest list file for every snapshot. We currently duplicate this computation in several places in the Spark actions.

This PR improves the performance of the Spark actions and stored procedures by computing the ALL_MANIFESTS dataset once and reusing it in the other locations via dataset.persist().
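As a rough illustration of the intended pattern, here is a minimal, self-contained sketch (it reads the public all_manifests metadata table as a stand-in for the actions' internal ALL_MANIFESTS dataset; the class and variable names are illustrative, not the PR's code):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class PersistOnceSketch {
      static void run(SparkSession spark, String tableName) {
        // Reading all_manifests scans the manifest list of every snapshot,
        // so we want to pay that cost only once.
        Dataset<Row> allManifests =
            spark.read().format("iceberg").load(tableName + ".all_manifests");
        allManifests.persist(); // materialized on the first action below
        try {
          long manifestCount = allManifests.count(); // computes and fills the cache
          long distinctPaths = allManifests.select("path").distinct().count(); // served from the cache
          System.out.printf("manifests=%d, distinct paths=%d%n", manifestCount, distinctPaths);
        } finally {
          allManifests.unpersist(); // release the cached blocks promptly
        }
      }
    }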

The github-actions bot added the spark label Apr 30, 2022.
@ajantha-bhat (Member Author):
cc: @RussellSpitzer, @rdblue

Code under review (diff context):

    .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
    Dataset<Row> allManifests = loadMetadataTable(staticTable, ALL_MANIFESTS);
    return withFileType(buildValidContentFileDF(staticTable, allManifests), CONTENT_FILE)
        .union(withFileType(buildManifestFileDF(allManifests), MANIFEST))
Member:
How does this change the performance? Don't we have to compute all manifests in both locations here? Or does changing to an object let Spark cache the relation?

Member Author:
loadMetadataTable was called twice; now it is called only once, for ALL_MANIFESTS.

Member Author:

> Or does changing to an object let Spark cache the relation?

I thought the dataset would be computed on the first action and the result reused for both steps.

@ajantha-bhat (Member Author) commented May 1, 2022:
@RussellSpitzer: I got what you meant now.

I will fix this by adding persist(); it will cache and reuse the dataset, which avoids reading the manifest lists twice.

Member Author:
Fixed, and verified the scanning behavior by adding and checking logs.

Member:
Ah yes, I should have been clearer: I was referring to the fact that the dataset would be recomputed on both lines. The loadMetadataTable function itself should be very fast, but the actual planning would be expensive, and avoiding that requires a cache of some kind.

I'm a little worried in general about persisting things, since I want to make sure we clean up our caches as soon as possible.

Member:
One other thing to worry about here is the additional cost of the persist operation itself. Running a persist is not free, and we need to check that this cache is actually cost effective. In my experience, three uses of a persisted DataFrame are usually worth it, but two sometimes are not (it depends heavily on the computation leading up to the cached DataFrame).

Member Author:

> Very much dependent on the computation leading up to the cached df

Because it involves IO, it should definitely help when there are hundreds or thousands of snapshots.

Member Author:
@RussellSpitzer, which is better: a cache, or manually calling dataset.collect() on the allManifests DataFrame, building a new dataset on top of the collected rows, and reusing that in both locations?

@RussellSpitzer (Member) commented May 2, 2022:
@ajantha-bhat The problem is that persisting does another round of IO and serialization of what should be basically the same amount of information, hopefully in a more readable form. persist() is by default a memory-and-disk cache. You really need to test it out to be sure.

For cache vs. collect: cache is probably much better. The pieces would be stored on the executors and the IO would hopefully be mostly local. Doing a collect and building a DataFrame from it would bring all the data back to the driver, serialize it, then deserialize it and send everything back out. That is always worse than cache/persist.
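A minimal sketch of the two options being compared (illustrative stand-in data; not the PR's code):

    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.storage.StorageLevel;

    class CacheVsCollect {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate();
        Dataset<Row> allManifests = spark.range(1000).toDF("id"); // stand-in for ALL_MANIFESTS

        // Option 1: persist -- the computed partitions stay on the executors,
        // so both downstream uses read them locally.
        allManifests.persist(StorageLevel.MEMORY_AND_DISK()); // the persist() default
        allManifests.count();   // first use materializes the cache
        allManifests.count();   // second use hits the cache
        allManifests.unpersist();

        // Option 2: collect and rebuild -- every row is serialized to the
        // driver, then deserialized and shipped back out: strictly more data
        // movement than the cache.
        List<Row> rows = allManifests.collectAsList();
        Dataset<Row> rebuilt = spark.createDataFrame(rows, allManifests.schema());
        rebuilt.count();

        spark.stop();
      }
    }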

@ajantha-bhat marked this pull request as draft May 2, 2022.
Code under review (diff context):

    Column joinCond = nameEqual.and(actualContains);
    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
        .as(Encoders.STRING())
        .unpersist()
@RussellSpitzer (Member) commented May 2, 2022:
I do not think this will trigger the unpersist on allManifests. It would only trigger it on the join result; this shouldn't cascade.

Member Author:
Yeah, I figured that out, but I haven't figured out the final solution yet, so I changed the PR to a draft. :)

But as you said, I guess I need to call unpersist on the allManifests table itself; see the sketch below.
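For reference, a minimal sketch of why the cascade does not happen (the dataset names are hypothetical stand-ins for the PR's variables):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class UnpersistExample {
      static void release(Dataset<Row> allManifests, Dataset<Row> otherDF) {
        allManifests.persist();
        Dataset<Row> joined = allManifests.join(otherDF, "path");
        joined.collectAsList();

        joined.unpersist();       // a no-op for the cache: 'joined' itself was never persisted
        allManifests.unpersist(); // this call is what actually frees the cached blocks
      }
    }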


Code under review (diff context):

    // determine expired files
    this.expiredFiles = originalFiles.except(validFiles);
    this.expiredFiles = originalFiles.except(validFiles).unpersist();
Member:
Again, if you want to get rid of the cache, I believe you need to call unpersist on the allManifests table.

@szehon-ho (Member) commented May 5, 2022:
@RussellSpitzer pointed me to this. I had a PR orthogonal to this one that avoids duplicate computation of all reachable files: #3457. To me that was the bigger time consumer (exploring all reachable files), though maybe I need to redo that PR. I wasn't sure how much of a bottleneck getting all_manifests was.

Anyway, I agree with @RussellSpitzer that cache may be a better option than persist. It would be great to see some numbers for tables with huge numbers of snapshots for these two options vs. today, if possible. If we go with this approach, it should probably be 1) configurable, and 2) able to be GC'ed sooner rather than later.

@ajantha-bhat (Member Author):
@szehon-ho:

> I had a PR orthogonal to this one that avoids duplicate computation of all reachable files: #3457. (…) I wasn't sure how much of a bottleneck getting all_manifests was.

Yeah, scanning the all_manifests table twice was the major problem for me.

> If we go with this approach, it should probably be 1) configurable, and 2) able to be GC'ed sooner rather than later.

Sure, I will add a configuration option for whether to cache, and gather a performance report locally with a large number of snapshots. I will work on this over the weekend.

@szehon-ho (Member):
Also FYI @aokolnychyi, in case you want to take a look as well.

@ajantha-bhat (Member Author):
@RussellSpitzer, @szehon-ho, @aokolnychyi: the PR is ready for review.

a) I tested with the local file system, in a local test case in the IDE, with 1000 manifest list files to read: without cache it took 22589 ms, with cache 18478 ms.
Not much difference, probably because it is a local file system. Since the PR avoids reading the manifest list files again (verified with logs), a bigger difference should show up on a file system like S3 with a huge number of snapshots.

@Test
  public void testExpireSnapshotUsingNamedArgs() {
    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);

    for (int i = 0; i < 1000; i++) {
      sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
    }

    Table table = validationCatalog.loadTable(tableIdent);

    Assert.assertEquals("Should be 1000 snapshots", 1000, Iterables.size(table.snapshots()));

    waitUntilAfter(table.currentSnapshot().timestampMillis());

    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));

    long time = System.currentTimeMillis();

    List<Object[]> output = sql(
        "CALL %s.system.expire_snapshots(" +
            "older_than => TIMESTAMP '%s'," +
            "table => '%s'," +
            "retain_last => 999, use_caching => false)",
        catalogName, currentTimestamp, tableIdent);

    System.out.println("time taken in ms: " + (System.currentTimeMillis() - time));
  }

  time taken in ms: 18478  -- with cache
  time taken in ms: 22589  -- without cache

b) In the base code I also found that we use caching for RewriteManifestsProcedure, so I used the same pattern and syntax (keeping cache as the default).
I also found a bug in the base code and raised PR #4722.

c) I have some more plans for improving expire_snapshots (a different idea from #3457); I will work on them in a subsequent PR.

The idea is to add a new from_snapshot_id column while preparing the actual-files DataFrame, then filter out the rows for expired snapshot IDs (a NOT IN filter) from the persisted output (without scanning again), and then apply the same df.except() logic to find the expired files; see the sketch below.
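A minimal sketch of that idea, under stated assumptions (the from_snapshot_id column and the method and variable names are hypothetical, not the PR's code):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.not;

    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class ExpiredFilesSketch {
      static Dataset<Row> expiredFiles(Dataset<Row> actualFilesDF, List<Long> expiredSnapshotIds) {
        // Keep only files written by snapshots that survive expiration
        // (a NOT IN filter over the persisted output, no rescan needed) ...
        Dataset<Row> validFiles =
            actualFilesDF.filter(not(col("from_snapshot_id").isin(expiredSnapshotIds.toArray())));
        // ... then the expired files are whatever is left over.
        return actualFilesDF.except(validFiles);
      }
    }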

@ajantha-bhat marked this pull request as ready for review May 8, 2022.
Code under review (diff context):

    action.retainLast(retainLastNum);
    }

    if (maxConcurrentDeletes != null && maxConcurrentDeletes > 0) {
Member Author:
maxConcurrentDeletes > 0 is already checked in the preconditions above.

Member:
It's best to leave changes like this for a separate cleanup PR.

@szehon-ho (Member) commented May 10, 2022:

> c) I have some more plans for improving expire_snapshots (a different idea from #3457); I will work on them in a subsequent PR.
>
> The idea is to add a new from_snapshot_id column while preparing the actual-files DataFrame, then filter out the rows for expired snapshot IDs (a NOT IN filter) from the persisted output (without scanning again), and then apply the same df.except() logic to find the expired files.

The problem, I think, is that there are not many Iceberg utilities for projecting anything other than a partition filter to do the filtering. I spent some time looking again and tried to use time travel, which is effectively snapshot filtering, in #4736, but unfortunately it didn't work because the manifests table does not support it. You can take a look at whether it also makes sense to pursue that path (implementing time travel on the manifests table).

Anyway, I look forward to working together on this.

@ajantha-bhat (Member Author):
@RussellSpitzer, @szehon-ho: what do you think about this PR?

Code under review (diff context):

    .as(Encoders.STRING())
    .collectAsList();

    if (useCaching) {
Member:
This needs to be in a finally block; otherwise, if we hit an error in collectAsList, there is a possibility that we never uncache. For services running these actions, that would be a problem.
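A minimal sketch of the suggested shape (illustrative names; not the PR's actual code):

    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    class UnpersistInFinally {
      static List<String> collectPaths(Dataset<Row> allManifests, Dataset<Row> validFileDF, boolean useCaching) {
        if (useCaching) {
          allManifests.persist();
        }
        try {
          // a failure in the action below must still release the cache
          return validFileDF.as(Encoders.STRING()).collectAsList();
        } finally {
          if (useCaching) {
            allManifests.unpersist();
          }
        }
      }
    }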

@RussellSpitzer (Member):

> @RussellSpitzer, @szehon-ho, @aokolnychyi: the PR is ready for review. (full comment, including the test code and timings, quoted above)

The benchmarks really need to run on datasets that take minutes if we want to see the difference. The key would be to set up a test with thousands or tens of thousands of manifests.

Code under review (diff context):

    result = deleteFiles(reachableFileDF.collectAsList().iterator());
    }
    if (useCaching) {
      allManifests.unpersist();
Member:
This needs to be in a finally block as well.

Code under review (diff context):

    allManifestsBefore = loadMetadataTable(staticTableBefore, ALL_MANIFESTS);
    useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
    if (useCaching) {
      allManifestsBefore.persist();
Member:
I think we would want to use the memory-only cache here (and in all the other usages), but I'm not sure.
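For reference, a minimal sketch of a memory-only persist (the variable name follows the snippet above):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.storage.StorageLevel;

    class MemoryOnlyCache {
      static void cache(Dataset<Row> allManifestsBefore) {
        // persist() with no argument defaults to MEMORY_AND_DISK;
        // an explicit level keeps the cached blocks out of disk storage.
        allManifestsBefore.persist(StorageLevel.MEMORY_ONLY());
      }
    }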

@ajantha-bhat (Member Author):
Closing, as this is stale and we have improved the performance through other PRs.
