Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdc 11380 #27

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
132 changes: 100 additions & 32 deletions stage/configuration/test_directory_origin.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,43 +420,34 @@ def test_directory_origin_configuration_delimiter_format_type(sdc_builder, sdc_e
"""Test for Directory origin can read delimited file with different delimiter format type.
Here we will be creating delimited files in different formats for testing. e.g. POSTGRES_CSV, TDF, RFC4180, etc.,
"""
files_directory = os.path.join('/tmp', get_random_string())
FILE_NAME = 'delimited_file.csv'
FILE_CONTENTS = [['field1', 'field2', 'field3'], ['Field11', 'Field12', 'fält13'], ['стол', 'Field22', 'Field23']]
file_name = 'delimited_file.csv'
file_contents = [['field1', 'field2', 'field3'], ['Field11', 'Field12', 'fält13'], ['стол', 'Field22', 'Field23']]
delimiter_character_map = {'CUSTOM': '^'}
delimiter_character = '^' if delimiter_format_type == 'CUSTOM' else None

try:
logger.debug('Creating files directory %s ...', files_directory)
shell_executor(f'mkdir {files_directory}')
delimited_file_writer(os.path.join(files_directory, FILE_NAME),
FILE_CONTENTS, delimiter_format_type, delimiter_character)

pipeline_builder = sdc_builder.get_pipeline_builder()
directory = pipeline_builder.add_stage('Directory')
directory.set_attributes(data_format=data_format,
files_directory=files_directory,
file_name_pattern='delimited_*',
file_name_pattern_mode='GLOB',
delimiter_format_type=delimiter_format_type,
delimiter_character=delimiter_character,
root_field_type=root_field_type,
header_line=header_line)
trash = pipeline_builder.add_stage('Trash')
directory >> trash
pipeline = pipeline_builder.build()
files_directory = DirectoryOriginCommon.create_file_directory(file_name, file_contents, shell_executor,
delimited_file_writer, delimiter_format_type,
delimiter_character)

attributes = {'data_format':data_format,
'files_directory':files_directory,
'file_name_pattern':'delimited_*',
'file_name_pattern_mode':'GLOB',
'delimiter_format_type':delimiter_format_type,
'delimiter_character':delimiter_character,
'root_field_type':root_field_type,
'header_line':header_line}
directory, pipeline = DirectoryOriginCommon.get_directory_trash_pipeline(sdc_builder, attributes)

sdc_executor.add_pipeline(pipeline)
snapshot = sdc_executor.capture_snapshot(pipeline, start_pipeline=True, batch_size=3).snapshot
sdc_executor.stop_pipeline(pipeline)
output_records = snapshot[directory.instance_name].output
new_line_field = 'Field12\nSTR' if delimiter_format_type == 'EXCEL' else 'Field12'

assert 2 == len(output_records)
assert output_records[0].field == OrderedDict(
[('field1', 'Field11'), ('field2', new_line_field), ('field3', 'fält13')])
assert output_records[1].field == OrderedDict(
[('field1', 'стол'), ('field2', 'Field22'), ('field3', 'Field23')])
new_line_field = 'Field12\nSTR' if delimiter_format_type == 'EXCEL' else 'Field12'
file_contents[1][1] = new_line_field
DirectoryOriginCommon.verify_delimited_output(output_records, file_contents[1:3], file_contents[0])
finally:
shell_executor(f'rm -r {files_directory}')

Expand All @@ -477,9 +468,45 @@ def test_directory_origin_configuration_error_directory(sdc_builder, sdc_executo

@pytest.mark.parametrize('delimiter_format_type', ['CUSTOM'])
@pytest.mark.parametrize('data_format', ['DELIMITED'])
@pytest.mark.parametrize('escape_character', ['\t', ';', ' '])
@pytest.mark.parametrize('delimiter_character', ['@'])
def test_directory_origin_configuration_escape_character(sdc_builder, sdc_executor, delimiter_format_type,
                                                         data_format, escape_character, shell_executor,
                                                         delimited_file_writer, delimiter_character):
    """Verify that the Directory origin honours a custom escape character in delimited data.

    The input fields contain the delimiter character (and a double quote) preceded by the
    escape character. The origin is expected to emit those fields with the escape character
    removed, e.g. with "|" as delimiter and ";" as escape character the input ";|Field"
    should be read as "|Field".
    """
    file_name = 'custom_delimited_file.csv'
    data = [[f'Field11{escape_character}{delimiter_character}', 'Field12', f'{escape_character}"Field13'],
            [f'Field{escape_character}{delimiter_character}21', 'Field22', 'Field23']]
    # Expected records are the input fields with every escape character stripped out.
    expected_output = [[field.replace(escape_character, '') for field in row] for row in data]

    # Create the directory before entering ``try`` so that ``finally`` never refers to an
    # unbound ``files_directory`` (which would mask the real failure with a NameError).
    files_directory = DirectoryOriginCommon.create_file_directory(file_name, data, shell_executor,
                                                                  delimited_file_writer, delimiter_format_type,
                                                                  delimiter_character)
    try:
        attributes = {'data_format': data_format,
                      'files_directory': files_directory,
                      'file_name_pattern': 'custom_delimited_*',
                      'file_name_pattern_mode': 'GLOB',
                      'delimiter_format_type': delimiter_format_type,
                      'delimiter_character': delimiter_character,
                      'escape_character': escape_character}
        directory, pipeline = DirectoryOriginCommon.get_directory_trash_pipeline(sdc_builder, attributes)

        sdc_executor.add_pipeline(pipeline)
        snapshot = sdc_executor.capture_snapshot(pipeline, start_pipeline=True, batch_size=3).snapshot
        sdc_executor.stop_pipeline(pipeline)
        output_records = snapshot[directory.instance_name].output

        DirectoryOriginCommon.verify_delimited_output(output_records, expected_output)
    finally:
        shell_executor(f'rm -r {files_directory}')


@pytest.mark.parametrize('data_format', ['EXCEL'])
Expand Down Expand Up @@ -918,5 +945,46 @@ def test_directory_origin_configuration_use_custom_log_format(sdc_builder, sdc_e


## Start of general supportive functions
def get_text_file_content(file_number):
    """Return three newline-separated lines of the form ``This is line<file_number><n>``.

    ``<n>`` runs from 1 to 3 and no trailing newline is appended.
    """
    lines = []
    for line_number in (1, 2, 3):
        lines.append(f'This is line{file_number!s}{line_number}')
    return '\n'.join(lines)


# Class with common functionalities
class DirectoryOriginCommon(object):
    """Helpers shared by the Directory origin configuration tests.

    Every helper is a static method, so the class acts as a namespace and does not
    need to be instantiated.
    """

    def __init__(self):
        """No instance state is kept; all helpers are static."""
        pass

    @staticmethod
    def get_directory_trash_pipeline(sdc_builder, attributes):
        """Build a ``Directory >> Trash`` pipeline.

        Args:
            sdc_builder: Object providing ``get_pipeline_builder()``.
            attributes: Mapping of attribute names to values; it is unpacked as keyword
                arguments into the Directory stage's ``set_attributes``.

        Returns:
            A ``(directory, pipeline)`` tuple: the configured Directory stage and the
            built pipeline.
        """
        pipeline_builder = sdc_builder.get_pipeline_builder()
        directory = pipeline_builder.add_stage('Directory')
        directory.set_attributes(**attributes)
        trash = pipeline_builder.add_stage('Trash')
        directory >> trash
        pipeline = pipeline_builder.build()
        return directory, pipeline

    @staticmethod
    def create_file_directory(file_name, file_content, shell_executor, file_writer, delimiter_format_type=None,
                              delimiter_character=None):
        """Create a randomly named directory under ``/tmp`` and write one file into it.

        Args:
            file_name: Name of the file to create inside the new directory.
            file_content: Content handed to ``file_writer`` unchanged.
            shell_executor: Callable that runs a shell command string; used for ``mkdir``.
            file_writer: Callable that writes the file. It is called as
                ``file_writer(path, content, delimiter_format_type, delimiter_character)``
                when ``delimiter_format_type`` is truthy, otherwise as
                ``file_writer(path, content)``.
            delimiter_format_type: Optional delimiter format passed to ``file_writer``.
            delimiter_character: Optional delimiter character passed to ``file_writer``.

        Returns:
            Path of the directory that was created.
        """
        files_directory = os.path.join('/tmp', get_random_string())
        logger.debug('Creating files directory %s ...', files_directory)
        shell_executor(f'mkdir {files_directory}')
        file_path = os.path.join(files_directory, file_name)
        if delimiter_format_type:
            file_writer(file_path, file_content, delimiter_format_type, delimiter_character)
        else:
            file_writer(file_path, file_content)
        return files_directory

    @staticmethod
    def get_text_file_content(file_number, lines_needed=3):
        """Return ``lines_needed`` newline-separated lines ``This is line<file_number><n>``.

        ``<n>`` runs from 1 to ``lines_needed``; no trailing newline is appended.
        """
        return '\n'.join(['This is line{}{}'.format(str(file_number), i) for i in range(1, (lines_needed + 1))])

    @staticmethod
    def verify_delimited_output(output_records, data, header=None):
        """Assert that ``output_records`` match the expected delimited rows.

        Args:
            output_records: Sequence of records, each exposing a ``field`` attribute that
                compares equal to an ordered mapping of column name to value.
            data: Iterable of expected rows; each row is an iterable of field values
                (one-shot iterators such as ``map`` objects are accepted).
            header: Optional column names. When omitted or empty, columns are named by
                their zero-based position (``'0'``, ``'1'``, ...).

        Raises:
            AssertionError: If the number of records differs from the number of expected
                rows, or any record's ``field`` differs from its expected row.
        """
        # Materialize rows so they can be measured and so one-shot iterators are safe.
        rows = [list(row) for row in data]
        if header:
            header = list(header)
        else:
            # Positional column names, sized to the widest row instead of a fixed three.
            width = max((len(row) for row in rows), default=0)
            header = [str(index) for index in range(width)]
        assert len(rows) == len(output_records)
        for record, row in zip(output_records, rows):
            assert record.field == OrderedDict(zip(header, row))