Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdc 11380 #27

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
88 changes: 58 additions & 30 deletions stage/configuration/test_directory_origin.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ def test_directory_origin_configuration_delimiter_element(sdc_builder, sdc_execu
sdc_executor.add_pipeline(pipeline)
snapshot = sdc_executor.capture_snapshot(pipeline, start_pipeline=True, batch_size=3).snapshot
output_records = snapshot[directory.instance_name].output

item_list = output_records[0].field['msg']
rows_from_snapshot = [{item['time'][0]['value'].value: item['request'][0]['value'].value}
for item in item_list]
Expand All @@ -549,43 +550,34 @@ def test_directory_origin_configuration_delimiter_format_type(sdc_builder, sdc_e
"""Test for Directory origin can read delimited file with different delimiter format type.
Here we will be creating delimited files in different formats for testing. e.g. POSTGRES_CSV, TDF, RFC4180, etc.,
"""
files_directory = os.path.join('/tmp', get_random_string())
FILE_NAME = 'delimited_file.csv'
FILE_CONTENTS = [['field1', 'field2', 'field3'], ['Field11', 'Field12', 'fält13'], ['стол', 'Field22', 'Field23']]
file_name = 'delimited_file.csv'
file_contents = [['field1', 'field2', 'field3'], ['Field11', 'Field12', 'fält13'], ['стол', 'Field22', 'Field23']]
delimiter_character_map = {'CUSTOM': '^'}
delimiter_character = '^' if delimiter_format_type == 'CUSTOM' else None

try:
logger.debug('Creating files directory %s ...', files_directory)
shell_executor(f'mkdir {files_directory}')
delimited_file_writer(os.path.join(files_directory, FILE_NAME),
FILE_CONTENTS, delimiter_format_type, delimiter_character)

pipeline_builder = sdc_builder.get_pipeline_builder()
directory = pipeline_builder.add_stage('Directory')
directory.set_attributes(data_format=data_format,
files_directory=files_directory,
file_name_pattern='delimited_*',
file_name_pattern_mode='GLOB',
delimiter_format_type=delimiter_format_type,
delimiter_character=delimiter_character,
root_field_type=root_field_type,
header_line=header_line)
trash = pipeline_builder.add_stage('Trash')
directory >> trash
pipeline = pipeline_builder.build()
files_directory = DirectoryOriginCommon.create_file_directory(file_name, file_contents, shell_executor,
delimited_file_writer, delimiter_format_type,
delimiter_character)

attributes = {'data_format':data_format,
'files_directory':files_directory,
'file_name_pattern':'delimited_*',
'file_name_pattern_mode':'GLOB',
'delimiter_format_type':delimiter_format_type,
'delimiter_character':delimiter_character,
'root_field_type':root_field_type,
'header_line':header_line}
directory, pipeline = DirectoryOriginCommon.get_directory_trash_pipeline(sdc_builder, attributes)

sdc_executor.add_pipeline(pipeline)
snapshot = sdc_executor.capture_snapshot(pipeline, start_pipeline=True, batch_size=3).snapshot
sdc_executor.stop_pipeline(pipeline)
output_records = snapshot[directory.instance_name].output
new_line_field = 'Field12\nSTR' if delimiter_format_type == 'EXCEL' else 'Field12'

assert 2 == len(output_records)
assert output_records[0].field == OrderedDict(
[('field1', 'Field11'), ('field2', new_line_field), ('field3', 'fält13')])
assert output_records[1].field == OrderedDict(
[('field1', 'стол'), ('field2', 'Field22'), ('field3', 'Field23')])
new_line_field = 'Field12\nSTR' if delimiter_format_type == 'EXCEL' else 'Field12'
file_contents[1][1] = new_line_field
DirectoryOriginCommon.verify_delimited_output(output_records, file_contents[1:3], file_contents[0])
finally:
shell_executor(f'rm -r {files_directory}')

Expand All @@ -606,9 +598,45 @@ def test_directory_origin_configuration_error_directory(sdc_builder, sdc_executo

@pytest.mark.parametrize('delimiter_format_type', ['CUSTOM'])
@pytest.mark.parametrize('data_format', ['DELIMITED'])
@pytest.mark.skip('Not yet implemented')
def test_directory_origin_configuration_escape_character(sdc_builder, sdc_executor, delimiter_format_type, data_format):
pass
@pytest.mark.parametrize('escape_character', ['\t', ';' , ' '])
@pytest.mark.parametrize('delimiter_character', ['@'])
def test_directory_origin_configuration_escape_character(sdc_builder, sdc_executor, delimiter_format_type,
data_format, escape_character, shell_executor,
delimited_file_writer, delimiter_character):
"""Verify if directory origin can read delimited data custom escape character.
This TC check for different escape characters. Input data fields have delimiter characters.
Directory origin should read this data and produce field without escape character.
e.g. ;|Field is value of field with "|" as delimiter character and ";" as escape character
then output field should be "|Field".
"""
file_name = 'custom_delimited_file.csv'
f = lambda ip_string: ip_string.format(escape_character=escape_character, delimiter_character=delimiter_character)
f1 = lambda ip_string: ip_string.replace(escape_character, "")
data = [[f('Field11{escape_character}{delimiter_character}'), 'Field12', f('{escape_character}"Field13')],
[f('Field{escape_character}{delimiter_character}21'), 'Field22', 'Field23']]

try:
files_directory = create_file_and_directory(file_name, data, shell_executor,delimited_file_writer,
delimiter_format_type, delimiter_character)

attributes = {'data_format': data_format,
'files_directory': files_directory,
'file_name_pattern': 'custom_delimited_*',
'file_name_pattern_mode': 'GLOB',
'delimiter_format_type': delimiter_format_type,
'delimiter_character': delimiter_character,
'escape_character': escape_character}
directory, pipeline = get_directory_to_trash_pipeline(sdc_builder, attributes)

sdc_executor.add_pipeline(pipeline)
snapshot = sdc_executor.capture_snapshot(pipeline, start_pipeline=True, batch_size=3).snapshot
output_records = snapshot[directory.instance_name].output

expected_output = [map(f1, data[0]), map(f1, data[1])]
verify_delimited_output(output_records, expected_output)
finally:
sdc_executor.stop_pipeline(pipeline)
shell_executor(f'rm -r {files_directory}')


@pytest.mark.parametrize('data_format', ['EXCEL'])
Expand Down