
Conversation

@codope (Member) commented Jan 19, 2022

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick one of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar added the `rfc` (Request for comments) label on Jan 19, 2022
We introduce a new action `index` which will denote the index building process,
the mechanics of which is as follows:

1. From an external process, users can issue a CREATE INDEX or similar statement
Contributor:

We should support this via regular writers as well, just the scheduling part. It will make life easier.

Member:

+1

Member (Author):

You mean inline scheduling, like we have for other table services?

Member:

Yes.
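
To make the agreed direction concrete, a hedged sketch of inline scheduling from a regular writer, mirroring how compaction/clustering are scheduled today. It assumes an already-initialized writeClient and records; the scheduleIndexing method and MetadataPartitionType value are assumptions about the eventual API, not part of the RFC text:

```java
import java.util.Arrays;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.MetadataPartitionType;

// Hypothetical inline scheduling: the regular writer only appends the index
// plan to the timeline; a separate async process executes the index build.
String commitTime = writeClient.startCommit();
writeClient.commit(commitTime, writeClient.upsert(records, commitTime));

// Schedule-only step, analogous to scheduleCompaction(): writes
// <instant_time>.index.requested and returns the instant if scheduling succeeded.
Option<String> indexInstant =
    writeClient.scheduleIndexing(Arrays.asList(MetadataPartitionType.COLUMN_STATS));
```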

inflight writer, that is just about to commit concurrently, has a very
high chance of seeing the indexing plan and aborting itself.

We can just introduce a lock for adding events to the timeline and these races
Contributor:

In single-writer mode, users may not have configured any lock service, and we don't enforce one as of today. Something to keep in mind.

Member:

We have to clearly document these, along with other operations that cannot be performed without a lock provider configured. For safety, should the indexer always error out if there is no lock provider configured?

Member (Author):

I think we should error out. I tried to think of a way to avoid taking any lock, but we need this minimal locking. We should call it out in the documentation.

Member:

+1 for requiring locking.

Having wrong or missing data in the MDT is very difficult to debug in the long run and can cause serious data quality issues. Also, anyone operating at enough scale to require async indexing should be able to choose one of the many locking options available.
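
Since the thread settles on requiring a lock provider, a minimal configuration sketch may help. The keys are the standard Hudi lock settings; the ZooKeeper endpoint values are illustrative placeholders:

```java
import java.util.Properties;

public class IndexerLockConfig {
  // Lock settings so the async indexer and regular writers can serialize
  // their timeline updates through a shared lock provider.
  public static Properties lockProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    props.setProperty("hoodie.write.lock.zookeeper.url", "zk1.example.com"); // placeholder
    props.setProperty("hoodie.write.lock.zookeeper.port", "2181");
    props.setProperty("hoodie.write.lock.zookeeper.lock_key", "my_table");   // placeholder
    props.setProperty("hoodie.write.lock.zookeeper.base_path", "/hudi/locks");
    return props;
  }
}
```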

that. We will correct this during indexing action completion. In the
average case, this may not happen and the design has liveness.

3. When the indexing process is about to complete, it will check for all
Contributor:

It's not very clear what the indexer does here apart from populating base files for all instants having commit time < t. For example, what does the indexer do with completed instants > t up until now?

If I am not wrong, "check for all completed commit actions to ensure each of them added entries per its indexing plan" refers to all instants up until now. It would be good to call that out.

Member:

+1. At a high level, the indexer should only write the base files.

3. When the indexing process is about to complete, it will check for all
completed commit actions to ensure each of them added entries per its
indexing plan, otherwise simply abort after a configurable timeout. Let's
call this the **indexing check**.
Contributor:

Also, let's say this is the timeline when the indexer started:
C1, C2, ..., C5 (inflight), C6, C7, C8. Start indexer.

The indexer will build the base file with all info until C4.
What happens to data pertaining to C6, C7, C8? These are already completed.
Essentially, the indexer will go through every completed commit up until now and ensure it is applied to the new partition. If it is <= C4, it goes to the base file; if it is > C4, it goes in as a delta commit, is that right?

Member:

I think the indexer should work up to C8. Anything inflight between C4 and C8 has to do the "indexing check". This is a valid case that Siva is pointing out.

Member (Author):

That's correct, and if the indexer times out before C5 completes, it will abort. Next time, indexing will start again with C4 as the base instant and run the indexing check again.
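
A small sketch of the base-vs-delta split this thread converges on; the instants C1..C8 and all helper names are illustrative, not actual APIs:

```java
// Commits up to the base instant (C4, the latest completed instant with no
// pending instant before it) are covered by the bulk base-file build;
// completed commits after it (C6, C7, C8) are applied as log entries.
String baseInstant = latestCompletedInstantWithoutHoles();   // C4
buildBaseFilesUpTo(baseInstant);                             // covers C1..C4
for (String commit : completedCommitsAfter(baseInstant)) {   // C6, C7, C8
  applyAsLogEntries(commit);
}
// C5 is still inflight: the final "indexing check" waits, bounded by a
// configurable timeout, for it to complete and add its own entries per the
// plan; on timeout the action aborts and a later run restarts from C4.
awaitIndexingCheckOrAbort(indexTimeoutMs);
```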

**Case 2: Indexer fails while writer is inflight**

Writer will commit adding log entries to the metadata partition. Indexer will
fetch the last instant for which indexing was done from `.index.inflight` file.
Contributor:

I would like to understand this more. So the indexer will keep updating the `.index.inflight` meta file, is it? Like checkpointing?

Member (Author):

No longer valid. To keep things simple, either an MDT partition is available or it is not, and this will be known through the table config. There is some value in checkpointing, especially for indexes that take time, but depending on the timeline adds more complexity and we would have to deal with more correctness issues.

We can just introduce a lock for adding events to the timeline and these races
would vanish completely, still providing great scalability and asynchrony for
these processes.

Contributor:

I don't see details on when exactly regular writers will start to make synchronous updates. Also, when exactly can callers start using the new index that got built? What's the source of truth? We can rely on the completed index instant on the timeline, but what about after archival? Also, loading the timeline every time might be costly.

Member:

Once the indexing action is completed, any MDT partition that is currently not being indexed is considered ready for use.

Member (Author):

Added some more details. Table config will be the source of truth.
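
A hedged sketch of what "table config as the source of truth" could look like to a reader of the table; getMetadataPartitions() is an assumed accessor over a hoodie.properties entry listing the available partitions:

```java
import java.util.Set;
import org.apache.hudi.common.table.HoodieTableMetaClient;

// Consumers check the table config, not the timeline, before using an MDT
// partition; this also survives archival of the index instant.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf).setBasePath(basePath).build();
Set<String> available = metaClient.getTableConfig().getMetadataPartitions();
boolean canUseColumnStats = available.contains("column_stats");
```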


b) Inflight writer about to commit, but indexing completed just before that.

In this case, since the indexer completed before the writer, so it has already
Contributor:

"Will sync up directly": I don't get this fully.

@vinothchandar changed the title from "[HUDI-3225] Add RFC for async metadata indexing" to "[HUDI-3225] [RFC-45] for async metadata indexing" on Jan 26, 2022
@vinothchandar (Member) left a comment:

We also need to cover the "reindexing" scenario, where somehow new file slices are created for the MDT index partition, and older slices are cleaned/deleted.

Still need to carefully review the multi-writer scenarios, but thus far this seems close to what the JIRA had.

I'd also ask for a review from @prashantwason

provider and only one writer can access MDT in read-write mode. Hence, any write
to MDT is guarded by the data table lock. This ensures only one write is
committed to MDT at any point in time and thus guarantees serializability.
However, locking overhead adversely affects the write throughput and will reach
Member:

Not sure how the metadata indexing solves the multi-writer problem for the MDT. Strictly speaking, we just need table service scheduling on the MDT to be guarded by the lock.

Member:

The metadata table is unique in that each write to the MDT involves multiple partitions being updated together in a transaction. So I do not see a truly parallel commit to the MDT as possible.


1. From an external process, users can issue a CREATE INDEX or similar statement
to trigger indexing for an existing table.
1. This will add a `<instant_time>.index.requested` to the timeline, which
Member:

Nit: indexing.requested? All actions are verbs.

Member:

index is a noun as well as a verb.

Member (Author):

I'd prefer index for brevity, and none of our actions end with -ing. But let me know if you think indexing is more appropriate; I can change it.


@codope force-pushed the rfc-45-async-index branch from 1cb4c54 to bd3b354 on March 9, 2022 04:55
@codope (Member, Author) commented Mar 9, 2022

@vinothchandar @nsivabalan This is ready for review again. The following has changed since the last review:

  • File groups in a metadata partition will be initialized while scheduling the index action. No new file group is initialized by writers while indexing is in progress.
  • Table config will be the source of truth for which metadata partitions are available, instead of relying on the commit metadata on the timeline.

@prashantwason (Member) left a comment:

Looks good.

to trigger indexing for an existing table.
1. This will add a `<instant_time>.index.requested` to the timeline, which
contains the indexing plan.
2. From here on, the index building process will continue to build an index
Member:

Should this be reflected by choosing the index timestamp as t? E.g. t.index.requested?

Table service operations on the metadata table usually take the timestamp of the last op with a suffix: 001 for compaction, 002 for clean, etc.

So it may be good to have this as t001.index.requested.

Member (Author):

Yes, we can do that and avoid a little serde cost. It can also ease debugging. However, I should point out that the index action will be written on the data timeline, as it will be known to the user.
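
For illustration, one possible shape of the two timelines under the suffix scheme discussed above (instant values and file names are made up):

```
Data timeline:
  20220309103045.commit            <- write at instant t
  20220309103050.index.requested   <- index action, visible to the user

Metadata timeline:
  20220309103045.deltacommit       <- MDT update for the write at t
  20220309103045001.compaction     <- MDT compaction keyed off t (suffix 001)
  20220309103045002.clean          <- MDT clean (suffix 002)
```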


Writer will commit adding log entries to the metadata partition. However, table
config will indicate that partition is not ready to use. When indexer is
re-triggered, it will check the plan and table config to figure out which MDT
partitions to index and start indexing for those partitions.
Member:

When the indexer starts the next time, it will choose a different instant time. Hence, the older log blocks written are no longer valid. So I think each time the indexer starts (either the first time or after a failure), it should clean out the older file groups and create new ones (with newer instant time).

Member (Author):

Yes, that's the plan. But it will start from scratch only for the partitions that were partially indexed, i.e., partitions for which the table config was not updated in the last indexing run. The table config update always happens at the end of indexing for a partition.

We don't want to start all over again for all the partitions. So, let's say the indexer was scheduled at some instant t and wrote t.index.requested with a plan to index the files and column_stats partitions. It completed files but failed midway through column_stats. Then the table config will show that only the files partition is available for reads/updates. When the indexer starts the next time, it will see a pending index action, read the plan as well as the table config, and figure out that only the column_stats index is pending. It will clean the older file groups for column_stats, choose the latest completed instant (without holes) on the data timeline, create new file groups, and so on.

If this sounds right, I can update this example in the RFC.
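
A minimal sketch of the restart flow described above, resuming only the partitions whose table-config flag was never flipped; all helper and type names are hypothetical:

```java
// Resume indexing after a failure: files finished last run (flag set),
// column_stats was partial (flag never set), so only the latter restarts.
void resumeIndexing(IndexPlan plan, TableConfig tableConfig) {
  for (String partition : plan.getPartitionsToIndex()) {   // files, column_stats
    if (tableConfig.isPartitionAvailable(partition)) {
      continue;                          // finished in a previous run
    }
    deleteFileGroups(partition);         // discard partially written log blocks
    String instant = latestCompletedInstantWithoutHoles();
    initFileGroups(partition, instant);  // new file groups, newer instant time
    buildIndexUpTo(partition, instant);
    tableConfig.markAvailable(partition); // flipped only at the very end
  }
}
```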

re-triggered, it will check the plan and table config to figure out which MDT
partitions to index and start indexing for those partitions.

**Case 3: Race conditions**
Member:

There is another race condition possible:

  1. Writer is inflight.
  2. Indexer is starting and creating the file groups. Suppose there are 100 file groups to be created.
  3. Writer just finished and tries to write log blocks; it sees only a subset of the file groups (as step 2 has not completed yet). This will cause the writer to incorrectly write updates to a smaller number of shards.

In essence:

  1. Locking is required.
  2. The indexer needs to hold the lock while creating the file groups too.

Member (Author):

Good point! Initialization of file groups happens when the index is scheduled. While scheduling, we can take a lock. I'll update the RFC.
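
A sketch of the agreed fix: file groups are initialized during scheduling, inside the table lock, so a concurrent writer observes either none or all of the new partition's file groups. Lock and helper names are illustrative:

```java
lockManager.lock();
try {
  // Critical section: the plan and its file groups become visible atomically
  // with respect to any writer that takes the same lock before committing.
  writeToTimeline(instantTime + ".index.requested", indexPlan);
  for (int shard = 0; shard < numFileGroups; shard++) {   // e.g. 100 file groups
    createFileGroup(partition, shard, instantTime);
  }
} finally {
  lockManager.unlock();
}
```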

@codope force-pushed the rfc-45-async-index branch from 7afccec to 62db921 on March 11, 2022 12:04
@nsivabalan (Contributor) left a comment:

LGTM. A few minor clarifications.

2. From here on, the index building process will continue to build an index
up to instant time `t`, where `t` is the latest completed instant time on
the timeline without any "holes", i.e. no pending async operations prior to it.
Contributor:

Not necessarily async; it could be regular writes too. In the multi-writer case, there could be a failed commit waiting to be rolled back.

create log files in the same filegroup for the metadata index update. This will
happen within the existing data table lock.

The indexer runs in a loop until the metadata for data upto `t0` plus the data
Contributor:

I would like to understand the loop here. I thought we would just go for one round and then time out. Will sync up f2f.

Member (Author):

You're right, I'll word it better. What I meant is: run until timeout.
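
A sketch of "run until timeout" as clarified above; helper names are illustrative:

```java
// After the bulk build up to t0, keep applying commits that completed while
// indexing was in progress until caught up, or abort when the deadline passes.
buildBaseFilesUpTo(t0);
long deadline = System.currentTimeMillis() + indexTimeoutMs;
while (System.currentTimeMillis() < deadline) {
  List<String> pending = completedCommitsNotYetApplied();
  if (pending.isEmpty()) {
    completeIndexAction();   // indexing check passed; action is done
    return;
  }
  pending.forEach(this::applyAsLogEntries);
}
abortIndexAction();          // timed out; a later run resumes via table config
```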

Further, suppose there were two inflight writers Writer1 and Writer2 (with
inflight instants `t1` and `t2` respectively) while the indexing was requested
or inflight. In this case, the writers will check for pending index action and
find a pending instant `t3`. Now, if the metadata index creation is pending,
Contributor:

In the attached image, I see locks. It would be good to cover the critical sections for which we acquire locks, for the entire design in general. For example:

  • regular writers when checking for pending indexing?
  • regular writers when checking for completed partitions in the MDT (from table config)?
  • the async indexer while updating the hoodie table config?

I am not claiming we need to acquire a lock for all of the above, but a list like this would be good to call out explicitly.

Member (Author):

Good point, I'll update. Basically, we need a lock when:

  1. creating file groups while scheduling
  2. writing to the MDT timeline

See the writer-side sketch below.
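
An illustrative writer-side view of the second critical section above (API names are assumptions): the committing writer takes the same lock as the indexer before appending to the timeline, so it cannot miss a concurrently added index plan:

```java
txnManager.beginTransaction();
try {
  if (pendingIndexPlanExists()) {        // plan on timeline => file groups exist
    addLogEntriesForPlannedPartitions(commitMetadata);
  }
  commitToTimeline(commitMetadata);      // the guarded timeline write
} finally {
  txnManager.endTransaction();
}
```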

@codope added the `priority:blocker` (Production down; release blocker) label on Mar 16, 2022
@codope force-pushed the rfc-45-async-index branch from 62db921 to ee05ae2 on April 1, 2022 15:28
@hudi-bot (Collaborator) commented Apr 1, 2022

CI report:

Bot commands: @hudi-bot supports the following commands:

  • @hudi-bot run azure: re-run the last Azure build
