2 changes: 1 addition & 1 deletion tests/models/test_dagbag.py
@@ -436,7 +436,7 @@ def validate_dags(self, expected_parent_dag, actual_found_dags, actual_dagbag, s
actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))

for dag_id in expected_dag_ids:
actual_dagbag.log.info(f'validating {dag_id}')
actual_dagbag.log.info('validating %s', dag_id)
assert (dag_id in actual_found_dag_ids) == should_be_found, (
f"dag \"{dag_id}\" should {'' if should_be_found else 'not '}"
f"have been found after processing dag \"{expected_parent_dag.dag_id}\""
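The one-line change above swaps an eager f-string for logging's lazy %-style formatting: the message is interpolated only if the record is actually emitted, and the constant template makes the lines easier to aggregate. A minimal sketch of the difference, using a stand-alone logger rather than Airflow's:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("example")

dag_id = "example_dag"

# Eager: the f-string is built up front, even though INFO records
# are filtered out at the WARNING level and never emitted.
logger.info(f"validating {dag_id}")

# Lazy: the template and argument are stored on the record; formatting
# is skipped entirely because the record is never emitted.
logger.info("validating %s", dag_id)
```

The same substitution appears in tests/task/task_runner/test_standard_task_runner.py below.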
104 changes: 52 additions & 52 deletions tests/providers/apache/hive/transfers/test_s3_to_hive.py
@@ -21,7 +21,6 @@
import filecmp
import logging
import shutil
import unittest
from collections import OrderedDict
from gzip import GzipFile
from itertools import product
@@ -40,8 +39,9 @@
mock_s3 = None


class TestS3ToHiveTransfer(unittest.TestCase):
def setUp(self):
class TestS3ToHiveTransfer:
@pytest.fixture(autouse=True)
def setup_attrs(self):
self.file_names = {}
self.task_id = 'S3ToHiveTransferTest'
self.s3_key = 'S32hive_test_file'
@@ -69,49 +69,45 @@ def setUp(self):
'wildcard_match': self.wildcard_match,
'input_compressed': self.input_compressed,
}
try:
header = b"Sno\tSome,Text \n"
line1 = b"1\tAirflow Test\n"
line2 = b"2\tS32HiveTransfer\n"
self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
# create sample txt, gz and bz2 with and without headers
with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
self._set_fn(f_txt_h.name, '.txt', True)
f_txt_h.writelines([header, line1, line2])
fn_gz = self._get_fn('.txt', True) + ".gz"
with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
self._set_fn(fn_gz, '.gz', True)
f_gz_h.writelines([header, line1, line2])
fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
self._set_fn(fn_gz_upper, '.GZ', True)
f_gz_upper_h.writelines([header, line1, line2])
fn_bz2 = self._get_fn('.txt', True) + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
self._set_fn(fn_bz2, '.bz2', True)
f_bz2_h.writelines([header, line1, line2])
# create sample txt, bz and bz2 without header
with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
self._set_fn(f_txt_nh.name, '.txt', False)
f_txt_nh.writelines([line1, line2])
fn_gz = self._get_fn('.txt', False) + ".gz"
with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
self._set_fn(fn_gz, '.gz', False)
f_gz_nh.writelines([line1, line2])
fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
self._set_fn(fn_gz_upper, '.GZ', False)
f_gz_upper_nh.writelines([line1, line2])
fn_bz2 = self._get_fn('.txt', False) + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
self._set_fn(fn_bz2, '.bz2', False)
f_bz2_nh.writelines([line1, line2])
# Base Exception so it catches Keyboard Interrupt
except BaseException as e:
logging.error(e)
self.tearDown()
header = b"Sno\tSome,Text \n"
line1 = b"1\tAirflow Test\n"
line2 = b"2\tS32HiveTransfer\n"
self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
# create sample txt, gz and bz2 with and without headers
with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
self._set_fn(f_txt_h.name, '.txt', True)
f_txt_h.writelines([header, line1, line2])
fn_gz = self._get_fn('.txt', True) + ".gz"
with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
self._set_fn(fn_gz, '.gz', True)
f_gz_h.writelines([header, line1, line2])
fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
self._set_fn(fn_gz_upper, '.GZ', True)
f_gz_upper_h.writelines([header, line1, line2])
fn_bz2 = self._get_fn('.txt', True) + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
self._set_fn(fn_bz2, '.bz2', True)
f_bz2_h.writelines([header, line1, line2])
# create sample txt, bz and bz2 without header
with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
self._set_fn(f_txt_nh.name, '.txt', False)
f_txt_nh.writelines([line1, line2])
fn_gz = self._get_fn('.txt', False) + ".gz"
with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
self._set_fn(fn_gz, '.gz', False)
f_gz_nh.writelines([line1, line2])
fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
self._set_fn(fn_gz_upper, '.GZ', False)
f_gz_upper_nh.writelines([line1, line2])
fn_bz2 = self._get_fn('.txt', False) + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
self._set_fn(fn_bz2, '.bz2', False)
f_bz2_nh.writelines([line1, line2])

yield

def tearDown(self):
try:
shutil.rmtree(self.tmp_dir)
except OSError as e:
@@ -152,6 +148,11 @@ def _check_file_equality(fn_1, fn_2, ext):
else:
return filecmp.cmp(fn_1, fn_2, shallow=False)

@staticmethod
def _load_file_side_effect(args, op_fn, ext):
check = TestS3ToHiveTransfer._check_file_equality(args[0], op_fn, ext)
assert check, f'{ext} output file not as expected'

def test_bad_parameters(self):
self.kwargs['check_headers'] = True
self.kwargs['headers'] = False
@@ -194,8 +195,8 @@ def test__delete_top_row_and_compress(self):
fn_bz2 = self._get_fn('.bz2', False)
assert self._check_file_equality(bz2_txt_nh, fn_bz2, '.bz2'), "bz2 Compressed file not as expected"

@unittest.skipIf(mock is None, 'mock package not present')
@unittest.skipIf(mock_s3 is None, 'moto package not present')
@pytest.mark.skipif(mock is None, reason='mock package not present')
@pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
@mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
@mock_s3
def test_execute(self, mock_hiveclihook):
@@ -217,16 +218,15 @@ def test_execute(self, mock_hiveclihook):

# file parameter to HiveCliHook.load_file is compared
# against expected file output
mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self.assertTrue(
self._check_file_equality(args[0], op_fn, ext),
f'{ext} output file not as expected',
mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self._load_file_side_effect(
args, op_fn, ext
)
# Execute S3ToHiveTransfer
s32hive = S3ToHiveOperator(**self.kwargs)
s32hive.execute(None)

@unittest.skipIf(mock is None, 'mock package not present')
@unittest.skipIf(mock_s3 is None, 'moto package not present')
@pytest.mark.skipif(mock is None, reason='mock package not present')
@pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
@mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
@mock_s3
def test_execute_with_select_expression(self, mock_hiveclihook):
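The larger rewrite in this file converts the class from unittest.TestCase to a plain pytest class: setUp becomes an autouse fixture (code before the yield runs ahead of every test, code after it serves as teardown), the defensive try/except BaseException around setup is dropped in favor of letting errors propagate, and unittest.skipIf decorators become pytest.mark.skipif markers with a keyword-only reason. A minimal sketch of the fixture pattern, using a hypothetical test class and temp-dir setup:

```python
import os
import shutil
from tempfile import mkdtemp

import pytest


class TestExample:
    @pytest.fixture(autouse=True)
    def setup_attrs(self):
        # Runs before each test in the class; attributes set on
        # ``self`` are visible to the test method.
        self.tmp_dir = mkdtemp(prefix="test_example_")
        yield
        # Runs after each test, whether it passed or failed.
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test_tmp_dir_exists(self):
        assert os.path.isdir(self.tmp_dir)
```

The new _load_file_side_effect staticmethod serves the same purpose: a bare assert replaces self.assertTrue, which is no longer available once the class stops inheriting from TestCase.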
20 changes: 2 additions & 18 deletions tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -96,8 +96,7 @@ def set_key_path_in_airflow_connection(self):
key path
:return: None
"""
session = settings.Session()
try:
with settings.Session() as session:
conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
extras = conn.extra_dejson
extras[KEYPATH_EXTRA] = self.full_key_path
@@ -106,22 +105,14 @@
extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
extras[PROJECT_EXTRA] = self.project_extra if self.project_extra else self.project_id
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
self.log.error('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
session.close()

def set_dictionary_in_airflow_connection(self):
"""
Set dictionary in 'google_cloud_default' connection to contain content
of the json service account file.
:return: None
"""
session = settings.Session()
try:
with settings.Session() as session:
conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
extras = conn.extra_dejson
with open(self.full_key_path) as path_file:
@@ -132,13 +123,6 @@ def set_dictionary_in_airflow_connection(self):
extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
extras[PROJECT_EXTRA] = self.project_extra
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
self.log.error('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
session.close()

def _set_key_path(self):
"""
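Both methods above collapse the manual Session bookkeeping (commit, rollback on error, close in finally) into a with block. A minimal sketch of the equivalent context-manager semantics in plain SQLAlchemy 1.4+, with a hypothetical in-memory engine standing in for Airflow's configured database; note that in stock SQLAlchemy it is session.begin() that supplies commit-on-success/rollback-on-error, while the outer with only guarantees close():

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Hypothetical engine; Airflow wires settings.Session to its own database.
engine = create_engine("sqlite://")

with Session(engine) as session:  # guarantees session.close() on exit
    with session.begin():  # commit on success, rollback on exception
        session.execute(text("SELECT 1"))
```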
2 changes: 1 addition & 1 deletion tests/task/task_runner/test_standard_task_runner.py
@@ -232,7 +232,7 @@ def test_on_kill(self):
time.sleep(0.01)
logging.info("Task started. Give the task some time to settle")
time.sleep(3)
logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
logging.info("Terminating processes %s belonging to %s group", processes, runner_pgid)
runner.terminate()
session.close() # explicitly close as `create_session`s commit will blow up otherwise

47 changes: 20 additions & 27 deletions tests/utils/test_compression.py
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -20,45 +19,39 @@
import errno
import filecmp
import gzip
import logging
import shutil
import tempfile
import unittest

import pytest

from airflow.utils import compression


class TestCompression(unittest.TestCase):
def setUp(self):
class TestCompression:
@pytest.fixture(autouse=True)
def setup_attrs(self):
self.file_names = {}
try:
header = b"Sno\tSome,Text \n"
line1 = b"1\tAirflow Test\n"
line2 = b"2\tCompressionUtil\n"
self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
# create sample txt, gz and bz2 files
with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
self._set_fn(f_txt.name, '.txt')
f_txt.writelines([header, line1, line2])
header = b"Sno\tSome,Text \n"
line1 = b"1\tAirflow Test\n"
line2 = b"2\tCompressionUtil\n"
self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
# create sample txt, gz and bz2 files
with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
self._set_fn(f_txt.name, '.txt')
f_txt.writelines([header, line1, line2])

fn_gz = self._get_fn('.txt') + ".gz"
with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
self._set_fn(fn_gz, '.gz')
f_gz.writelines([header, line1, line2])
fn_gz = self._get_fn('.txt') + ".gz"
with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
self._set_fn(fn_gz, '.gz')
f_gz.writelines([header, line1, line2])

fn_bz2 = self._get_fn('.txt') + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
self._set_fn(fn_bz2, '.bz2')
f_bz2.writelines([header, line1, line2])
fn_bz2 = self._get_fn('.txt') + '.bz2'
with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
self._set_fn(fn_bz2, '.bz2')
f_bz2.writelines([header, line1, line2])

# Base Exception so it catches Keyboard Interrupt
except BaseException as e:
logging.error(e)
self.tearDown()
yield

def tearDown(self):
try:
shutil.rmtree(self.tmp_dir)
except OSError as e: