Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions python/src/iceberg/io/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod


class InputFile(ABC):
"""A base class for InputFile implementations"""

def __init__(self, location: str):
self._location = location

@abstractmethod
def __len__(self) -> int:
"""Returns the total length of the file, in bytes"""

@property
def location(self) -> str:
"""The fully-qualified location of the input file"""
return self._location

@property
@abstractmethod
def exists(self) -> bool:
"""Checks whether the file exists"""

@abstractmethod
def __enter__(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's nice to support with syntax, but I think it would be better to use an open or new_stream method to do this. That way you can customize the options if you need to and you avoid having shared state on this class that makes it unsafe. You'd also be able to use a type annotation here for the class of input_stream.

with in_file.new_stream(buffer_size = 8192) as f:
    f.read()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use open for InputFile and create for OutputFile.

"""Enter context for InputFile

This method should assign a seekable stream to `self.input_stream` and
return `self`. If the file does not exist, a FileNotFoundError should
be raised."""

@abstractmethod
def __exit__(self, exc_type, exc_value, exc_traceback):
"""Exit context for InputFile

This method should perform any necessary teardown."""


class OutputFile(ABC):
"""A base class for OutputFile implementations"""

def __init__(self, location: str, overwrite: bool = False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a stream factory, overwrite is actually an option for the output stream and not for the OutputFile itself. Using a method to open like I suggested for InputFile gives you a place to put this:

with out_file.create(overwrite=True) as f:
    f.write(...)

in_file = out_file.to_input_file()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! Thanks @rdblue

self._location = location
self._overwrite = overwrite

@abstractmethod
def __len__(self) -> int:
"""Returns the total length of the file, in bytes"""

@property
def location(self) -> str:
"""The fully-qualified location of the output file"""
return self._location

@property
def overwrite(self) -> bool:
"""Whether or not to overwrite the file if it exists"""
return self._overwrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that these classes should be immutable so that we don't have to worry about thread safety.


@property
@abstractmethod
def exists(self) -> bool:
"""Checks whether the file exists"""

@abstractmethod
def to_input_file(self) -> InputFile:
"""Returns an InputFile for the location of this output file"""

@abstractmethod
def __enter__(self):
"""Enter context for OutputFile

This method should return a file-like object. If the file already exists
at `self.location` and `self.overwrite` is False a FileExistsError should
be raised.

Example:
>>> with OutputFile(overwrite=True) as f:
content = f.read()
"""

@abstractmethod
def __exit__(self, exc_type, exc_value, exc_traceback):
"""Exit context for OutputFile

This method should perform any necessary teardown.

Example:
>>> with OutputFile(connection=connection):
content = f.read()
connection.close() # `__exit__` method would contain `del self._connection`
"""


class FileIO(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc string

@abstractmethod
def new_input(self, location: str) -> InputFile:
"""Get an InputFile instance to read bytes from the file at the given location"""

@abstractmethod
def new_output(self, location: str) -> OutputFile:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to distinguish between input and output files? it seems like for the most part the APIs are very similar? It seems if a file is going to be only readable or writeable having the implementatin throw not-implemented might be a better choice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives the flexibility to the implementation. You can always implement both base classes, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this might be more a philosophical question. There are two ways to achieve the flexibility:

  1. Provide a single class and have users only implement the methods they want (you can document a set of methods that should always be implemented together). Giving users run-time errors when not implemented.
  2. Separate the functionality into two different interfaces and require all methods be implemented.

My sense is that #2 is more of a java design pattern. I think (but I'm no expert) option #1 is more pythonic/dynamically typed language pattern.

"""Get an OutputFile instance to write bytes to the file at the given location"""

@abstractmethod
def delete(self, location: str):
"""Delete the file at the given path"""
116 changes: 116 additions & 0 deletions python/tests/io/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io

from iceberg.io.base import FileIO, InputFile, OutputFile


class FooInputFile(InputFile):
def __len__(self):
return io.BytesIO(b"foo").getbuffer().nbytes

def exists(self):
return True

def __enter__(self):
super().__enter__()
return io.BytesIO(b"foo")

def __exit__(self, exc_type, exc_value, exc_traceback):
super().__exit__(exc_type, exc_value, exc_traceback)
return


class FooOutputFile(OutputFile):
def __call__(self, overwrite: bool = False, **kwargs):
super().__call__(overwrite=True)
return self

def __len__(self):
return len(self._file_obj)
Copy link
Contributor

@rdblue rdblue Jan 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be os.path.getsize(self.location)? I don't see any other reference to self._file_obj.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right thanks! I updated it to use the parsed uri added in another commit so it's os.path.getsize(self.parsed_location.path) now. I also added validation of len in the tests.

relevant commit: 7b625cf


def exists(self):
return True

def to_input_file(self):
return FooInputFile(location=self.location)

def __enter__(self):
self._mock_storage = io.BytesIO()
return self._mock_storage

def __exit__(self, exc_type, exc_value, exc_traceback):
super().__exit__(exc_type, exc_value, exc_traceback)
return


class FooFileIO(FileIO):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than writing tests directly against FooFileIO, what about writing generic tests and passing an instance of FooFileIO? You could have the test implementation stash strings in memory keyed by location for the initial API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this could also do what the Java one does and use local files rather than an in-memory implementation. That's probably much more useful!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about writing generic tests and passing an instance of FooFileIO

Great idea, I'll update this.

Actually, this could also do what the Java one does and use local files rather than an in-memory implementation. That's probably much more useful!

If we're expecting to have a LocalFileIO implementation of FileIO, would that be a better place to include tests using local files? This file could then be isolated to just testing the ABC classes and a failure in LocalFileIO tests but not here, or failures in both sets of tests may make for clearer signals on where in the source code to look (I think).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the LocalFileIO in Java is in tests only. We probably don't want to have anyone use it in production, but it's a great way to test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the tests to use a LocalFileIO, LocalInputFile, and LocalOutputFile implementation. I've also made the tests more generic by configuring them to take a list of implementation classes. I'm using tempfile which is part of the python standard library to create temporary directories/files to test the local file-io.

def new_input(self, location: str):
return FooInputFile(location=location)

def new_output(self, location: str):
return FooOutputFile(location=location)

def delete(self, location: str):
return


def test_custom_input_file():

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small style nit PEP8 says "Use blank lines in functions, sparingly, to indicate logical sections." I think these functions should either add a doc-string on the blank lines or remove them (assuming PEP8).

input_file = FooInputFile(location="foo/bar.json")
assert input_file.location == "foo/bar.json"

with input_file as f:
data = f.read()

assert data == b"foo"


def test_custom_output_file():

output_file = FooOutputFile(location="foo/bar.json")
assert output_file.location == "foo/bar.json"

with output_file as f:
f.write(b"foo")

output_file._mock_storage.seek(0)
assert output_file._mock_storage.read() == b"foo"


def test_custom_output_file_with_overwrite():

output_file = FooOutputFile(location="foo/bar.json", overwrite=True)
assert output_file.location == "foo/bar.json"
assert output_file.overwrite == True

with output_file as f:
f.write(b"foo")

output_file._mock_storage.seek(0)
assert output_file._mock_storage.read() == b"foo"


def test_custom_file_io():

file_io = FooFileIO()
input_file = file_io.new_input(location="foo")
output_file = file_io.new_output(location="bar")

assert isinstance(input_file, FooInputFile)
assert isinstance(output_file, FooOutputFile)