diff --git a/src/hydro/common/utils.py b/src/hydro/common/utils.py index 1cc1681..fe88b72 100644 --- a/src/hydro/common/utils.py +++ b/src/hydro/common/utils.py @@ -1,6 +1,21 @@ __author__ = 'yanivshalev' import hashlib +import os +import inspect + def create_cache_key(fingerprint): - return hashlib.md5(fingerprint).hexdigest() \ No newline at end of file + return hashlib.md5(fingerprint).hexdigest() + + +def get_directory_from_search_path(given_search_path, file_name, topmost_class): + # get the relevant directory for a file from a list of inherited classes + for class_type in given_search_path: + dir_path = os.path.dirname(inspect.getabsfile(class_type)) + # The chosen directory has to contain the requested file (file_name) + conf_file_path = '%s/%s' % (dir_path, file_name) + if (os.path.exists(conf_file_path)) or (topmost_class == class_type): + return class_type.__module__, os.path.dirname(inspect.getabsfile(class_type)), True + # TODO: make it work when no search path is given / when the file is not found + return None, None, False diff --git a/src/hydro/query_engine.py b/src/hydro/query_engine.py index fd2b5ef..274af7a 100644 --- a/src/hydro/query_engine.py +++ b/src/hydro/query_engine.py @@ -2,7 +2,7 @@ from importlib import import_module from base_classes import Base, HydroCommandTemplate -from hydro.common.utils import create_cache_key +from hydro.common.utils import create_cache_key, get_directory_from_search_path from copy import deepcopy @@ -11,6 +11,7 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan # TODO: check for the existence of the dir and file and throw error otherwise self._modules_dir = modules_dir self._templates_dir = modules_dir + self._directory_search_path = None self._execution_plan = execution_plan self._conf = import_module('%s.conf' % self._modules_dir).conf @@ -24,7 +25,9 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan self._logger = logger def _build_plan(self, logic_plan, params): -
template = HydroCommandTemplate(self._templates_dir, logic_plan.template_file) + template_module, template_dir, found_dir = \ + get_directory_from_search_path(self._directory_search_path, logic_plan.template_file, QueryEngine) + template = HydroCommandTemplate(template_dir, logic_plan.template_file) execution_plan = template.parse(params) return execution_plan @@ -80,6 +83,9 @@ def get_config_item(self, key): def set_templates_dir(self, templates_dir): self._templates_dir = templates_dir + def set_templates_dir_path(self, directory_search_path): + self._directory_search_path = directory_search_path + def set_topology_lookup_callback(self, callback_function): """ call back function to lookup Hydro registered topologies diff --git a/src/hydro/topology_base.py b/src/hydro/topology_base.py index d34a16d..5e56801 100644 --- a/src/hydro/topology_base.py +++ b/src/hydro/topology_base.py @@ -6,7 +6,7 @@ from base_classes import Base from cache.in_memory import InMemoryCache from cache.mysql_cache import MySQLCache -from hydro.common.utils import create_cache_key +from hydro.common.utils import create_cache_key, get_directory_from_search_path import os import inspect from hydro.exceptions import HydroException @@ -14,25 +14,17 @@ class Topology(Base): - def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, cls=None, logger=None): + def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, logger=None): super(Topology, self).__init__() if logger: self.logger = logger # TODO: read the cache engines from the configuration and allow passing parameters if needed - #Toplogy is support self discovering of its needed modules but it can be supplied in init - if not base_dir: - base_dir = self.__module__ - if not cls: - cls = self.__class__ - + # Topology supports self-discovery of its needed modules, but it can be supplied in init + directory_search_path = self._set_directory_variables(base_dir) self.cache_engines =
{Configurator.CACHE_ENGINE_IN_MEMORY: InMemoryCache, Configurator.CACHE_ENGINE_MYSQL_CACHE: MySQLCache} self.transformers = Transformers() - self.base_dir = '.'.join(base_dir.split('.')[:-1]) - self._modules_dir = self.base_dir - self._templates_dir = os.path.dirname(inspect.getabsfile(cls)) - cache_engine_params = None if type(cache_engine) == dict: cache_engine_name = cache_engine['cache_engine_name'] @@ -41,15 +33,26 @@ def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=No cache_engine_name = cache_engine cache_engine_class = self.cache_engines.get(cache_engine_name, InMemoryCache) self.cache_engine = cache_engine_class(cache_engine_params) - self._execution_plan = ExecutionPlan() self.query_engine = QueryEngineFactory.get_query_engine(self._modules_dir, self.cache_engine, self._execution_plan, self.logger) - self.query_engine.set_templates_dir(self._templates_dir) + self.query_engine.set_templates_dir_path(directory_search_path) self.logger.debug("Topology {0} was instantiated, modules_dir: {1}, templates_dir: {2}". 
format(type(self).__name__, self._modules_dir, self._templates_dir)) self._topology_cache_ttl = Configurator.CACHE_DB_KEY_EXPIRE + def _set_directory_variables(self, base_dir): + directory_search_path = type(self).mro() + if not base_dir: + base_dir, self._templates_dir, found_dir = \ + get_directory_from_search_path(directory_search_path, 'conf.py', Topology) + else: + # TODO: make this part work + self._templates_dir = os.path.dirname(inspect.getabsfile(self.__class__)) + self.base_dir = '.'.join(base_dir.split('.')[:-1]) + self._modules_dir = self.base_dir + return directory_search_path + def _submit(self, params): raise HydroException('Not implemented') diff --git a/src/hydro/transformers.py b/src/hydro/transformers.py index 506d454..31b46df 100644 --- a/src/hydro/transformers.py +++ b/src/hydro/transformers.py @@ -8,7 +8,7 @@ class Transformers(Base): def __init__(self): self._execution_plan = None - def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner'): + def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner', suffixes=('_x', '_y')): """ takes two input streams (pandas data frames) and joins them based on the keys dictionary """ @@ -16,7 +16,7 @@ def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner'): self._execution_plan.append('combine', 'transform') if left_on and right_on: - return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how) + return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how, suffixes=suffixes) else: return None diff --git a/src/test/hydro/test_transformers.py b/src/test/hydro/test_transformers.py new file mode 100644 index 0000000..7d10d79 --- /dev/null +++ b/src/test/hydro/test_transformers.py @@ -0,0 +1,48 @@ +__author__ = 'Yaniv Ranen' + +import unittest +from hydro.transformers import Transformers +from pandas import DataFrame +from pandas.util.testing import assert_frame_equal +from numpy import nan + +class 
TransformersTest(unittest.TestCase): + #creating data frames for the tests and results + test_df_1 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2']}) + test_df_2 = DataFrame({'id': ['1', '3'], 'full_name': ['full_try1', 'full_try3']}) + test_df_3 = DataFrame({'id': ['1', '2'], 'full_name': ['try1', 'try2']}) + + test_result_1 = DataFrame({'id': ['1'], 'name': ['try1'], 'full_name': ['full_try1']})[["id", "name", "full_name"]] + test_result_2 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2'], + 'full_name': ['full_try1', nan]})[["id", "name", "full_name"]] + test_result_3 = DataFrame({'id': ['1', '3'], 'name': ['try1', nan], + 'full_name': ['full_try1', 'full_try3']})[["id", "name", "full_name"]] + test_result_4 = DataFrame({'id': ['1'], 'full_name_left_side': ['try1'], + 'full_name_right_side': ['full_try1']})[["full_name_left_side", "id", + "full_name_right_side"]] + + transformer = Transformers() + + def test_Transformers_combine(self): + # inner join + comb_res1 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id")[["id", "name", "full_name"]] + assert_frame_equal(comb_res1, self.test_result_1) + + # left join + comb_res2 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","left")[["id", "name", + "full_name"]] + assert_frame_equal(comb_res2, self.test_result_2) + + # right join + comb_res3 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","right")[["id", "name", + "full_name"]] + assert_frame_equal(comb_res3, self.test_result_3) + + #suffixes + comb_res4 = self.transformer.combine(self.test_df_3, self.test_df_2, "id", "id", + suffixes=('_left_side', '_right_side')) + assert_frame_equal(comb_res4, self.test_result_4) + + +if __name__ == '__main__': + unittest.main()