Skip to content

Conversation

@openinx
Copy link
Member

@openinx openinx commented Mar 4, 2022

This PR is trying to refactor the flink write path to remove those underlying old writers to the partition specified writers ( Introduced from @aokolnychyi ).

I was reviewing the PR #4132 from @dungdm93, which is the similar thing that this one is trying to accomplish. I tried to rewrite it because I wanted to try out the details of the design for myself and get a better understanding of those new writers. In this way, I think I can provide more background & details design from my perspective. After high level design are okay, then I think we can make the whole PR to be smaller ones and let's collaborate together to get this work done @dungdm93 .

@openinx openinx added this to the Iceberg 0.14.0 Release milestone Mar 4, 2022
@openinx openinx marked this pull request as draft March 4, 2022 02:45
private final int[] equalityFieldIds;
private final SortOrder sortOrder;
private DeleteFile deleteFile = null;
private long rowOffset = 0;
Copy link
Contributor

@dungdm93 dungdm93 Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about name recordCount or rowCount?
IMO rowOffset is specific for tracking record write to file, recordCount is more general.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the BaseFile is using the recordCount, I think it's better to keep this consistent with the BaseFile's recordCount :-)

@openinx
Copy link
Member Author

openinx commented Mar 8, 2022

FYI @rdblue , @aokolnychyi , @dungdm93 . I think this draft PR is ready to review now. Mind to take a look when you have a chance ?

@openinx openinx marked this pull request as ready for review March 8, 2022 04:26
* @param partition a partition or null if the spec is unpartitioned
*/
void write(T row, PartitionSpec spec, StructLike partition);
PathOffset write(T row, PartitionSpec spec, StructLike partition);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the PartitioningWriter will have multiple partition writers in its internal implementation, so we can not just introduce the two methods in this interface:

  /**
   * Returns the file path that are currently opened.
   *
   * @return the current file path.
   */
  CharSequence location();

  /**
   * Returns the row offset that are currently writing, starting from 0.
   *
   * @return the current row offset.
   */
  long rowOffset();

Because those location() and rowOffset() are binded to specific writers. In theory, different writers should have different location() and rowOffset(). So here we add a return value PathOffset here to get the latest wrote path & offset for the wrote row.

import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class BaseEqualityDeltaWriter<T> implements EqualityDeltaWriter<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any style guidelines for documentation? Typically its nice to have javadoc on al public classes/members that aren't overwritten.


public class BaseEqualityDeltaWriter<T> implements EqualityDeltaWriter<T> {

private final ThreadLocal<PositionDelete<T>> posDelete = ThreadLocal.withInitial(PositionDelete::create);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe document why thread local is used here?

void write(T row);

/**
* Returns the file path that are currently opened.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns the file path that are currently opened.
* Returns the file path that is currently opened.

Is "opened" important here? what happens if the writer is closed?

CharSequence location();

/**
* Returns the row offset that are currently writing, starting from 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns the row offset that are currently writing, starting from 0.
* Returns the row offset that will be written to next, starting from 0.

Is this interface intended to be seekable at any point? If not maybe call this rowsWritten which might be clearer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we've discussed in https://github.com/apache/iceberg/pull/4264/files#r819238913, I agree recordCount is a good & consistent name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we've discussed in https://github.com/apache/iceberg/pull/4264/files#r819238913, I agree recordCount is a good & consistent name.

Comment on lines 104 to 109
if (!retireOldKey(asStructLike.apply(key), spec, partition)) {
equalityWriter.write(key, spec, partition);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will be needed to reconsider because of this PR: #4364

@openinx openinx force-pushed the flink-writer-refactor branch from 588e7b3 to 6f329e1 Compare March 28, 2022 09:57
@github-actions github-actions bot added the data label Mar 28, 2022
@openinx openinx force-pushed the flink-writer-refactor branch from 3bc727c to be391f7 Compare March 29, 2022 09:51
@openinx
Copy link
Member Author

openinx commented Apr 2, 2022

CC @rdblue @aokolnychyi , Would you mind to take a look when you have a chance ?

@openinx openinx force-pushed the flink-writer-refactor branch from be391f7 to 86f83fa Compare April 18, 2022 07:52
@rdblue rdblue removed this from the Iceberg 0.14.0 Release milestone Jun 28, 2022
@github-actions
Copy link

github-actions bot commented Aug 7, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 7, 2024
@github-actions
Copy link

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

4 participants