Skip to content

Commit

Permalink
Merge pull request #231 from IBM/ingest-2-parquet
Browse files Browse the repository at this point in the history
ingest to parquet rewrite
  • Loading branch information
blublinsky authored Jun 6, 2024
2 parents 5afcd40 + 87aa4e1 commit dc44844
Show file tree
Hide file tree
Showing 47 changed files with 1,943 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,10 @@ def __init__(
:param n_samples: amount of files to randomly sample
:param files_to_use: files extensions of files to include
"""
self.s3_credentials = {} | s3_credentials
access_key = self.get_access_key()
if access_key is None:
raise ValueError("S3 access key not provided")
secret_key = self.get_secret_key()
if secret_key is None:
raise ValueError("S3 secret key not provided")
endpoint = self.get_endpoint()
region = self.get_region()
if (s3_credentials is None or s3_credentials.get("access_key", None) is None
or s3_credentials.get("secret_key", None) is None):
raise "S3 credentials is not defined"
self.s3_credentials = s3_credentials
if s3_config is None:
self.input_folder = None
self.output_folder = None
Expand All @@ -68,20 +63,13 @@ def __init__(
self.m_files = m_files
self.n_samples = n_samples
self.files_to_use = files_to_use
self.arrS3 = ArrowS3(
access_key=s3_credentials.get("access_key"),
secret_key=s3_credentials.get("secret_key"),
endpoint=s3_credentials.get("url", None),
region=s3_credentials.get("region", None),
)

logger.debug(f"S3 access key provided: {access_key}")
logger.debug(f"S3 secret key provided: no soup for you!")
logger.debug(f"S3 region {region}")
logger.debug(f"S3 endpoint/url: {endpoint}")
logger.debug(f"S input folder: {self.input_folder}")
logger.debug(f"S3 output folder: {self.output_folder}")
logger.debug(f"S3 data sets: {self.d_sets}")
logger.debug(f"S3 checkpoint: {self.checkpoint}")
logger.debug(f"S3 m_files: {self.m_files}")
logger.debug(f"S3 n_samples: {self.n_samples}")
logger.debug(f"S3 files_to_use: {self.files_to_use}")

self.arrS3 = ArrowS3(access_key, secret_key, endpoint=endpoint, region=region)

def get_access_key(self):
return self.s3_credentials.get("access_key", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def process_file(self, f_name: str) -> None:
# execute local processing
name_extension = TransformUtils.get_file_extension(f_name)
self.logger.debug(f"Begin transforming file {f_name}")
out_files, stats = self.transform.transform_binary(byte_array=filedata, ext=name_extension[1])
out_files, stats = self.transform.transform_binary(
base_name=TransformUtils.get_file_basename(f_name), byte_array=filedata)
self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files")
self.last_file_name = name_extension[0]
self.last_file_name_next_index = None
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .abstract_test import AbstractTest, get_tables_in_folder
from .abstract_test import AbstractTest, get_tables_in_folder, get_files_in_folder
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,27 @@
logger = get_logger(__name__)


def get_tables_in_folder(dir) -> list[pa.Table]:
def get_tables_in_folder(dir: str) -> list[pa.Table]:
"""
Get list of Tables loaded from the parquet files in the given directory. The returned
list is sorted lexigraphically by the name of the file.
:param dir:
:return:
"""
tables = []
dal = DataAccessLocal()
files = dal.get_folder_files(dir, extensions=["parquet"], return_data=False)
filenames = []
for filename in files:
filenames.append(filename)
filenames = sorted(filenames)
for filename in filenames:
t = dal.get_table(filename)
tables.append(t)
return tables
files = dal.get_folder_files(dir, extensions=[".parquet"])
return [TransformUtils.convert_binary_to_arrow(data) for data in files.values()]


def get_files_in_folder(dir: str, ext: str) -> dict[str, bytes]:
"""
Get list of Tables loaded from the parquet files in the given directory. The returned
list is sorted lexigraphically by the name of the file.
:param dir:
:return:
"""
dal = DataAccessLocal()
return dal.get_folder_files(dir, extensions=[ext])


class AbstractTest:
Expand Down Expand Up @@ -93,6 +96,27 @@ def validate_expected_tables(table_list: list[pa.Table], expected_table_list: li
r2 = t2.take([j])
assert r1 == r2, f"Row {j} of table {i} are not equal\n\tTransformed: {r1}\n\tExpected : {r2}"

@staticmethod
def validate_expected_files(files_list: list[tuple[bytes, str]], expected_files_list: list[tuple[bytes, str]]):
"""
Verify with assertion messages that the two lists of Tables are equivalent.
:param files_list:
:param expected_files_list:
:return:
"""
assert files_list is not None, "Transform output table is None"
assert expected_files_list is not None, "Test misconfigured: expected table list is None"

l1 = len(files_list)
l2 = len(expected_files_list)
assert l1 == l2, f"Number of transformed files ({l1}) is not the expected number ({l2})"
for i in range(l1):
f1 = files_list[i]
f2 = expected_files_list[i]
assert f1[1] == f2[1], f"produced file extension {f1[1]} is different from expected file extension {f2[1]}"
assert (len(f1[0]) - len(f2[0])) < 50, \
f"produced file length {len(f1[0])} is different from expected file extension {len(f2[0])}"

@staticmethod
def validate_expected_metadata_lists(metadata: list[dict[str, float]], expected_metadata: list[dict[str, float]]):
elen = len(expected_metadata)
Expand All @@ -117,11 +141,12 @@ def validate_expected_metadata(metadata: dict[str, float], expected_metadata: di
)

@staticmethod
def validate_directory_contents(directory: str, expected_dir: str):
def validate_directory_contents(directory: str, expected_dir: str, drop_columns: list[str] = []):
"""
Make sure the directory contents are the same.
:param directory:
:param expected_dir:
:param drop_columns: list of columns that might differ
:return:
"""
dir_cmp = dircmp(directory, expected_dir, ignore=[".DS_Store"])
Expand All @@ -138,24 +163,27 @@ def validate_directory_contents(directory: str, expected_dir: str):
expected_diffs = 0
failed = len(dir_cmp.diff_files) != expected_diffs
if failed:
AbstractTest.__confirm_diffs(directory, expected_dir, dir_cmp.diff_files, "/tmp")
AbstractTest.__confirm_diffs(directory, expected_dir, dir_cmp.diff_files, "/tmp", drop_columns)

# Traverse into the subdirs since dircmp doesn't seem to do that.
subdirs = [f.name for f in os.scandir(expected_dir) if f.is_dir()]
for subdir in subdirs:
d1 = os.path.join(directory, subdir)
d2 = os.path.join(expected_dir, subdir)
AbstractTest.validate_directory_contents(d1, d2)
AbstractTest.validate_directory_contents(d1, d2, drop_columns)

@staticmethod
def _validate_table_files(parquet1: str, parquet2: str):
def _validate_table_files(parquet1: str, parquet2: str, drop_columns: list[str] = []):
da = DataAccessLocal()
t1 = da.get_table(parquet1)
t2 = da.get_table(parquet2)
if len(drop_columns) > 0:
t1 = t1.drop_columns(drop_columns)
t2 = t2.drop_columns(drop_columns)
AbstractTest.validate_expected_tables([t1], [t2])

@staticmethod
def __confirm_diffs(src_dir: str, expected_dir: str, diff_files: list, dest_dir: str):
def __confirm_diffs(src_dir: str, expected_dir: str, diff_files: list, dest_dir: str, drop_columns: list[str] = []):
"""
Copy all files from the source dir to the dest dir.
:param src_dir:
Expand All @@ -172,14 +200,17 @@ def __confirm_diffs(src_dir: str, expected_dir: str, diff_files: list, dest_dir:
# It seems file can be different on disk, but contain the same column/row values.
# so for these, do the inmemory comparison.
try:
AbstractTest._validate_table_files(expected, src)
AbstractTest._validate_table_files(expected, src, drop_columns)
except AssertionError as e:
logger.info(f"Copying file with difference: {src} to {dest}")
shutil.copyfile(src, dest)
raise e
elif "metadata" in file:
pass
else:
logger.info(f"Copying file with difference: {src} to {dest}")
shutil.copyfile(src, dest)
assert False, "Files {src} and {dest} are different"
# These are binary files.
da = DataAccessLocal()
f1_bytes = da.get_file(src)
f2_bytes = da.get_file(dest)
assert (len(f1_bytes) - len(f2_bytes)) < 50, \
f"produced file length {len(f1_bytes)} is different from expected file extension {len(f2_bytes)}"
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@ class AbstractTransformLauncherTest(AbstractTest):

@staticmethod
def _get_argv(
launcher: AbstractTransformLauncher, cli_params: dict[str, Any], in_table_path: str, out_table_path: str
cli_params: dict[str, Any], in_table_path: str, out_table_path: str
):
args = {} | cli_params
local_ast = {"input_folder": in_table_path, "output_folder": out_table_path}
args["data_local_config"] = local_ast
# if isinstance(launcher, RayTransformLauncher):
# args["run_locally"] = "True"
argv = ParamsUtils.dict_to_req(args)
return argv

def test_transform(
self,
launcher: AbstractTransformLauncher,
cli_params: dict[str, Any],
in_table_path: str,
expected_out_table_path: str,
self,
launcher: AbstractTransformLauncher,
cli_params: dict[str, Any],
in_table_path: str,
expected_out_table_path: str,
ignore_columns: list[str],
):
"""
Test the given transform and its runtime using the given CLI arguments, input directory of data files and expected output directory.
Expand All @@ -59,29 +58,36 @@ def test_transform(
prefix = launcher.get_transform_name()
with tempfile.TemporaryDirectory(prefix=prefix, dir="/tmp") as temp_dir:
print(f"Using temporary output path {temp_dir}")
sys.argv = self._get_argv(launcher, cli_params, in_table_path, temp_dir)
sys.argv = self._get_argv(cli_params, in_table_path, temp_dir)
launcher.launch()
self._validate_directory_contents_match(temp_dir, expected_out_table_path)
self._validate_directory_contents_match(temp_dir, expected_out_table_path, ignore_columns)

def _validate_directory_contents_match(self, dir: str, expected: str):
def _validate_directory_contents_match(self, dir: str, expected: str, ignore_columns: list[str] = []):
"""
Confirm that the two directories contains the same files.
Stubbed out like this to allow spark tests to override this since spark tends to rename the files.
"""
AbstractTest.validate_directory_contents(dir, expected)
AbstractTest.validate_directory_contents(dir, expected, ignore_columns)

def _install_test_fixtures(self, metafunc):
# Apply the fixtures for the method with these input names (i.e. test_transform()).
if (
"launcher" in metafunc.fixturenames
and "cli_params" in metafunc.fixturenames
and "in_table_path" in metafunc.fixturenames
and "expected_out_table_path" in metafunc.fixturenames
"launcher" in metafunc.fixturenames
and "cli_params" in metafunc.fixturenames
and "in_table_path" in metafunc.fixturenames
and "expected_out_table_path" in metafunc.fixturenames
and "ignore_columns" in metafunc.fixturenames
):
# Let the sub-class define the specific tests and test data for the transform under test.
f = self.get_test_transform_fixtures()
fixtures = self.get_test_transform_fixtures()
# for backward compatibility to make ignore_columns optional
fi = 0
for f in fixtures:
if len(f) == 4:
fixtures[fi] = f + ([],)
fi += 1
# Install the fixture, matching the parameter names used by test_transform() method.
metafunc.parametrize("launcher,cli_params,in_table_path,expected_out_table_path", f)
metafunc.parametrize("launcher,cli_params,in_table_path,expected_out_table_path,ignore_columns", fixtures)

def get_test_transform_fixtures(self) -> list[tuple]:
"""
Expand All @@ -91,6 +97,7 @@ def get_test_transform_fixtures(self) -> list[tuple]:
| Item 1: The dictionary of command line args to simulate when running the transform.
| Item 2: The input path to the parquet files to process.
| Item 3: the output path holding the expected results of the transform including parquet and metadata.json
| Item 4: columns to drop for table comparison (optional), if omitted an empty array is used
:return: a list of Tuples, to test. Each tuple contains the test inputs for test_transform() method.
"""
raise NotImplemented()
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .table_transform_test import AbstractTableTransformTest
from .binary_transform_test import AbstractBinaryTransformTest
from .noop_transform import (
NOOPTransform,
NOOPPythonTransformConfiguration,
)
from .transform_test import AbstractTransformTest
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Tuple


from data_processing.test_support.abstract_test import AbstractTest
from data_processing.transform import AbstractBinaryTransform


class AbstractBinaryTransformTest(AbstractTest):
"""
The test class for all/most AbstractBinaryTransform implementations.
Generic tests are provided here, and sub-classes must implement the _get*_fixture() method(s)
to provide the test data for a given test method. For example, get_test_transform_fixtures()
provides the test data for the test_transform() test method.
"""

def _install_test_fixtures(self, metafunc):
# Apply the fixtures for the method with these input names (i.e. test_transform()).
if (
"transform" in metafunc.fixturenames
and "in_binary_list" in metafunc.fixturenames
and "expected_binary_list" in metafunc.fixturenames
and "expected_metadata_list" in metafunc.fixturenames
):
# Let the sub-class define the specific tests and test data for the transform under test.
f = self.get_test_transform_fixtures()
# Install the fixture, matching the parameter names used by test_transform() method.
metafunc.parametrize("transform,in_binary_list,expected_binary_list,expected_metadata_list", f)

def test_transform(
self,
transform: AbstractBinaryTransform,
in_binary_list: list[tuple[str, bytes]],
expected_binary_list: list[tuple[bytes, str]],
expected_metadata_list: list[dict[str, float]],
):
"""
Use the given transform to transform() the given binary file and compare the results (list of binary files and
metadata) with the expected values as given. The inputs are provided by the sub-class definition of
get_test_transform_fixtures().
:param transform: transform to test.
:param in_binary_list: table(s) to transform
:param expected_binary_list: the expected accumulation of output files produced by the transform() call.
This should include any empty files if some of the calls to transform_binary() generate empty files.
If the final call to flush() produces an empty list of files, these will not be included here (duh!).
However, see expected_metadata_list for the handling of metadata produced by flush().
:param expected_metadata_list: the expected list of accumulated metadata dictionaries across all calls to
transform() and the final call to flush(). Transforms that produce nothing from flush() should include
and empty dictionary at the end of this list.
:return:
"""
all_files_list = []
all_metadata_list = []
for in_file in in_binary_list:
files_list, metadata = transform.transform_binary(base_name=in_file[0], byte_array=in_file[1])
all_files_list.extend(files_list)
all_metadata_list.append(metadata)

files_list, metadata = transform.flush_binary()
all_files_list.extend(files_list)
all_metadata_list.append(metadata)

AbstractBinaryTransformTest.validate_expected_files(all_files_list, expected_binary_list)
AbstractBinaryTransformTest.validate_expected_metadata_lists(all_metadata_list, expected_metadata_list)

def get_test_transform_fixtures(self) -> list[Tuple]:
"""
Get the test data for the test_transform() test.
:return: a list of Tuples, to test. Each tuple contains the test inputs for test_transform() method.
Item 0: The AbstractBinaryTransform to be tested
Item 1: The input file(s) to be transformed
Item 2: The expected list of output file(s) for transformation of the input.
Item 3: the expected metadata for transformation of the input.
"""
raise NotImplemented()
Loading

0 comments on commit dc44844

Please sign in to comment.