From f2d4eea9d09859729ca182ae2857e1c1c5337694 Mon Sep 17 00:00:00 2001 From: Yaniv Ranen Date: Tue, 28 Apr 2015 15:23:09 +0300 Subject: [PATCH 1/4] enabling inheritance of topologies (the queries can be executed from different directories) --- src/hydro/common/utils.py | 17 ++++++++++++++++- src/hydro/query_engine.py | 10 ++++++++-- src/hydro/topology_base.py | 31 +++++++++++++++++-------------- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/hydro/common/utils.py b/src/hydro/common/utils.py index 1cc1681..12b554e 100644 --- a/src/hydro/common/utils.py +++ b/src/hydro/common/utils.py @@ -1,6 +1,21 @@ __author__ = 'yanivshalev' import hashlib +import os +import inspect def create_cache_key(fingerprint): - return hashlib.md5(fingerprint).hexdigest() \ No newline at end of file + return hashlib.md5(fingerprint).hexdigest() + +def get_directory_from_search_path(given_search_path, file_name, lowest_class_instance): + # get the relevant directory for a file from a list of inherited classes + found_dir = False + for stack_class_instance in given_search_path: + dir_path = os.path.dirname(inspect.getabsfile(stack_class_instance)) + # The chosen directory has to have a conf.py file + conf_file_path = '%s/%s'%(dir_path,file_name) + if (os.path.exists(conf_file_path)) or (lowest_class_instance == stack_class_instance): + found_dir = True + break + #TODO: make it work when no path is sent / when it didn't find the file + return stack_class_instance.__module__, os.path.dirname(inspect.getabsfile(stack_class_instance)), found_dir diff --git a/src/hydro/query_engine.py b/src/hydro/query_engine.py index fd2b5ef..274af7a 100644 --- a/src/hydro/query_engine.py +++ b/src/hydro/query_engine.py @@ -2,7 +2,7 @@ from importlib import import_module from base_classes import Base, HydroCommandTemplate -from hydro.common.utils import create_cache_key +from hydro.common.utils import create_cache_key, get_directory_from_search_path from copy import deepcopy @@ -11,6 +11,7 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan # TODO: check for the existence of the dir and file and throw error otherwise self._modules_dir = modules_dir self._templates_dir = modules_dir + self._directory_search_path = None self._execution_plan = execution_plan self._conf = import_module('%s.conf' % self._modules_dir).conf @@ -24,7 +25,9 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan self._logger = logger def _build_plan(self, logic_plan, params): - template = HydroCommandTemplate(self._templates_dir, logic_plan.template_file) + template_module, template_dir, found_dir = \ + get_directory_from_search_path(self._directory_search_path, logic_plan.template_file, QueryEngine) + template = HydroCommandTemplate(template_dir, logic_plan.template_file) execution_plan = template.parse(params) return execution_plan @@ -80,6 +83,9 @@ def get_config_item(self, key): def set_templates_dir(self, templates_dir): self._templates_dir = templates_dir + def set_templates_dir_path(self, directory_search_path): + self._directory_search_path = directory_search_path + def set_topology_lookup_callback(self, callback_function): """ call back function to lookup Hydro registered topologies diff --git a/src/hydro/topology_base.py b/src/hydro/topology_base.py index d34a16d..5e56801 100644 --- a/src/hydro/topology_base.py +++ b/src/hydro/topology_base.py @@ -6,7 +6,7 @@ from base_classes import Base from cache.in_memory import InMemoryCache from cache.mysql_cache import MySQLCache -from hydro.common.utils import create_cache_key +from hydro.common.utils import create_cache_key, get_directory_from_search_path import os import inspect from hydro.exceptions import HydroException @@ -14,25 +14,17 @@ class Topology(Base): - def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, cls=None, logger=None): + def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, logger=None): super(Topology, self).__init__() if logger: self.logger = logger # TODO: read the cache engines from the configuration and allow passing parameters if needed - #Toplogy is support self discovering of its needed modules but it can be supplied in init - if not base_dir: - base_dir = self.__module__ - if not cls: - cls = self.__class__ - + #Toplogy supports self discovering of its needed modules but it can be supplied in init + directory_search_path = self._set_directory_variables(base_dir) self.cache_engines = {Configurator.CACHE_ENGINE_IN_MEMORY: InMemoryCache, Configurator.CACHE_ENGINE_MYSQL_CACHE: MySQLCache} self.transformers = Transformers() - self.base_dir = '.'.join(base_dir.split('.')[:-1]) - self._modules_dir = self.base_dir - self._templates_dir = os.path.dirname(inspect.getabsfile(cls)) - cache_engine_params = None if type(cache_engine) == dict: cache_engine_name = cache_engine['cache_engine_name'] @@ -41,15 +33,26 @@ def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=No cache_engine_name = cache_engine cache_engine_class = self.cache_engines.get(cache_engine_name, InMemoryCache) self.cache_engine = cache_engine_class(cache_engine_params) - self._execution_plan = ExecutionPlan() self.query_engine = QueryEngineFactory.get_query_engine(self._modules_dir, self.cache_engine, self._execution_plan, self.logger) - self.query_engine.set_templates_dir(self._templates_dir) + self.query_engine.set_templates_dir_path(directory_search_path) self.logger.debug("Topology {0} was instantiated, modules_dir: {1}, templates_dir: {2}". format(type(self).__name__, self._modules_dir, self._templates_dir)) self._topology_cache_ttl = Configurator.CACHE_DB_KEY_EXPIRE + def _set_directory_variables(self, base_dir): + directory_search_path = type(self).mro() + if not base_dir: + base_dir, self._templates_dir, found_dir = \ + get_directory_from_search_path(directory_search_path, 'conf.py', Topology) + else: + # TODO: make this part work + self._templates_dir = os.path.dirname(inspect.getabsfile(self.__class__)) + self.base_dir = '.'.join(base_dir.split('.')[:-1]) + self._modules_dir = self.base_dir + return directory_search_path + def _submit(self, params): raise HydroException('Not implemented') From 75c7d4c5dc31482b2eddc33b16d7dca2ed67582d Mon Sep 17 00:00:00 2001 From: Moshe Basanchig Date: Tue, 28 Apr 2015 17:21:58 +0300 Subject: [PATCH 2/4] Minor refactoring --- src/hydro/common/utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/hydro/common/utils.py b/src/hydro/common/utils.py index 12b554e..fe88b72 100644 --- a/src/hydro/common/utils.py +++ b/src/hydro/common/utils.py @@ -4,18 +4,18 @@ import os import inspect + def create_cache_key(fingerprint): return hashlib.md5(fingerprint).hexdigest() -def get_directory_from_search_path(given_search_path, file_name, lowest_class_instance): + +def get_directory_from_search_path(given_search_path, file_name, topmost_class): # get the relevant directory for a file from a list of inherited classes - found_dir = False - for stack_class_instance in given_search_path: - dir_path = os.path.dirname(inspect.getabsfile(stack_class_instance)) + for class_type in given_search_path: + dir_path = os.path.dirname(inspect.getabsfile(class_type)) # The chosen directory has to have a conf.py file - conf_file_path = '%s/%s'%(dir_path,file_name) - if (os.path.exists(conf_file_path)) or (lowest_class_instance == stack_class_instance): - found_dir = True - break + conf_file_path = '%s/%s' % (dir_path, file_name) + if (os.path.exists(conf_file_path)) or (topmost_class == class_type): + return class_type.__module__, os.path.dirname(inspect.getabsfile(class_type)), True #TODO: make it work when no path is sent / when it didn't find the file - return stack_class_instance.__module__, os.path.dirname(inspect.getabsfile(stack_class_instance)), found_dir + return None, None, False From 9f8af9570eb7cd053910155810b6085232888560 Mon Sep 17 00:00:00 2001 From: Yaniv Ranen Date: Sun, 17 May 2015 10:31:55 +0300 Subject: [PATCH 3/4] add an option to control suffixes for combine --- src/hydro/transformers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hydro/transformers.py b/src/hydro/transformers.py index 506d454..31b46df 100644 --- a/src/hydro/transformers.py +++ b/src/hydro/transformers.py @@ -8,7 +8,7 @@ class Transformers(Base): def __init__(self): self._execution_plan = None - def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner'): + def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner', suffixes=('_x', '_y')): """ takes two input streams (pandas data frames) and joins them based on the keys dictionary """ @@ -16,7 +16,7 @@ def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner'): self._execution_plan.append('combine', 'transform') if left_on and right_on: - return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how) + return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how, suffixes=suffixes) else: return None From b2f0530905094a231eda8f207cb9a6fb0c9df848 Mon Sep 17 00:00:00 2001 From: Yaniv Ranen Date: Mon, 18 May 2015 10:07:56 +0300 Subject: [PATCH 4/4] =?UTF-8?q?add=20tests=20for=20hydro=E2=80=99s=20added?= =?UTF-8?q?=20parameters=20for=20comine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/hydro/test_transformers.py | 48 +++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/test/hydro/test_transformers.py diff --git a/src/test/hydro/test_transformers.py b/src/test/hydro/test_transformers.py new file mode 100644 index 0000000..7d10d79 --- /dev/null +++ b/src/test/hydro/test_transformers.py @@ -0,0 +1,48 @@ +__author__ = 'Yaniv Ranen' + +import unittest +from hydro.transformers import Transformers +from pandas import DataFrame +from pandas.util.testing import assert_frame_equal +from numpy import nan + +class TransformersTest(unittest.TestCase): + #creating data frames for the tests and results + test_df_1 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2']}) + test_df_2 = DataFrame({'id': ['1', '3'], 'full_name': ['full_try1', 'full_try3']}) + test_df_3 = DataFrame({'id': ['1', '2'], 'full_name': ['try1', 'try2']}) + + test_result_1 = DataFrame({'id': ['1'], 'name': ['try1'], 'full_name': ['full_try1']})[["id", "name", "full_name"]] + test_result_2 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2'], + 'full_name': ['full_try1', nan]})[["id", "name", "full_name"]] + test_result_3 = DataFrame({'id': ['1', '3'], 'name': ['try1', nan], + 'full_name': ['full_try1', 'full_try3']})[["id", "name", "full_name"]] + test_result_4 = DataFrame({'id': ['1'], 'full_name_left_side': ['try1'], + 'full_name_right_side': ['full_try1']})[["full_name_left_side", "id", + "full_name_right_side"]] + + transformer = Transformers() + + def test_Transformers_combine(self): + # inner join + comb_res1 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id")[["id", "name", "full_name"]] + assert_frame_equal(comb_res1, self.test_result_1) + + # left join + comb_res2 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","left")[["id", "name", + "full_name"]] + assert_frame_equal(comb_res2, self.test_result_2) + + # right join + comb_res3 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","right")[["id", "name", + "full_name"]] + assert_frame_equal(comb_res3, self.test_result_3) + + #suffixes + comb_res4 = self.transformer.combine(self.test_df_3, self.test_df_2, "id", "id", + suffixes=('_left_side', '_right_side')) + assert_frame_equal(comb_res4, self.test_result_4) + + +if __name__ == '__main__': + unittest.main()