Return file size in bytes
nbytes : int, default None (read all remaining bytes if not specified)
- assert self.p.poll() == None + assert self.p.poll() is None # Kill the plasma store process. if os.getenv("PLASMA_VALGRIND") == "1": self.p.send_signal(signal.SIGTERM) @@ -227,7 +225,7 @@ def test_create_existing(self): self.plasma_client.create(object_id, length, generate_metadata(length)) # TODO(pcm): Introduce a more specific error type here. - except pa.lib.ArrowException as e: + except pa.lib.ArrowException: pass else: assert False @@ -270,7 +268,6 @@ def test_get(self): assert results[i] is None def test_store_arrow_objects(self): - import pyarrow.plasma as plasma data = np.random.randn(10, 4) # Write an arrow object. object_id = random_object_id() @@ -334,7 +331,7 @@ def assert_create_raises_plasma_full(unit_test, size): partial_size, size - partial_size) # TODO(pcm): More specific error here. - except pa.lib.ArrowException as e: + except pa.lib.ArrowException: pass else: # For some reason the above didn't throw an exception, so fail. @@ -368,7 +365,7 @@ def test_contains(self): fake_object_ids = [random_object_id() for _ in range(100)] real_object_ids = [random_object_id() for _ in range(100)] for object_id in real_object_ids: - assert self.plasma_client.contains(object_id) == False + assert self.plasma_client.contains(object_id) is False self.plasma_client.create(object_id, 100) self.plasma_client.seal(object_id) assert self.plasma_client.contains(object_id) @@ -383,7 +380,7 @@ def test_hash(self): try: self.plasma_client.hash(object_id1) # TODO(pcm): Introduce a more specific error type here - except pa.lib.ArrowException as e: + except pa.lib.ArrowException: pass else: assert False diff --git a/python/testing/parquet_interop.py b/python/testing/parquet_interop.py new file mode 100644 index 00000000000..ba2eb6fa416 --- /dev/null +++ b/python/testing/parquet_interop.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import pytest + +import fastparquet +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import pandas.util.testing as tm + + +def hdfs_test_client(driver='libhdfs'): + host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') + user = os.environ['ARROW_HDFS_TEST_USER'] + try: + port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500)) + except ValueError: + raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' + 'an integer') + + return pa.HdfsClient(host, port, user, driver=driver) + + +def test_fastparquet_read_with_hdfs(): + fs = hdfs_test_client() + + df = tm.makeDataFrame() + table = pa.Table.from_pandas(df) + + path = '/tmp/testing.parquet' + with fs.open(path, 'wb') as f: + pq.write_table(table, f) + + parquet_file = fastparquet.ParquetFile(path, open_with=fs.open) + + result = parquet_file.to_pandas() + tm.assert_frame_equal(result, df)