Skip to content
This repository has been archived by the owner on Nov 15, 2018. It is now read-only.

enabling inheritance of topologies (the queries can be executed from dif... #2

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/hydro/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
__author__ = 'yanivshalev'

import hashlib
import os
import inspect


def create_cache_key(fingerprint):
    """Return the MD5 hex digest of *fingerprint* for use as a cache key.

    Accepts bytes or text; text is UTF-8 encoded before hashing, since
    hashlib requires bytes on Python 3 (encoding is a no-op for Python 2
    byte strings, keeping the original behaviour there).
    """
    if not isinstance(fingerprint, bytes):
        fingerprint = fingerprint.encode('utf-8')
    return hashlib.md5(fingerprint).hexdigest()


def get_directory_from_search_path(given_search_path, file_name, topmost_class):
    """Locate the directory holding *file_name* along a class search path.

    Walks *given_search_path* (typically a class MRO) in order and, for
    each class, checks whether *file_name* exists in the directory of the
    module that defines the class.  The walk also stops at *topmost_class*
    even when the file is absent there, so the topmost class acts as a
    final fallback.

    :param given_search_path: iterable of classes to scan, or None/empty.
    :param file_name: file to look for next to each class's module.
    :param topmost_class: class at which the search unconditionally stops.
    :return: (module_name, directory, found) tuple; (None, None, False)
             when the path is empty or no class matches.
    """
    # TODO: make it work when no path is sent / when it didn't find the file
    if not given_search_path:
        # Graceful fallback instead of a TypeError when no path was supplied.
        return None, None, False
    for class_type in given_search_path:
        dir_path = os.path.dirname(inspect.getabsfile(class_type))
        # The chosen directory has to have a conf.py file
        conf_file_path = os.path.join(dir_path, file_name)
        if os.path.exists(conf_file_path) or topmost_class == class_type:
            return class_type.__module__, dir_path, True
    return None, None, False
10 changes: 8 additions & 2 deletions src/hydro/query_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from importlib import import_module
from base_classes import Base, HydroCommandTemplate
from hydro.common.utils import create_cache_key
from hydro.common.utils import create_cache_key, get_directory_from_search_path
from copy import deepcopy


Expand All @@ -11,6 +11,7 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan
# TODO: check for the existence of the dir and file and throw error otherwise
self._modules_dir = modules_dir
self._templates_dir = modules_dir
self._directory_search_path = None
self._execution_plan = execution_plan
self._conf = import_module('%s.conf' % self._modules_dir).conf

Expand All @@ -24,7 +25,9 @@ def __init__(self, modules_dir, connection_handler, cache_engine, execution_plan
self._logger = logger

def _build_plan(self, logic_plan, params):
template = HydroCommandTemplate(self._templates_dir, logic_plan.template_file)
template_module, template_dir, found_dir = \
get_directory_from_search_path(self._directory_search_path, logic_plan.template_file, QueryEngine)
template = HydroCommandTemplate(template_dir, logic_plan.template_file)
execution_plan = template.parse(params)
return execution_plan

Expand Down Expand Up @@ -80,6 +83,9 @@ def get_config_item(self, key):
    def set_templates_dir(self, templates_dir):
        # Override the directory used to resolve template files
        # (initialised to the modules dir in __init__).
        self._templates_dir = templates_dir

    def set_templates_dir_path(self, directory_search_path):
        # Store the class search path (typically a topology's MRO) that
        # _build_plan walks to locate template files.
        # NOTE(review): the name suggests a directory path but this stores a
        # list of classes — confirm with callers before renaming.
        self._directory_search_path = directory_search_path

def set_topology_lookup_callback(self, callback_function):
"""
call back function to lookup Hydro registered topologies
Expand Down
31 changes: 17 additions & 14 deletions src/hydro/topology_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,25 @@
from base_classes import Base
from cache.in_memory import InMemoryCache
from cache.mysql_cache import MySQLCache
from hydro.common.utils import create_cache_key
from hydro.common.utils import create_cache_key, get_directory_from_search_path
import os
import inspect
from hydro.exceptions import HydroException
from hydro.common.configurator import Configurator


class Topology(Base):
def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, cls=None, logger=None):
def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=None, logger=None):
super(Topology, self).__init__()

if logger:
self.logger = logger

# TODO: read the cache engines from the configuration and allow passing parameters if needed
#Topology supports self-discovery of its needed modules, but they can be supplied in init
if not base_dir:
base_dir = self.__module__
if not cls:
cls = self.__class__

#Topology supports self-discovery of its needed modules, but they can be supplied in init
directory_search_path = self._set_directory_variables(base_dir)
self.cache_engines = {Configurator.CACHE_ENGINE_IN_MEMORY: InMemoryCache, Configurator.CACHE_ENGINE_MYSQL_CACHE: MySQLCache}
self.transformers = Transformers()
self.base_dir = '.'.join(base_dir.split('.')[:-1])
self._modules_dir = self.base_dir
self._templates_dir = os.path.dirname(inspect.getabsfile(cls))

cache_engine_params = None
if type(cache_engine) == dict:
cache_engine_name = cache_engine['cache_engine_name']
Expand All @@ -41,15 +33,26 @@ def __init__(self, cache_engine=Configurator.CACHE_ENGINE_IN_MEMORY, base_dir=No
cache_engine_name = cache_engine
cache_engine_class = self.cache_engines.get(cache_engine_name, InMemoryCache)
self.cache_engine = cache_engine_class(cache_engine_params)

self._execution_plan = ExecutionPlan()
self.query_engine = QueryEngineFactory.get_query_engine(self._modules_dir, self.cache_engine, self._execution_plan, self.logger)
self.query_engine.set_templates_dir(self._templates_dir)
self.query_engine.set_templates_dir_path(directory_search_path)
self.logger.debug("Topology {0} was instantiated, modules_dir: {1}, templates_dir: {2}".
format(type(self).__name__, self._modules_dir, self._templates_dir))

self._topology_cache_ttl = Configurator.CACHE_DB_KEY_EXPIRE

    def _set_directory_variables(self, base_dir):
        """Resolve module/template directories for this topology instance.

        Builds the class search path from this instance's MRO so inherited
        topologies can locate conf/template files defined by a parent class.

        :param base_dir: explicit module path (dotted); when falsy, the
                         directories are discovered by scanning the MRO for
                         the class whose module directory holds conf.py.
        :return: the MRO-based search path (list of classes).
        """
        directory_search_path = type(self).mro()
        if not base_dir:
            # Discover the first class in the MRO with conf.py next to its
            # module; Topology itself is the topmost fallback.
            base_dir, self._templates_dir, found_dir = \
                get_directory_from_search_path(directory_search_path, 'conf.py', Topology)
        else:
            # TODO: make this part work
            # With an explicit base_dir the templates dir is taken from the
            # concrete class's own module location.
            self._templates_dir = os.path.dirname(inspect.getabsfile(self.__class__))
        # Strip the trailing module segment to get the package path used as
        # the modules dir (e.g. 'pkg.topology' -> 'pkg').
        # NOTE(review): crashes if discovery above returned base_dir=None —
        # confirm the not-found case is handled by callers.
        self.base_dir = '.'.join(base_dir.split('.')[:-1])
        self._modules_dir = self.base_dir
        return directory_search_path

    def _submit(self, params):
        # Abstract hook: concrete topologies must override this to execute
        # the query submission.
        raise HydroException('Not implemented')

Expand Down
4 changes: 2 additions & 2 deletions src/hydro/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ class Transformers(Base):
def __init__(self):
self._execution_plan = None

def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner'):
def combine(self, stream1, stream2, left_on=None, right_on=None, how='inner', suffixes=('_x', '_y')):
"""
takes two input streams (pandas data frames) and joins them based on the keys dictionary
"""
if self._execution_plan:
self._execution_plan.append('combine', 'transform')

if left_on and right_on:
return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how)
return pd.merge(stream1, stream2, left_on=left_on, right_on=right_on, how=how, suffixes=suffixes)
else:
return None

Expand Down
48 changes: 48 additions & 0 deletions src/test/hydro/test_transformers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
__author__ = 'Yaniv Ranen'

import unittest
from hydro.transformers import Transformers
from pandas import DataFrame
from pandas.util.testing import assert_frame_equal
from numpy import nan

class TransformersTest(unittest.TestCase):
    """Unit tests for Transformers.combine: join types and column suffixes."""

    # Input data frames shared by all tests (class-level fixtures).
    test_df_1 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2']})
    test_df_2 = DataFrame({'id': ['1', '3'], 'full_name': ['full_try1', 'full_try3']})
    # test_df_3 shares the 'full_name' column with test_df_2 to exercise suffixes.
    test_df_3 = DataFrame({'id': ['1', '2'], 'full_name': ['try1', 'try2']})

    # Expected results; the trailing column selection pins the column order so
    # assert_frame_equal comparisons are deterministic.
    test_result_1 = DataFrame({'id': ['1'], 'name': ['try1'], 'full_name': ['full_try1']})[["id", "name", "full_name"]]
    test_result_2 = DataFrame({'id': ['1', '2'], 'name': ['try1', 'try2'],
                               'full_name': ['full_try1', nan]})[["id", "name", "full_name"]]
    test_result_3 = DataFrame({'id': ['1', '3'], 'name': ['try1', nan],
                               'full_name': ['full_try1', 'full_try3']})[["id", "name", "full_name"]]
    test_result_4 = DataFrame({'id': ['1'], 'full_name_left_side': ['try1'],
                               'full_name_right_side': ['full_try1']})[["full_name_left_side", "id",
                                                                       "full_name_right_side"]]

    # Single shared instance; combine holds no per-call state.
    transformer = Transformers()

    def test_Transformers_combine(self):
        """combine should honour the join type and the column suffixes."""
        # inner join: only rows whose 'id' appears in both frames survive
        comb_res1 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id")[["id", "name", "full_name"]]
        assert_frame_equal(comb_res1, self.test_result_1)

        # left join: keeps all rows of the left frame, NaN-fills the right
        comb_res2 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","left")[["id", "name",
                                                                                                 "full_name"]]
        assert_frame_equal(comb_res2, self.test_result_2)

        # right join: keeps all rows of the right frame, NaN-fills the left
        comb_res3 = self.transformer.combine(self.test_df_1, self.test_df_2, "id", "id","right")[["id", "name",
                                                                                                  "full_name"]]
        assert_frame_equal(comb_res3, self.test_result_3)

        # suffixes: overlapping non-key columns get the custom suffixes
        comb_res4 = self.transformer.combine(self.test_df_3, self.test_df_2, "id", "id",
                                             suffixes=('_left_side', '_right_side'))
        assert_frame_equal(comb_res4, self.test_result_4)


if __name__ == '__main__':
    # Allow running the suite directly: python test_transformers.py
    unittest.main()