Added argument for specifying gcs_bucket separately
agiix committed Apr 10, 2023
1 parent 8b5d28e commit 549b534
Showing 3 changed files with 41 additions and 35 deletions.
31 changes: 17 additions & 14 deletions pipeline/run_beam_tables.py
@@ -27,16 +27,17 @@


 def run_parallel_pipelines(runner: beam_tables.ScanDataBeamPipelineRunner,
-                           dataset: str, scan_types: List[str],
-                           incremental_load: bool,
+                           dataset_bq: str, dataset_gcs: str,
+                           scan_types: List[str], incremental_load: bool,
                            start_date: Optional[datetime.date],
                            end_date: Optional[datetime.date], export_gcs: bool,
                            export_bq: bool) -> bool:
   """Runs beam pipelines for different scan types in parallel.
   Args:
     runner: ScanDataBeamPipelineRunner to run pipelines
-    dataset: dataset name to write to like 'prod' or 'laplante
+    dataset_bq: BigQuery dataset name to write to, like 'prod' or 'laplante'
+    dataset_gcs: GCS dataset name to write to, like 'prod' or 'laplante'
     scan_types: list of scan types to run ['echo' 'http']
     incremental_load: boolean. If true, only load the latest new data, if false
       reload all data.
@@ -60,18 +61,18 @@ def run_parallel_pipelines(runner: beam_tables.ScanDataBeamPipelineRunner,
     gcs_folder = None

     if export_bq and export_gcs:
-      table_name = beam_tables.get_table_name(dataset, scan_type,
+      table_name = beam_tables.get_table_name(dataset_bq, scan_type,
                                               beam_tables.BASE_TABLE_NAME)
-      gcs_folder = beam_tables.get_gcs_folder(beam_tables.BASE_GCS_NAME,
-                                              scan_type, runner.output_bucket)
+      gcs_folder = beam_tables.get_gcs_folder(dataset_gcs, scan_type,
+                                              runner.output_bucket)
       job_name = beam_tables.get_gcs_job_name(gcs_folder, incremental_load)
     else:
       if export_gcs:
-        gcs_folder = beam_tables.get_gcs_folder(dataset, scan_type,
+        gcs_folder = beam_tables.get_gcs_folder(dataset_gcs, scan_type,
                                                 runner.output_bucket)
         job_name = beam_tables.get_gcs_job_name(gcs_folder, incremental_load)
       else:
-        table_name = beam_tables.get_table_name(dataset, scan_type,
+        table_name = beam_tables.get_table_name(dataset_bq, scan_type,
                                                 beam_tables.BASE_TABLE_NAME)
         job_name = beam_tables.get_bq_job_name(table_name, incremental_load)
@@ -154,14 +155,16 @@ def main(parsed_args: argparse.Namespace) -> None:
   pipeline_runner = get_beam_pipeline_runner(parsed_args.env)
   if parsed_args.env == 'user':
     run_parallel_pipelines(pipeline_runner, parsed_args.user_dataset,
-                           selected_scan_types, incremental,
-                           parsed_args.start_date, parsed_args.end_date,
-                           parsed_args.export_gcs, parsed_args.export_bq)
+                           parsed_args.user_dataset, selected_scan_types,
+                           incremental, parsed_args.start_date,
+                           parsed_args.end_date, parsed_args.export_gcs,
+                           parsed_args.export_bq)
   elif parsed_args.env in ('dev', 'prod'):
     run_parallel_pipelines(pipeline_runner, beam_tables.BASE_DATASET_NAME,
-                           selected_scan_types, incremental,
-                           parsed_args.start_date, parsed_args.end_date,
-                           parsed_args.export_gcs, parsed_args.export_bq)
+                           beam_tables.BASE_GCS_NAME, selected_scan_types,
+                           incremental, parsed_args.start_date,
+                           parsed_args.end_date, parsed_args.export_gcs,
+                           parsed_args.export_bq)


 def parse_args() -> argparse.Namespace:
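As context for the split into dataset_bq and dataset_gcs, here is a minimal sketch of what the two GCS naming helpers appear to do, inferred from the expected values in the tests below ('gs://firehook-test/scans/echo', 'append-gs-firehook-test-scans-echo'). The real implementations live in beam_tables and may differ; in particular the 'write' prefix for full (non-incremental) reloads is an assumption.

def get_gcs_folder(dataset_gcs: str, scan_type: str, output_bucket: str) -> str:
  # e.g. ('scans', 'echo', 'firehook-test') -> 'gs://firehook-test/scans/echo'
  return f'gs://{output_bucket}/{dataset_gcs}/{scan_type}'


def get_gcs_job_name(gcs_folder: str, incremental_load: bool) -> str:
  # e.g. ('gs://firehook-test/scans/echo', True)
  #   -> 'append-gs-firehook-test-scans-echo'
  prefix = 'append' if incremental_load else 'write'  # 'write' is a guess
  return prefix + '-' + gcs_folder.replace('gs://', 'gs-').replace('/', '-')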
43 changes: 23 additions & 20 deletions pipeline/test_run_beam_tables.py
@@ -34,10 +34,11 @@ def test_run_single_pipelines(self) -> None:
     mock_runner = MagicMock(beam_tables.ScanDataBeamPipelineRunner)
     mock_runner.output_bucket = 'firehook-test'

-    run_beam_tables.run_parallel_pipelines(mock_runner, 'base', ['echo'], True,
-                                           datetime.date(2020, 1, 1),
-                                           datetime.date(2020, 1, 2), False,
-                                           False)
+    run_beam_tables.run_parallel_pipelines(mock_runner, 'base', 'scans',
+                                           ['echo'], True,
+                                           datetime.date(2020, 1, 1),
+                                           datetime.date(2020, 1, 2), False,
+                                           False)

     mock_runner.run_beam_pipeline.assert_called_with('echo', True,
                                                      'append-base-echo-scan',
@@ -46,22 +47,23 @@ def test_run_single_pipelines(self) -> None:
                                                      datetime.date(2020, 1, 2),
                                                      False, False)

-    run_beam_tables.run_parallel_pipelines(mock_runner, 'base', ['echo'], True,
-                                           datetime.date(2020, 1, 1),
-                                           datetime.date(2020, 1, 2), True,
-                                           False)
+    run_beam_tables.run_parallel_pipelines(mock_runner, 'base', 'scans',
+                                           ['echo'], True,
+                                           datetime.date(2020, 1, 1),
+                                           datetime.date(2020, 1, 2), True,
+                                           False)

     mock_runner.run_beam_pipeline.assert_called_with(
-        'echo', True, 'append-gs-firehook-test-base-echo', None,
-        'gs://firehook-test/base/echo', datetime.date(2020, 1, 1),
+        'echo', True, 'append-gs-firehook-test-scans-echo', None,
+        'gs://firehook-test/scans/echo', datetime.date(2020, 1, 1),
         datetime.date(2020, 1, 2), True, False)

   def test_run_parallel_pipelines(self) -> None:
     """Test running two pipelines in parallel."""
     mock_runner = MagicMock(beam_tables.ScanDataBeamPipelineRunner)
     mock_runner.output_bucket = 'firehook-test'

-    run_beam_tables.run_parallel_pipelines(mock_runner, 'laplante',
+    run_beam_tables.run_parallel_pipelines(mock_runner, 'laplante', 'laplante',
                                            ['http', 'https'], False, None, None,
                                            False, False)
@@ -72,7 +74,7 @@ def test_run_parallel_pipelines(self) -> None:
     mock_runner.run_beam_pipeline.assert_has_calls([call1, call2],
                                                    any_order=True)

-    run_beam_tables.run_parallel_pipelines(mock_runner, 'laplante',
+    run_beam_tables.run_parallel_pipelines(mock_runner, 'laplante', 'laplante',
                                            ['http', 'https'], False, None, None,
                                            True, False)
@@ -125,17 +127,18 @@ def test_main_prod(self) -> None:
                            export_bq=False)
     run_beam_tables.main(args)

-    call6 = call('echo', True, 'append-gs-firehook-test-base-echo', None,
-                 'gs://firehook-test/base/echo', None, None, True, False)
-    call7 = call('discard', True, 'append-gs-firehook-test-base-discard',
-                 None, 'gs://firehook-test/base/discard', None, None, True,
-                 False)
-    call8 = call('http', True, 'append-gs-firehook-test-base-http', None,
-                 'gs://firehook-test/base/http', None, None, True, False)
-    call9 = call('https', True, 'append-gs-firehook-test-base-https', None,
-                 'gs://firehook-test/base/https', None, None, True, False)
-    call10 = call('satellite', True, 'append-gs-firehook-test-base-satellite',
-                  None, 'gs://firehook-test/base/satellite', None, None, True,
-                  False)
+    call6 = call('echo', True, 'append-gs-firehook-test-scans-echo', None,
+                 'gs://firehook-test/scans/echo', None, None, True, False)
+    call7 = call('discard', True, 'append-gs-firehook-test-scans-discard',
+                 None, 'gs://firehook-test/scans/discard', None, None, True,
+                 False)
+    call8 = call('http', True, 'append-gs-firehook-test-scans-http', None,
+                 'gs://firehook-test/scans/http', None, None, True, False)
+    call9 = call('https', True, 'append-gs-firehook-test-scans-https', None,
+                 'gs://firehook-test/scans/https', None, None, True, False)
+    call10 = call('satellite', True,
+                  'append-gs-firehook-test-scans-satellite', None,
+                  'gs://firehook-test/scans/satellite', None, None, True,
+                  False)
     mock_runner.run_beam_pipeline.assert_has_calls(
         [call6, call7, call8, call9, call10], any_order=True)
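A minimal sketch of calling the updated entry point outside the test suite, assuming get_beam_pipeline_runner is importable from pipeline.run_beam_tables as its use in main() suggests; the dates and scan types are illustrative, and the 'base'/'scans' values for the two constants are inferred from the test expectations above.

import datetime

from pipeline import beam_tables, run_beam_tables

runner = run_beam_tables.get_beam_pipeline_runner('prod')
run_beam_tables.run_parallel_pipelines(
    runner,
    beam_tables.BASE_DATASET_NAME,  # dataset_bq, e.g. 'base'
    beam_tables.BASE_GCS_NAME,  # dataset_gcs, e.g. 'scans'
    ['echo', 'http'],
    True,  # incremental_load
    datetime.date(2023, 4, 1),
    datetime.date(2023, 4, 10),
    True,  # export_gcs
    True)  # export_bq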
2 changes: 1 addition & 1 deletion schedule_pipeline.py
@@ -49,7 +49,7 @@ def run_pipeline(env: str) -> None:
   # which is slow (hangs basic worker machines) and wasteful.
   subprocess.run([
       sys.executable, '-m', 'pipeline.run_beam_tables', f'--env={env}',
-      '--scan_type=all', '--export_gc', '--export_bq'
+      '--scan_type=all', '--export_bq'
   ],
                  check=True,
                  stdout=subprocess.PIPE)
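Worth noting: argparse matches unambiguous long-option prefixes by default, so the removed '--export_gc' most likely resolved to the '--export_gcs' flag implied by parsed_args.export_gcs; after this change the scheduled job writes to BigQuery only. If GCS export were wanted in the scheduled run, the invocation would presumably spell the flag out in full, as in this sketch (flag name assumed; verify against parse_args()):

import subprocess
import sys


def run_pipeline_with_gcs_export(env: str) -> None:
  # Same invocation as run_pipeline above, plus the assumed GCS export flag.
  subprocess.run([
      sys.executable, '-m', 'pipeline.run_beam_tables', f'--env={env}',
      '--scan_type=all', '--export_gcs', '--export_bq'
  ],
                 check=True,
                 stdout=subprocess.PIPE)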
