
Update the SplittedParquetWriter with related docs and tests
JayjeetAtGithub committed May 30, 2021
1 parent 8635266 commit 39a535c
Showing 6 changed files with 87 additions and 64 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/adapters/arrow-rados-cls/README.md
@@ -51,7 +51,7 @@ docker-compose run --service-ports ubuntu-cls-demo

* Minimal overhead in requirements:
1) Requires CephFS to be mounted.
-2) Requires using the `SplittedParquetWriter` API to write Arrow Tables.
+2) Requires using the [`SplittedParquetWriter`](../../../../../python/pyarrow/rados.py) API to write Arrow Tables.

* Built on top of latest Ceph v15.2.x.

@@ -32,7 +32,7 @@ The goal of SkyhookDM is to allow users to transparently grow and shrink their d

# Lifetime of a Dataset Scan in SkyhookDM

-* **Write Path:** Datasets containing Parquet files or a directory hierarchy of Parquet files are written to a CephFS mount. While writing, each Parquet file is split into several smaller Parquet files of size `<= 128 MB`. We configure the stripe unit in CephFS to be 128 MB to ensure a `1:1 mapping` between a file and an object. The file layout is shown in the figure below. The reason behind choosing 128 MB as the stripe size is that Ceph doesn't perform well with objects any larger than 128 MB. Also, some of our performance experiments have shown the best performance with 128 MB Parquet files. Once the Parquet files are written to CephFS, they are ready to be scanned via the `RadosParquetFileFormat`.
+* **Write Path:** Datasets containing Parquet files or a directory hierarchy of Parquet files are written to a CephFS mount. While writing, each Parquet file is split into several smaller Parquet files of size `<= 128 MB` by the `SplittedParquetWriter`. We configure the stripe unit in CephFS to be 128 MB to ensure a `1:1 mapping` between a file and an object. The file layout is shown in the figure below. The reason behind choosing 128 MB as the stripe size is that Ceph doesn't perform well with objects any larger than 128 MB. Also, some of our performance experiments have shown the best performance with 128 MB Parquet files. Once the Parquet files are written to CephFS, they are ready to be scanned via the `RadosParquetFileFormat`.

<p align="center">
<img src="./images/filelayout.png" width="80%">
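For context, one way to apply the 128 MB stripe unit / object size on a CephFS dataset directory is through CephFS's layout virtual xattrs. The snippet below is a minimal sketch under that assumption (the xattr names are standard CephFS layout attributes, the directory path is illustrative, and this snippet is not part of the commit itself):

```python
import os

# Illustrative CephFS dataset directory; files created below it inherit this layout.
dataset_dir = "/mnt/cephfs/dataset"

size_128mb = str(128 * 1024 * 1024).encode()
# object_size must be raised before stripe_unit can be set to the same value.
os.setxattr(dataset_dir, "ceph.dir.layout.object_size", size_128mb)
os.setxattr(dataset_dir, "ceph.dir.layout.stripe_unit", size_128mb)
```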
12 changes: 4 additions & 8 deletions cpp/src/arrow/adapters/arrow-rados-cls/docs/deploy.md
@@ -23,24 +23,20 @@

2. Build and install SkyhookDM and [PyArrow](https://pypi.org/project/pyarrow/) (with Rados Parquet extensions) using [this](../scripts/deploy_skyhook.sh) script.

-3. Update your Ceph configuration file with this line and restart the OSDs to reload the changes.
+3. Update your Ceph configuration file with the line below and restart the OSD daemons to load the arrow CLS libraries.
```
-osd class load list = *
+osd class load list = arrow
```

# Interacting with SkyhookDM

-1. Write some [Parquet](https://parquet.apache.org/) files in the CephFS mount. We need to use the [`deploy_data.sh`](../scripts/deploy_data.sh) script to write Parquet files to CephFS for use in SkyhookDM. For example,
-```bash
-./deploy_data.sh myfile.parquet /mnt/cephfs/myfile.parquet 100 67108864
-```
-Running this command would write 100 Parquet files in the format `myfile.parquet.N` in the `/mnt/cephfs` directory with an object size of 64MB.
+1. Write some [Parquet](https://parquet.apache.org/) files in the CephFS mount by splitting them up into files of size `128 MB` or less. This is achieved by using the [`SplittedParquetWriter`](../../../../../../python/pyarrow/rados.py) API. An example script that splits Parquet files into SkyhookDM-compatible files can be found [here](../scripts/splitter.py); a minimal usage sketch of the writer is given after the client example below.

2. Write a client script and get started with querying datasets in SkyhookDM. An example script is given below.
```python
import pyarrow.dataset as ds

format_ = ds.RadosParquetFileFormat("/path/to/cephconfig", "cephfs-data-pool-name")
dataset_ = ds.dataset("file:///mnt/cephfs/dataset", format=format_)
dataset_ = ds.dataset("file:///path/to/dataset", format=format_)
print(dataset_.to_table())
```
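As referenced in step 1, here is a minimal sketch of using the `SplittedParquetWriter` directly from Python. The input and output paths are illustrative; the constructor arguments (`filename`, `destination`, `chunksize` in bytes) follow `python/pyarrow/rados.py` as updated in this commit.

```python
from pyarrow.rados import SplittedParquetWriter

# Illustrative paths: a local Parquet file and a dataset directory on the CephFS mount.
writer = SplittedParquetWriter("myfile.parquet", "/mnt/cephfs/dataset",
                               chunksize=128 * 1024 * 1024)
writer.write()  # writes <uuid>.parquet chunks of roughly 128 MB each into the destination
```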
32 changes: 32 additions & 0 deletions cpp/src/arrow/adapters/arrow-rados-cls/scripts/splitter.py
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import sys

from pyarrow.rados import SplittedParquetWriter

if __name__ == "__main__":
    source_dir = str(sys.argv[1])
    destination_dir = str(sys.argv[2])
    chunksize = int(sys.argv[3])

    files = os.listdir(source_dir)
    for file in files:
        path = os.path.join(source_dir, file)
        writer = SplittedParquetWriter(path, destination_dir, chunksize)
        writer.write()
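Given the argument handling above, the script would be invoked along the lines of `python splitter.py <source_dir> <destination_dir> <chunksize-in-bytes>` (for example, a chunksize of `134217728` for 128 MB chunks); every file found in the source directory is then split into the destination directory.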
91 changes: 43 additions & 48 deletions python/pyarrow/rados.py
@@ -16,61 +16,56 @@
# under the License.

import os
-import pyarrow as pa
+import uuid
+import time
import pyarrow.parquet as pq
+import multiprocessing as mp
+from concurrent.futures import ThreadPoolExecutor


class SplittedParquetWriter(object):
-    def __init__(self, source_path, destination_path, chunksize):
-        self.source_path = source_path
-        self.destination_path = destination_path
+    def __init__(self, filename, destination, chunksize=128*1024*1024):
+        self.filename = filename
+        self.destination = destination
        self.chunksize = chunksize
-        self.file = pq.ParquetFile(source_path)
-        self._schema = self.file.schema_arrow
-        self._fileno = -1

-    def __del__(self):
-        self.close()
+    def round(self, num):
+        num_str = str(int(num))
+        result_str = ""
+        result_str += num_str[0]
+        for i in range(len(num_str) - 1):
+            result_str += "0"
+        return int(result_str)

-    def _create_new_file(self):
-        self._fileno += 1
-        _fd = open(os.path.join(
-            self.destination_path, f"file.{self._fileno}.parquet"), "wb")
-        return _fd
+    def write_file(self, filename, table):
+        pq.write_table(
+            table, filename,
+            row_group_size=table.num_rows, compression=None
+        )

-    def _open_new_file(self):
-        self._current_fd = self._create_new_file()
-        self._current_sink = pa.PythonFile(self._current_fd, mode="w")
-        self._current_writer = pq.ParquetWriter(
-            self._current_sink, self._schema)

-    def _close_current_file(self):
-        self._current_writer.close()
-        self._current_sink.close()
-        self._current_fd.close()
+    def estimate_rows(self):
+        self.table = pq.read_table(self.filename)
+        disk_size = os.stat(self.filename).st_size
+        inmemory_table_size = self.table.nbytes
+        inmemory_row_size = inmemory_table_size/self.table.num_rows
+        compression_ratio = inmemory_table_size/disk_size
+        required_inmemory_table_size = self.chunksize * compression_ratio
+        required_rows_per_file = required_inmemory_table_size/inmemory_row_size
+        return self.table.num_rows, self.round(required_rows_per_file)

    def write(self):
-        self._open_new_file()
-        for batch in self.file.iter_batches():  # default batch_size=64k
-            table = pa.Table.from_batches([batch])
-            self._current_writer.write_table(table)
-            if self._current_sink.tell() < self.chunksize:
-                continue
-            else:
-                self._close_current_file()
-                self._open_new_file()
-
-        self._close_current_file()
-
-    def close(self):
-        num_files_written = self._fileno + 1
-        for i in range(num_files_written):
-            table = pq.read_table(f"file.{i}.parquet")
-            pq.write_table(
-                table,
-                where=f"file.{i}.parquet",
-                row_group_size=table.num_rows
-            )
-
-        self._fileno = -1
-        return num_files_written
+        os.makedirs(self.destination, exist_ok=True)
+        s_time = time.time()
+        total_rows, rows_per_file = self.estimate_rows()
+        i = 0
+        with ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
+            while i < total_rows:
+                executor.submit(
+                    self.write_file,
+                    os.path.join(
+                        self.destination, f"{uuid.uuid4().hex}.parquet"),
+                    self.table.slice(i, rows_per_file)
+                )
+                i += rows_per_file
+        e_time = time.time()
+        print(f"Finished writing in {e_time - s_time} seconds")
12 changes: 6 additions & 6 deletions python/pyarrow/tests/test_rados_parquet.py
@@ -111,15 +111,15 @@ def test_splitted_parquet_writer():
os.system("wget "
"https://raw.githubusercontent.com/"
"JayjeetAtGithub/zips/main/largefile.parquet")
chunksize = 4 * 1000000 # 4MB
writer = SplittedParquetWriter("largefile.parquet", os.getcwd(), chunksize)
chunksize = 4 * 1024 * 1024 # 4MB
writer = SplittedParquetWriter("largefile.parquet", 'mydataset', chunksize)
writer.write()
num_files_written = writer.close()
assert num_files_written == 5
assert len(os.listdir('mydataset')) == 8

original_file_rows = pq.read_table('largefile.parquet').num_rows
splitted_files_rows = 0
for i in range(num_files_written):
splitted_files_rows += pq.read_metadata(f"file.{i}.parquet").num_rows
files = os.listdir('mydataset')
for file in files:
splitted_files_rows += pq.read_metadata(f"mydataset/{file}").num_rows

assert splitted_files_rows == original_file_rows
