Conversation

@Polectron (Contributor) commented Mar 21, 2023:

Building on top of #7033, this uses pyarrow.dataset.Scanner.head(num_rows) and a multiprocessing.Value as a counter to limit the number of rows retrieved in pyiceberg.io.pyarrow.project_table(). It avoids reading more files once the quantity specified by limit has been reached.

Closes #7013
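The mechanism described above can be sketched as follows. This is an illustrative sketch, not the actual pyiceberg code: names like `scan_fragment`, `rows_counter`, and the list-slice standing in for `Scanner.head(limit)` are assumptions for the example.

```python
import multiprocessing

def scan_fragment(fragment_rows, rows_counter, limit):
    """Stand-in for one per-file task: skip the expensive read entirely
    if earlier tasks already produced enough rows, otherwise read at
    most `limit` rows and bump the shared counter under its lock."""
    with rows_counter.get_lock():
        if rows_counter.value >= limit:
            return []  # enough rows already collected; skip this file
    rows = fragment_rows[:limit]  # stands in for Scanner.head(limit)
    with rows_counter.get_lock():
        rows_counter.value += len(rows)
    return rows

rows_counter = multiprocessing.Value("i", 0)
limit = 5
files = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
chunks = [scan_fragment(f, rows_counter, limit) for f in files]
result = [row for chunk in chunks for row in chunk][:limit]  # final trim
print(result)  # [1, 2, 3, 4, 5]; the third file is never read
```

Note that each fragment may still return up to `limit` rows, so the concatenated result is trimmed once more at the end; the counter's job is only to short-circuit files that no longer need reading.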

@github-actions github-actions bot added the INFRA label Mar 21, 2023
@Fokko (Contributor) left a comment:
This is looking great @Polectron! I've left some suggestions; let me know what you think.


```python
if limit:
    arrow_table = fragment_scanner.head(limit)
    with rows_counter.get_lock():
```
@Fokko (Contributor):
I think we can remove this lock because we already did the expensive work. This will make the code a bit simpler and avoid locking.

@Polectron (Contributor, Author):
The official Python documentation for multiprocessing.Value suggests that non-atomic operations like += should use a lock. Otherwise a race condition could occur where multiple reads finish at the same time and overwrite the row counter, causing us to keep reading rows even after we have already read enough.
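To make the race concrete, here is a runnable sketch (names are illustrative, not the PR's code) where four threads each increment a shared multiprocessing.Value 10,000 times. `counter.value += 1` is a read followed by a write, so without the lock two threads can read the same value and lose an update; holding `get_lock()` makes the final count deterministic:

```python
import threading
from multiprocessing import Value

counter = Value("i", 0)

def add_rows(n: int) -> None:
    for _ in range(n):
        # += is not atomic: a read and a write as two separate steps,
        # so we serialize them with the Value's built-in lock.
        with counter.get_lock():
            counter.value += 1

threads = [threading.Thread(target=add_rows, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 40000, deterministically, because of the lock
```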

@Fokko (Contributor):
Ah, thanks for the explanation. Currently we don't do multi-processing, but multi-threading. I did some extensive testing and noticed that multi-processing wasn't substantially faster than multi-threading, probably because most of the time is spent fetching the files and reading the data, which all happens in Arrow and bypasses the GIL.

@Polectron (Contributor, Author) commented Mar 22, 2023:

I think you made the right choice going for multi-threading, because the task is IO-bound and there are potentially thousands of tasks that need to be executed. Even though we use multi-threading, which is bound by the GIL, some operations might still not be thread-safe (like +=, which performs a read and a write as two separate operations).
Also, even though the tasks run on threads in a ThreadPool, multiprocessing.Value uses a multiprocessing.RLock by default, which is compatible with both processes and threads.

@Fokko (Contributor):
Thanks for clearing this up!

```python
def __setattr__(self, name: str, value: Any) -> None:
    # The file_format is written as a string, so we need to cast it to the Enum
    if name == "file_format":
        value = FileFormat[value]
```
@Fokko (Contributor):
Just curious. Is this related to changes in pyarrow.py or just an independent fix for parsing file_format in DataFile?

@Polectron (Contributor, Author):

This was introduced by merging #7033
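For context, the cast in that `__setattr__` hook relies on Python's Enum name lookup, `EnumClass[name]`. A minimal sketch with a stand-in FileFormat (a hypothetical enum for illustration, not pyiceberg's actual definition):

```python
from enum import Enum

class FileFormat(Enum):  # hypothetical stand-in, not the pyiceberg class
    AVRO = "avro"
    ORC = "orc"
    PARQUET = "parquet"

# Name lookup casts the stored string back to the enum member, which is
# what `value = FileFormat[value]` does in the hook above.
fmt = FileFormat["PARQUET"]
print(fmt is FileFormat.PARQUET)  # True
```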

@Polectron (Contributor, Author) commented:
I don't really understand what's failing in the Python integration tests: they sometimes don't find some of the tables created by provision.py. Maybe the sleep after launching the Docker container in python-integration.yml isn't long enough, so the tests start running before the tables have finished being created?

@Polectron Polectron requested a review from Fokko March 22, 2023 11:25
@Fokko (Contributor) commented Mar 22, 2023:

@Polectron I noticed this as well. The current master is red because of failing Python integration tests. Maybe we can increase the timeout for now.

```diff
  docker-compose -f dev/docker-compose-integration.yml build
  docker-compose -f dev/docker-compose-integration.yml up -d
- sleep 20
+ sleep 30
```
@Fokko (Contributor):
Works for now 👍🏻

@Fokko (Contributor) commented Mar 22, 2023:

@Polectron thanks, this looks great! I have one final ask, could you update the docs under python/mkdocs/docs/? Otherwise, people won't use this awesome feature.

@Polectron (Contributor, Author):
@Fokko
Done! 🎉

@Fokko (Contributor) commented Mar 23, 2023:

@Polectron looks like #7148 went in, can you rebase?

@Polectron Polectron requested a review from Fokko March 23, 2023 19:12


@Fokko Fokko merged commit 25360c0 into apache:master Mar 24, 2023
@Fokko (Contributor) commented Mar 24, 2023:

Thanks for working on this @Polectron, this is a great addition 👍🏻

@Fokko Fokko added this to the PyIceberg 0.4.0 release milestone Mar 24, 2023

Successfully merging this pull request may close these issues.

PyIceberg, Implement Limit filter to preview Table Data

3 participants