Conversation

@Fokko (Contributor) commented May 31, 2022

Reads an Avro file by first reading the header and extracting the schema. We then convert the Avro schema into an Iceberg schema and read the actual binary data using the schema visitor.

The binary decoder and codecs have been copied from apache/avro because I didn't want to depend on the library for just that. Also, apache/avro uses the general IO interface, while we have our own FileStream interface for reading files.
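For context, the wire primitives the vendored decoder deals with are small: Avro stores ints and longs as zigzag-encoded varints. A minimal sketch of such a read (illustrative only, not the code in this PR):

import io

def decode_zigzag_long(stream: io.BufferedIOBase) -> int:
    # Read a variable-length, zigzag-encoded long (the Avro wire format for ints/longs)
    b = stream.read(1)[0]
    n = b & 0x7F
    shift = 7
    while (b & 0x80) != 0:
        b = stream.read(1)[0]
        n |= (b & 0x7F) << shift
        shift += 7
    # Zigzag-decode: 0, 1, 2, 3, ... maps back to 0, -1, 1, -2, ...
    return (n >> 1) ^ -(n & 1)

# decode_zigzag_long(io.BytesIO(b"\x04")) == 2
# decode_zigzag_long(io.BytesIO(b"\x03")) == -2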

To keep this PR from getting too big, I'm working on these follow-up PRs:

  • Map the accessor onto an actual Python class
  • Support Manifest version 1 & 2
  • Support write/read schema

@Fokko Fokko force-pushed the fd-read-avro branch 3 times, most recently from 970872f to 730486a on June 1, 2022 13:42
@danielcweeks danielcweeks requested review from rdblue and samredai June 3, 2022 22:24
@samredai (Contributor) left a comment

This is awesome @Fokko! I left some comments. For some of the comments in src/iceberg/avro/, I know we're vendoring some of that so please feel free to ignore any nit/style comments there. I'm super excited that we'll have a read path that utilizes all standard lib stuff. 😄

assert "Unknown logical/physical type combination:" in str(exc_info.value)


def test_logical_map_with_invalid_fields():
Contributor:

I see a pattern here, so we may be able to consolidate these into a handful of parametrized tests, which would also make them easy to extend. How about a single test function per method being tested?

  • AvroSchemaConversion()._convert_logical_type
  • AvroSchemaConversion()._convert_logical_map_type
  • AvroSchemaConversion()._convert_schema
  • AvroSchemaConversion()._convert_field
  • AvroSchemaConversion()._convert_record_type
  • AvroSchemaConversion()._convert_array_type

As an example, for AvroSchemaConversion()._convert_logical_type:

@pytest.mark.parametrize(
    "avro_logical_type,expected",
    [
        ({"type": "int", "logicalType": "date"}, DateType()),
        (
            {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25},
            DecimalType(precision=19, scale=25),
        ),
        ...,
    ],
)
def test_schema_conversion_convert_logical_type(avro_logical_type, expected):
    assert AvroSchemaConversion()._convert_logical_type(avro_logical_type) == expected

Contributor Author (@Fokko):

Personally, I'm not a fan of parametrized tests:

  • I find them hard to read with all the (curly) braces.
  • If you have many tests and the last one fails, you have to rerun the earlier ones every time, which is annoying when you have breakpoints everywhere.
  • The parametrized arguments are evaluated every time, even if you run an unrelated test.

I don't mind a few additional tests.

return self._data[pos]


class _AvroReader(SchemaVisitor[Union[AvroStructProtocol, Any]]):
Contributor (@rdblue):

It looks like this calls a decoder when it visits a schema, but I was expecting an implementation that creates a reader tree that accepts a decoder, like the other one. Why did you decide to go with this approach over the other one?

Contributor Author (@Fokko):

Hey @rdblue. This made the most sense to me at the time. Which one is the other one? I don't have a strong opinion either way; you've probably done this more often than I have :)

Contributor (@rdblue):

I suggested the other approach for a few reasons. First, I think when it is reasonable to match the approach taken by the Java codebase, that's a good idea. That way we don't have completely different implementations to validate and maintain.

Second, this approach traverses the schema for each record read. That is inefficient compared to building a tree of readers that handle this. The reader tree approach allows us to create basically a state machine that is ready to read records.

Last, this is harder to update. The next step is to reconcile the differences between the read and write schemas. Doing that for every record is hard to write with this approach. Same thing with reusing structs, dicts, and lists.

Contributor Author (@Fokko):

I've refactored the code and added a reader tree. I love the approach because it gives a much nicer decoupling between the actual reading and the schema. This will make the reader/writer schema much easier to implement.

I kept it as simple as possible to not prematurely optimize the code and keep the PR a bit more concise. We can add things like reusing structs, dicts and lists later on.
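For readers following along, a minimal sketch of the reader-tree idea (class names are illustrative, not the PR's actual ones): a schema visitor builds the tree once, and each node pulls its value from a decoder at read time.

from abc import ABC, abstractmethod
from typing import Any, List


class Reader(ABC):
    # One node in the reader tree; built once per schema, reused for every record
    @abstractmethod
    def read(self, decoder: "BinaryDecoder") -> Any:
        ...


class IntegerReader(Reader):
    def read(self, decoder: "BinaryDecoder") -> Any:
        return decoder.read_int()


class StructReader(Reader):
    def __init__(self, field_readers: List[Reader]) -> None:
        self.field_readers = field_readers

    def read(self, decoder: "BinaryDecoder") -> Any:
        # Avro writes record fields back to back in schema order
        return [field_reader.read(decoder) for field_reader in self.field_readers]

Reconciling read and write schemas then becomes a matter of building a different tree, without touching the per-record read path.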


@staticmethod
def decompress(readers_decoder: BinaryDecoder) -> BinaryDecoder:
    _ = readers_decoder.read_long()
Contributor:

Compression stores the length even if it is uncompressed?

Contributor Author (@Fokko):

I had to check, and this is indeed the case. I've added a test with a snappy codec and a null codec.

Contributor:

This is actually an artifact of how the decoder is used. Since the block length is right before the block, the decoder does the right thing when you call read_bytes. But in this case there's no need to create a new decoder. It just needs to consume the length bytes and return the existing one. It makes sense, although I do think it is a bit strange to couple the compression API with decoders.

Contributor Author (@Fokko):

Ah, you're right, and this doesn't look good; it's actually a bug. I'll refactor this, including passing the bytes to the compression API instead of the decoder, which indeed doesn't make much sense.
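A rough sketch of that direction (names and layout are assumptions, not the PR's final API): the codec works on the raw block bytes, and the caller wraps the result in a fresh decoder.

import zlib
from abc import ABC, abstractmethod


class Codec(ABC):
    @staticmethod
    @abstractmethod
    def decompress(data: bytes) -> bytes:
        ...


class NullCodec(Codec):
    @staticmethod
    def decompress(data: bytes) -> bytes:
        # "null" codec: block bytes are stored as-is
        return data


class DeflateCodec(Codec):
    @staticmethod
    def decompress(data: bytes) -> bytes:
        # Avro's deflate codec stores raw deflate data without a zlib header,
        # hence the negative window bits
        return zlib.decompress(data, -15)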

@rdblue (Contributor) left a comment

@Fokko, this is great! I made a few comments but it's really close.

the unix epoch, 1 January 1970 (ISO calendar).
"""
days_since_epoch = self.read_int()
return date(1970, 1, 1) + timedelta(days_since_epoch)
Contributor:

Minor: I'd prefer to use days_to_date just like this uses micros_to_time and micros_to_timestamp.

Contributor Author (@Fokko):

I like it 👍🏻 Updated


def micros_to_timestamp(micros: int, tzinfo: timezone | None = None):
    dt = timedelta(microseconds=micros)
    unix_epoch_datetime = datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=tzinfo)
Contributor:

Can you use EPOCH_TIMESTAMP and EPOCH_TIMESTAMPTZ instead of creating a new datetime here?

def micros_to_timestamp(micros: int):
    return EPOCH_TIMESTAMP + timedelta(microseconds=micros)

def micros_to_timestamptz(micros: int):
    return EPOCH_TIMESTAMPTZ + timedelta(microseconds=micros)

@Fokko (Contributor, Author) commented Jun 14, 2022

Hey @rdblue, I've gone through the comments; the only one still open is #4920 (comment). Not sure if we want to break the protocol or make a helper class. Apart from that, I've added a lot of tests to do some extra checks and boost the coverage a bit. Let me know what you think!

from abc import ABC, abstractmethod


class Codec(ABC):
Contributor:

Minor: would it make sense to put this in the __init__.py file?

@Fokko (Contributor, Author) commented Jun 15, 2022

Some projects do this, and some don't :) For me, it makes sense to put base classes in the __init__.py, mostly because they need to be loaded anyway, and by adding them there they are read as soon as you access the module. Also, in the case of the codecs, it avoids having yet another file. WDYT?

Contributor:

+1 for adding it to __init__.py. Having fewer files cuts down on load time, and we will need to load it anyway.
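For illustration, the layout under discussion might look like this (the module path is an assumption, not necessarily the PR's):

# iceberg/avro/codecs/__init__.py
from abc import ABC, abstractmethod


class Codec(ABC):
    # Base class for the block-compression codecs used by the Avro reader
    @staticmethod
    @abstractmethod
    def decompress(data: bytes) -> bytes:
        ...


# A concrete codec module can then simply do:
#   from iceberg.avro.codecs import Codec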

the unix epoch, 1 January 1970 (ISO calendar).
"""
days_to_date = self.read_int()
return date(1970, 1, 1) + timedelta(days_to_date)
Contributor:

What about adding days_to_date to datetime.py? Then you could use EPOCH_DATE and this would be reusable.

Contributor Author (@Fokko):

Ouch, that one slipped through the cracks. I've just updated the code.
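A small sketch of the helper being discussed, assuming an EPOCH_DATE constant in datetime.py as suggested above:

from datetime import date, timedelta

EPOCH_DATE = date(1970, 1, 1)


def days_to_date(days: int) -> date:
    # Convert a number of days since the unix epoch into a date
    return EPOCH_DATE + timedelta(days=days)

# The decoder's read_date then reduces to: days_to_date(self.read_int())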

def __init__(self, input_file: InputFile) -> None:
    self.input_file = input_file

def __enter__(self):
Contributor:

As a follow-up, I think we should start a base class for our readers that handles __iter__, __enter__, and __exit__. It should probably use threading.local() to ensure that with is thread-safe if the file is shared, and I think we can make __iter__ return an iterator class. But these all work great for now.
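A minimal sketch of what such a base class could look like (names are assumptions; no thread-local handling yet):

from abc import ABC, abstractmethod
from typing import Any, Iterator


class BaseFileReader(ABC):
    # Hypothetical shared base: context management plus iteration over records
    def __init__(self, input_file: "InputFile") -> None:
        self.input_file = input_file
        self._stream = None

    def __enter__(self) -> "BaseFileReader":
        # Assumes InputFile.open() returns a closeable stream
        self._stream = self.input_file.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        if self._stream is not None:
            self._stream.close()

    def __iter__(self) -> Iterator[Any]:
        return self._records()

    @abstractmethod
    def _records(self) -> Iterator[Any]:
        ...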

@Fokko (Contributor, Author) commented Jun 15, 2022

I'm happy to do this, but I would suggest doing it in a separate PR. We could also extend InputFile and OutputFile so that __enter__ calls open() or create(). Having a separate iterator class isn't very pythonic.

With regard to threading, I think that's another can of worms. At least as a start, reading a file should not be considered thread-safe, and we should not share the file across threads. I would rather read the files in parallel using something like multiprocessing. We could also split out the blocks in the Avro file if we like, or make the read async, or even go fancy with an async iterator. But looking at the size of this PR, I think we should split that out. Let me know if you feel otherwise.

Contributor:

Yeah, definitely as a separate PR. I didn't mean to suggest doing it now.

@Fokko (Contributor, Author) commented Jun 15, 2022

@rdblue I've fixed the linting violation introduced by #5055

@Fokko Fokko requested a review from rdblue June 19, 2022 18:28
@rdblue rdblue merged commit da63c84 into apache:master Jun 20, 2022
@rdblue (Contributor) commented Jun 20, 2022

Thanks, @Fokko! Everything looks great in here. Thanks for updating the tests. Looking forward to the next steps!

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022