From 180c4a24717217496a4b3a64bf5fec9c67ee6caf Mon Sep 17 00:00:00 2001
From: Jim Crist
Date: Tue, 9 Jan 2018 17:08:07 -0600
Subject: [PATCH] Fix race condition in write_to_dataset

One race condition was already fixed, but another one exists when writing
by partition.

The shared helper re-raises the OSError when the directory still does not
exist after a failed mkdir, instead of relying on an assert that is
stripped under `python -O`.
---
 python/pyarrow/parquet.py | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d9f1bd2c364..151e0df8a22 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -966,6 +966,22 @@ def write_table(table, where, row_group_size=None, version='1.0',
 """.format(_parquet_writer_arg_docs)
 
 
+def _mkdir_if_not_exists(fs, path):
+    """Create ``path`` on ``fs`` if it is a file store and lacks it.
+
+    An ``OSError`` from ``mkdir`` is ignored when the path exists
+    afterwards (a concurrent writer created it first); otherwise it is
+    re-raised.
+    """
+    if fs._isfilestore() and not fs.exists(path):
+        try:
+            fs.mkdir(path)
+        except OSError:
+            # Losing the creation race is fine; any other failure is not.
+            if not fs.exists(path):
+                raise
+
+
 def write_to_dataset(table, root_path, partition_cols=None,
                      filesystem=None, preserve_index=True, **kwargs):
     """
@@ -1012,11 +1028,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     else:
         fs = _ensure_filesystem(filesystem)
 
-    if fs._isfilestore() and not fs.exists(root_path):
-        try:
-            fs.mkdir(root_path)
-        except OSError:
-            assert fs.exists(root_path)
+    _mkdir_if_not_exists(fs, root_path)
 
     if partition_cols is not None and len(partition_cols) > 0:
         df = table.to_pandas()
@@ -1034,8 +1046,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
             subtable = Table.from_pandas(subgroup,
                                          preserve_index=preserve_index)
             prefix = "/".join([root_path, subdir])
-            if fs._isfilestore() and not fs.exists(prefix):
-                fs.mkdir(prefix)
+            _mkdir_if_not_exists(fs, prefix)
             outfile = compat.guid() + ".parquet"
             full_path = "/".join([prefix, outfile])
             with fs.open(full_path, 'wb') as f: