Conversation

@pan3793 (Member) commented May 29, 2023

This PR aims to make `IcebergSource` extend `SessionConfigSupport` to improve the Spark DataSource v2 API coverage.

/**
 * A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
 * propagate session configs with the specified key-prefix to all data source operations in this
 * session.
 *
 * @since 3.0.0
 */
@Evolving
public interface SessionConfigSupport extends TableProvider {

  /**
   * Key prefix of the session configs to propagate, which is usually the data source name. Spark
   * will extract all session configs that starts with `spark.datasource.$keyPrefix`, turn
   * `spark.datasource.$keyPrefix.xxx -> yyy` into `xxx -> yyy`, and propagate them to all
   * data source operations in this session.
   */
  String keyPrefix();
}
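
For Iceberg, the change presumably boils down to implementing this interface on IcebergSource and returning the `iceberg` prefix (a sketch of the idea, not the exact diff):

// sketch: IcebergSource additionally implements SessionConfigSupport
@Override
public String keyPrefix() {
  return "iceberg";
}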

It allows setting read/write options via Spark session configuration when using the DataFrame API to read or write tables. For example:

// set a write option through the session configuration
spark.sql("SET spark.datasource.iceberg.<write-opt-key>=<value>")

// equivalent to the SET statement above
df.write
  .format("iceberg")
  .option("<write-opt-key>", "<value>")
  ...
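
The read path works the same way. A minimal sketch, assuming an existing SparkSession `spark` and a hypothetical table `db.table` (the snapshot id is made up):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// session-level read option, propagated to Iceberg reads in this session
spark.conf().set("spark.datasource.iceberg.snapshot-id", "10963874102873");

// equivalent to .option("snapshot-id", "10963874102873") on the reader
Dataset<Row> df = spark.read().format("iceberg").load("db.table");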

@dramaticlly (Contributor) left a comment

LGTM, excited to use SQL syntax to influence Spark write options. I am curious whether something similar can be applied to DELETE and MERGE INTO, which today can only be done via SQL in Iceberg.

@pan3793 (Member, Author) commented Jun 1, 2023

@dramaticlly thanks, I refined the test code.

@pan3793 (Member, Author) commented Jun 6, 2023

Kindly ping @RussellSpitzer @aokolnychyi @rdblue

@github-actions bot commented Sep 2, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Sep 2, 2024
@github-actions bot commented Sep 9, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@szehon-ho (Member) left a comment

This looks good to me; small suggestion for the test, let me know what you think.


withSQLConf(
    // set write option through session configuration
    ImmutableMap.of("spark.datasource.iceberg.overwrite-mode", "dynamic"),
@szehon-ho (Member)

Suggestion: WDYT about testing with snapshot-property and asserting that it's set explicitly? It may make the test a bit clearer without the reader needing to understand what overwrite-mode is.

@pan3793 (Member, Author) Nov 18, 2024

@szehon-ho Hmm.. sorry I don't get your point.

Let me explain my idea briefly: the test case should cover both the read and write paths (a rough sketch follows the list):

  1. create a table, write some data into the table, and record the snapshot as s1
  2. overwrite the table with dynamic overwrite mode (test setting write options through session conf) and check the current snapshot of the table
  3. read the table from the snapshot s1 (test setting read options through session conf) and check the data
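
A rough, untested sketch of those three steps, using plain Spark APIs rather than the project's test helpers (the table name local.db.t is hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// 1. create a partitioned table, write some data, and remember the first snapshot as s1
spark.sql("CREATE TABLE local.db.t (id BIGINT, data STRING) USING iceberg PARTITIONED BY (data)");
spark.sql("INSERT INTO local.db.t VALUES (1, 'a'), (2, 'b')");
long s1 = spark.sql("SELECT snapshot_id FROM local.db.t.snapshots ORDER BY committed_at")
    .first().getLong(0);

// 2. write path: the dynamic overwrite mode comes from the session conf, not an explicit option
spark.conf().set("spark.datasource.iceberg.overwrite-mode", "dynamic");
spark.sql("SELECT 3L AS id, 'b' AS data")
    .write().format("iceberg").mode("overwrite").save("local.db.t");
// ...assert on the table's current snapshot

// 3. read path: select snapshot s1 through the session conf and check the original data comes back
spark.conf().set("spark.datasource.iceberg.snapshot-id", String.valueOf(s1));
Dataset<Row> atS1 = spark.read().format("iceberg").load("local.db.t");
// ...assert that atS1 contains only the two original rows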

@szehon-ho (Member) Nov 18, 2024

Yea sorry, I was not clear. It's just a suggestion.

I think, because the test only exercises the SessionConfigSupport functionality, it may be clearer for the reader if the first check (on the write part) is something like:

'spark.datasource.iceberg.snapshot-property.foo=bar'
and then checking whether foo is set on the latest snapshot summary?

Because I think the reader of the test needs to know what 'dynamic overwrite' mode is to understand the assertion (it's not related to the feature), whereas the above is a bit more self-explanatory, IMO.

I think the read part is decently understandable without additional context.
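
A sketch of what that could look like in the test, assuming a test method declared `throws Exception`, a hypothetical table local.db.t, and Spark3Util plus AssertJ available on the test classpath:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.Table;
import org.apache.iceberg.spark.Spark3Util;

// the snapshot property comes from the session conf, not an explicit write option
spark.conf().set("spark.datasource.iceberg.snapshot-property.foo", "bar");
df.write().format("iceberg").mode("append").save("local.db.t");

// the custom property should show up in the latest snapshot's summary
Table table = Spark3Util.loadIcebergTable(spark, "local.db.t");
assertThat(table.currentSnapshot().summary()).containsEntry("foo", "bar");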

@pan3793 (Member, Author)

Thanks for the detailed description. I learned something new: I didn't know that Iceberg could add custom snapshot properties through options. I updated the test case to follow the suggestions.

@github-actions github-actions bot removed the stale label Nov 18, 2024
@pan3793 pan3793 force-pushed the SessionConfigSupport branch from 75d88fa to b21335a Compare November 18, 2024 15:40
@pan3793 (Member, Author) commented Nov 18, 2024

rebased on the latest main branch



* over any namespace resolution.
*/
-public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
+public class IcebergSource
@szehon-ho (Member)

Another comment: there are now multiple ways to configure properties (including #4011), which may be confusing to users. It's worth adding documentation about it, listing the precedence, i.e.:

I guess, when using the DataFrame API (to be double-checked):

  • explicit dataframe option
  • dataframe session default
  • if table exists, explicit table option
  • if table exists, table default

@pan3793 (Member, Author)

I updated the docs and hope it's clear now.

@github-actions github-actions bot added the docs label Nov 19, 2024
### Write options

-Spark write options are passed when configuring the DataFrameWriter, like this:
+Spark write options are passed when configuring the DataFrameWriterV2, like this:
@pan3793 (Member, Author)

I replaced the example with DataFrameWriterV2 because:

The v1 DataFrame `write` API is still supported, but is not recommended.

@szehon-ho (Member) left a comment

Hi @pan3793, I think it looks good, but it may take a few iterations on the doc part. Should we leave that for another PR to make this one smaller? (We can get the functionality in first.)

| vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled |
| batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size |
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
Iceberg 1.8.0 and later support setting read options by Spark session configuration `spark.datasource.iceberg.<key>=<value>`
@szehon-ho (Member)

I think this is good, but I was also thinking of adding a section for priority, as mentioned.

@szehon-ho (Member)

This can be in its own section, like "session level configuration"?

when using DataFrame to read Iceberg tables, for example: `spark.datasource.iceberg.split-size=512m`, it has lower priority
than options explicitly passed to DataFrameReader.

| Spark option | Default | Description |
@szehon-ho (Member)

I think we can revert the change to this table?

@pan3793 (Member, Author)

it was auto-formatted by IDEA, reverted

@pan3793 pan3793 force-pushed the SessionConfigSupport branch from 362ccac to ea7a515 Compare November 21, 2024 09:14
@pan3793 (Member, Author) commented Nov 21, 2024

@szehon-ho WDYT of the current state? I kept the docs changes minimal in this patch.

@szehon-ho (Member) left a comment

Thanks @pan3793 for minimizing the changes. I think the doc can still be improved; I put in the comments.

But I think it'd be faster, if you want to get the code changes in first, to split the doc work into another PR.


.append()
```

Iceberg 1.8.0 and later support setting write options by Spark session configuration `spark.datasource.iceberg.<key>=<value>`
@szehon-ho (Member)

If we extract it to its own section, there's no need to repeat it?

@pan3793 (Member, Author)

I wrote it here because it's under "Write options". Actually, Spark has several concepts that let formats/extensions control behavior, i.e. table properties, session configurations, and options.

.table("catalog.db.table")
```

Iceberg 1.8.0 and later support setting read options by Spark session configuration `spark.datasource.iceberg.<key>=<value>`
@szehon-ho (Member) Nov 21, 2024

I still think we need a new section, like 'Configuration Priority', where we can explain the order of precedence:
DataFrame Writes:

  • explicit dataframeWriter option
  • dataframe session default
  • if table exists, explicit table option
  • if table exists, table default

DataFrame Reads:

  • explicit dataFrameReader option
  • dataframe session default
  • if table exists, explicit table option
  • if table exists, table default

(please double check)

@pan3793 (Member, Author)

I hesitate to write such a section because the situation is more complex; some configurations can also be set by a dedicated session configuration, for example:

  public boolean localityEnabled() {
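    // precedence here: explicit read option first, then the dedicated session conf, then the computed default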
    boolean defaultValue = Util.mayHaveBlockLocations(table.io(), table.location());
    return confParser
        .booleanConf()
        .option(SparkReadOptions.LOCALITY)
        .sessionConf(SparkSQLProperties.LOCALITY)
        .defaultValue(defaultValue)
        .parse();
  }

@pan3793 (Member, Author) commented Nov 22, 2024

@szehon-ho I made a minor change to the assertion statement after your approval, and also created two backport PRs for Spark 3.3 and 3.5. Thanks for your detailed review.

@szehon-ho (Member)

Sure, thanks, it's a good catch; assertThat is better.

@szehon-ho szehon-ho merged commit 9cc13b1 into apache:main Nov 23, 2024
31 checks passed
@szehon-ho (Member)

Merged, thanks @pan3793

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024