@@ -38,7 +38,7 @@ public class HoodieCommonConfig extends HoodieConfig {

 public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
     .key("hoodie.datasource.write.reconcile.schema")
-    .defaultValue(false)
+    .defaultValue(true)
Contributor:
Initially I was not in favor of this change, but now, thinking about it a little more and especially in light of #6358, I think this is the right thing to do: for example, after #6358 we'd be allowing writes to go through that might have columns dropped in the new batch. Now, there are 2 scenarios based on whether reconciliation is enabled or not:

  1. If reconciliation is enabled: we will favor the table's schema and use it as the writer schema. In that case we will rewrite the incoming batch into the table's schema before applying it to the table.

  2. If reconciliation is disabled: we will favor the incoming batch's schema and use it as the writer schema. In this case, for example for COW, we will be reading the table in its existing schema, but the new base files will be written in the writer's schema (i.e. with the column dropped).

Both of these approaches are legitimate and could be preferred in different circumstances. What's important here for us is to pick the right default setting, the one that minimizes the surprise effect.

Having reflected on this for some time, I think that enabling reconciliation by default makes more sense, as it protects the table's schema from accidental mishaps in the incoming batches. And if somebody prefers flow #2, they could easily opt in by simply disabling reconciliation (both flows are sketched below).

WDYT?
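
A minimal Spark/Scala sketch of the two flows, assuming a hypothetical table at /tmp/hudi/my_table keyed on id with precombine field ts; incomingBatch stands in for a DataFrame whose schema may have a column dropped:

    import org.apache.spark.sql.SaveMode

    // Flow 1: reconciliation enabled. The incoming batch is rewritten into the
    // table's schema before being applied, so a column missing from the batch
    // is preserved in the table rather than dropped.
    incomingBatch.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/my_table")

    // Flow 2: reconciliation disabled. The batch's schema becomes the writer
    // schema, so (for COW) new base files are written with the column dropped.
    incomingBatch.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.reconcile.schema", "false")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/my_table")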

Member Author:

I agree. That was precisely the intention behind flipping this default.

Contributor:

On the other hand, in its current form reconciliation doesn't allow evolving the schema (unless comprehensive Schema Evolution is enabled), since it will essentially always favor the table's schema (there's no way for you to add a new column, for example, other than by switching off reconciliation).

Contributor:

Is the difference mainly around the case of dropping a column?

Contributor:

I'm thinking whether we should decouple the handling of dropped columns instead of turning on "schema reconciliation" by default, e.g., we should still allow new columns to be added, rather than dropping them to favor the table's schema by default, while properly handling the column drop (maybe with a different config?).

Contributor:

@yihua it's more of a discussion about what the default behavior should be:

  • Should we (by default) favor the existing table's schema as the SoT and rewrite the incoming batch into it (unless Schema Evolution is enabled, in which case we will try to evolve the schema)?
  • Should we (by default) favor the incoming batch's schema as the schema we want the table to be rewritten in?

I still think that #1 is the safer option as a default (optimizing for the least amount of surprise to the user).

Contributor (@alexeykudinkin, Sep 19, 2022):

@kazdy I think we just need to clearly disambiguate our configuration to make sure users can clearly understand what they can achieve and how (see my previous comment #6196 (comment)): what you're describing can be achieved today by enabling Reconciliation and Schema Evolution together (sketched below).
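
A hypothetical sketch of that combination, assuming hoodie.schema.on.read.enable is the flag for the comprehensive Schema Evolution feature referenced here (experimental at the time); batchWithNewColumn is an illustrative DataFrame carrying an extra column:

    // Reconciliation keeps the table's schema authoritative, while Schema
    // Evolution lets compatible changes (e.g. the newly added column) be
    // applied to the table schema instead of being rejected.
    batchWithNewColumn.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .option("hoodie.schema.on.read.enable", "true")
      .mode(org.apache.spark.sql.SaveMode.Append)
      .save("/tmp/hudi/my_table")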

Contributor:

@alexeykudinkin afaik the Schema Evolution config is there because it's an experimental feature, and soon it will become GA? Then that config should be enabled by default or deprecated; will this logic hold then? I feel like Hudi's config surface is already very broad and therefore a bit hard to grasp, and users would appreciate one switch instead of a combination of two.

Contributor:

@kazdy correct, when Schema Evolution becomes GA (cc @xiarixiaoyao) we will flip it to be on by default.

Contributor:

@alexeykudinkin @kazdy, right now schema evolution cannot be read by Hive and Presto, but we already have PRs to support that:
#6989
prestodb/presto#18557
#7045

Once those PRs are merged, I think it will be OK.

     .withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
         + "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
         + "injected to missing fields). If not, the write batch would fail.");
@@ -509,7 +509,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
       HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
       HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
-      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      // NOTE: catalog is the source of truth for schema, so schema reconciliation is disabled.
+      RECONCILE_SCHEMA.key -> "false"
     )
     .filter { case (_, v) => v != null }
   }
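
For context, a hypothetical MERGE INTO that this command handles; with the change above, the command pins RECONCILE_SCHEMA to false internally, since the catalog schema is authoritative for SQL writes (table and source names are illustrative):

    // Issued through Spark SQL against a Hudi table named "my_table",
    // with "updates" as the source of changed rows.
    spark.sql(
      """
        |MERGE INTO my_table t
        |USING updates u
        |ON t.id = u.id
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *
        |""".stripMargin)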