diff --git a/src/pyjedai/block_building.py b/src/pyjedai/block_building.py index 54e06c2..3c6682e 100644 --- a/src/pyjedai/block_building.py +++ b/src/pyjedai/block_building.py @@ -13,7 +13,7 @@ from .datamodel import Block, Data, PYJEDAIFeature from .utils import (are_matching, drop_big_blocks_by_size, - drop_single_entity_blocks) + drop_single_entity_blocks, get_blocks_cardinality) from .evaluation import Evaluation class AbstractBlockProcessing(PYJEDAIFeature): @@ -27,7 +27,13 @@ def __init__(self): self.attributes_2: list self.num_of_blocks_dropped: int self.original_num_of_blocks: int - + self.sum_of_sizes: int = 0 + self.total_num_of_comparisons: int = 0 + self.min_block_size: int = None + self.max_block_size: int = None + self.min_block_comparisons: int = None + self.max_block_comparisons: int = None + def report(self) -> None: """Prints Block Building method configuration """ @@ -75,7 +81,8 @@ def evaluate(self, id2 in entity_index and are_matching(entity_index, id1, id2): true_positives += 1 - eval_obj.calculate_scores(true_positives=true_positives) + total_matching_pairs = get_blocks_cardinality(eval_blocks, self.data.is_dirty_er) + eval_obj.calculate_scores(true_positives=true_positives, total_matching_pairs=total_matching_pairs) eval_result = eval_obj.report(self.method_configuration(), export_to_df, export_to_dict, @@ -84,8 +91,7 @@ def evaluate(self, if with_stats: self.stats(eval_blocks) return eval_result - - + def stats(self, blocks: dict) -> None: self.list_of_sizes = [] self.entities_in_blocks = set() @@ -120,7 +126,6 @@ def stats(self, blocks: dict) -> None: ) print(u'\u2500' * 123) - class AbstractBlockBuilding(AbstractBlockProcessing): """Abstract class for the block building method """ @@ -137,13 +142,7 @@ def __init__(self): self.attributes_2: list self.execution_time: float self.data: Data - self.sum_of_sizes: int = 0 self.list_of_sizes: list = [] - self.total_num_of_comparisons: int = 0 - self.min_block_size: int = None - self.max_block_size: int = None - self.min_block_comparisons: int = None - self.max_block_comparisons: int = None def build_blocks( self, diff --git a/src/pyjedai/block_cleaning.py b/src/pyjedai/block_cleaning.py index f450be4..1035909 100644 --- a/src/pyjedai/block_cleaning.py +++ b/src/pyjedai/block_cleaning.py @@ -13,7 +13,7 @@ from .block_building import AbstractBlockProcessing from .datamodel import Block, Data -from .utils import create_entity_index, drop_single_entity_blocks +from .utils import create_entity_index, drop_single_entity_blocks, java_math_round class AbstractBlockCleaning(AbstractBlockProcessing): @@ -81,14 +81,13 @@ def process( ) sorted_blocks = _sort_blocks_cardinality(blocks, self.data.is_dirty_er) self._progress_bar.update(1) - entity_index = create_entity_index(sorted_blocks, self.data.is_dirty_er) + self.entity_index = create_entity_index(sorted_blocks, self.data.is_dirty_er) self._progress_bar.update(1) filtered_blocks = {} - for entity_id, block_keys in entity_index.items(): + for entity_id, block_keys in self.entity_index.items(): # Create new blocks from the entity index - # print(list(block_keys[:int(round(self.ratio*len(block_keys)))])) block_keys = list(block_keys) - for key in list(block_keys[:int(round(self.ratio*len(block_keys)))]): + for key in list(block_keys[:java_math_round(self.ratio*float(len(block_keys)))]): filtered_blocks.setdefault(key, Block()) # Entities ids start to 0 ... n-1 for 1st dataset # and n ... 
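# Illustrative sketch (not part of the patch): how BlockFiltering trims each entity's
# block list to the first ratio-fraction of its cardinality-sorted block keys. It assumes
# `java_math_round` from pyjedai.utils behaves like Java's Math.round, i.e. floor(x + 0.5),
# which differs from Python's round() (banker's rounding) exactly on .5 boundaries.
from math import floor

def java_math_round(value: float) -> int:
    return int(floor(value + 0.5))

def keep_top_fraction(block_keys, ratio):
    """Keep the first round(ratio * n) keys of an already cardinality-sorted key list."""
    block_keys = list(block_keys)
    return block_keys[:java_math_round(ratio * float(len(block_keys)))]

# Python's round(2.5) == 2, while Java-style rounding gives 3:
assert round(2.5) == 2 and java_math_round(2.5) == 3
assert keep_top_fraction(["b1", "b2", "b3", "b4", "b5"], 0.5) == ["b1", "b2", "b3"]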
m for 2nd dataset @@ -97,17 +96,17 @@ def process( self._progress_bar.update(1) new_blocks = drop_single_entity_blocks(filtered_blocks, self.data.is_dirty_er) self._progress_bar.close() + self.num_of_blocks_dropped = len(blocks) - len(new_blocks) self.execution_time = time() - start_time self.blocks = new_blocks - - return new_blocks + + return self.blocks def _configuration(self) -> dict: return { "Ratio" : self.ratio } - class BlockPurging(AbstractBlockCleaning): """Discards the blocks exceeding a certain number of comparisons. """ diff --git a/src/pyjedai/comparison_cleaning.py b/src/pyjedai/comparison_cleaning.py index 29427cb..6799849 100644 --- a/src/pyjedai/comparison_cleaning.py +++ b/src/pyjedai/comparison_cleaning.py @@ -3,19 +3,21 @@ from itertools import chain from collections import defaultdict from logging import warning -from math import log10 +from math import log10, sqrt from queue import PriorityQueue from time import time import numpy as np +import math from tqdm.autonotebook import tqdm from .evaluation import Evaluation from .datamodel import Data, PYJEDAIFeature -from .utils import chi_square, create_entity_index +from .utils import chi_square, create_entity_index, get_sorted_blocks_shuffled_entities, PositionIndex, canonical_swap, sorted_enumerate from abc import ABC, abstractmethod +from typing import Tuple, List class AbstractComparisonCleaning(PYJEDAIFeature): @@ -37,7 +39,8 @@ def process( self, blocks: dict, data: Data, - tqdm_disable: bool = False + tqdm_disable: bool = False, + store_weights: bool = False ) -> dict: """Main method for comparison cleaning @@ -50,7 +53,7 @@ def process( dict: cleaned blocks """ start_time = time() - self.tqdm_disable, self.data = tqdm_disable, data + self.tqdm_disable, self.data, self.store_weights = tqdm_disable, data, store_weights self._limit = self.data.num_of_entities \ if self.data.is_dirty_er or self._node_centric else self.data.dataset_limit self._progress_bar = tqdm( @@ -59,6 +62,7 @@ def process( disable=self.tqdm_disable ) + self._stored_weights = defaultdict(float) if self.store_weights else None self._entity_index = create_entity_index(blocks, self.data.is_dirty_er) self._num_of_blocks = len(blocks) self._blocks: dict = blocks @@ -93,6 +97,24 @@ def _configuration(self) -> dict: def stats(self) -> None: pass + + def get_precalculated_weight(self, entity_id: int, neighbor_id: int) -> float: + """Returns the precalculated weight for given pair + + Args: + entity_id (int): Entity ID + neighbor_id (int): Neighbor ID + + Raises: + AttributeError: Given pair has no precalculated weigth + + Returns: + float: Pair weigth + """ + if(not self.store_weights): raise AttributeError("No precalculated weights.") + _weight = self._stored_weights.get(canonical_swap(entity_id, neighbor_id), KeyError(f"Pair [{entity_id},{neighbor_id}] has no precalculated weight")) + return _weight + def evaluate(self, prediction, export_to_df: bool = False, @@ -160,6 +182,19 @@ def _get_weight(self, entity_id: int, neighbor_id: int) -> float: ws = self.weighting_scheme if ws == 'ARCS' or ws == 'CBS': return self._counters[neighbor_id] + # CARDINALITY_NORM_COSINE, SIZE_NORM_COSINE + elif ws == 'CNC' or ws == 'SNC': + return self._counters[neighbor_id] / float(sqrt(len(self._comparisons_per_entity[entity_id]) * self._comparisons_per_entity[neighbor_id])) + # SIZE_NORM_DICE, CARDINALITY_NORM_DICE + elif ws == 'SND' or ws == 'CND': + return 2 * self._counters[neighbor_id] / float(self._comparisons_per_entity[entity_id] + 
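# Illustrative sketch (not part of the patch): the pattern behind `store_weights` /
# `get_precalculated_weight`. Pair weights are cached under an order-independent key so
# that (a, b) and (b, a) hit the same entry; `canonical_swap` is assumed to return the
# pair with the smaller id first. Note that `dict.get(key, KeyError(...))` in the patch
# returns the exception object instead of raising it; this sketch raises explicitly.
from collections import defaultdict

def canonical_swap(id1: int, id2: int) -> tuple:
    return (id1, id2) if id1 < id2 else (id2, id1)

class WeightCache:
    def __init__(self):
        self._stored_weights = defaultdict(float)

    def store(self, entity_id: int, neighbor_id: int, weight: float) -> None:
        self._stored_weights[canonical_swap(entity_id, neighbor_id)] = weight

    def get_precalculated_weight(self, entity_id: int, neighbor_id: int) -> float:
        key = canonical_swap(entity_id, neighbor_id)
        if key not in self._stored_weights:
            raise KeyError(f"Pair [{entity_id},{neighbor_id}] has no precalculated weight")
        return self._stored_weights[key]

cache = WeightCache()
cache.store(7, 3, 0.42)
assert cache.get_precalculated_weight(3, 7) == 0.42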
self._comparisons_per_entity[neighbor_id]) + # CARDINALITY_NORM_JS, SIZE_NORM_JS + elif ws == 'CNJ' or ws == 'SNJ': + return self._counters[neighbor_id] / float(self._comparisons_per_entity[entity_id] + self._comparisons_per_entity[neighbor_id] - self._counters[neighbor_id]) + elif ws == 'COSINE': + return self._counters[neighbor_id] / float(sqrt(len(self._entity_index[entity_id]) * len(self._entity_index[neighbor_id]))) + elif ws == 'DICE': + return 2 * self._counters[neighbor_id] / float(len(self._entity_index[entity_id]) + len(self._entity_index[neighbor_id])) elif ws == 'ECBS': return float( self._counters[neighbor_id] * @@ -271,7 +306,7 @@ def _apply_main_processing(self) -> dict: self.blocks[i] = self._valid_entities.copy() self._progress_bar.update(1) return self.blocks - + def _configuration(self) -> dict: return { "Node centric" : self._node_centric @@ -330,7 +365,6 @@ def _update_threshold(self, entity_id: int) -> None: def _set_threshold(self): self._num_of_edges = 0.0 self._threshold = 0.0 - for i in range(0, self._limit): self._process_entity(i) self._update_threshold(i) @@ -346,6 +380,8 @@ def _verify_valid_entities(self, entity_id: int) -> None: weight = self._get_weight(entity_id, neighbor_id) if self._threshold <= weight: self._retained_neighbors.add(neighbor_id) + if self.store_weights: + self._stored_weights[canonical_swap(entity_id, neighbor_id)] = weight if len(self._retained_neighbors) > 0: self.blocks[entity_id] = self._retained_neighbors.copy() @@ -381,6 +417,8 @@ def _prune_edges(self) -> dict: while not self._top_k_edges.empty(): comparison = self._top_k_edges.get() self.blocks[comparison[1]].add(comparison[2]) + if self.store_weights: + self._stored_weights[canonical_swap(comparison[1], comparison[2])] = comparison[0] return self.blocks @@ -469,6 +507,8 @@ def _verify_valid_entities(self, entity_id: int) -> None: while not self._top_k_edges.empty(): comparison = self._top_k_edges.get() self._nearest_entities[entity_id].add(comparison[2]) + if self.store_weights: + self._stored_weights[canonical_swap(entity_id, comparison[2])] = comparison[0] class ReciprocalCardinalityNodePruning(CardinalityNodePruning): """A Meta-blocking method that retains the comparisons \ @@ -516,7 +556,7 @@ def _get_valid_weight(self, entity_id: int, neighbor_id: int) -> float: entity_id < neighbor_id) else 0 def _set_threshold(self): - self._average_weight = np.empty([self.data.num_of_entities], dtype=float) + self._average_weight = np.zeros(self._limit, dtype=float) for i in range(0, self._limit): self._process_entity(i) self._update_threshold(i) @@ -534,8 +574,11 @@ def _verify_valid_entities(self, entity_id: int) -> None: return self._retained_neighbors.clear() for neighbor_id in self._valid_entities: - if self._get_valid_weight(entity_id, neighbor_id): + _weight = self._get_valid_weight(entity_id, neighbor_id) + if _weight: self._retained_neighbors.add(neighbor_id) + if self.store_weights: + self._stored_weights[canonical_swap(entity_id, neighbor_id)] = _weight if len(self._retained_neighbors) > 0: self.blocks[entity_id] = self._retained_neighbors.copy() @@ -595,10 +638,11 @@ def __init__(self, weighting_scheme: str = 'JS', budget: int = 0) -> None: def _set_threshold(self) -> None: self._threshold = self._budget - def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, cc: AbstractMetablocking = None) -> dict: - + def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, store_weights: bool = True, cc: AbstractMetablocking = None, emit_all_tps_stop : bool 
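# Illustrative sketch (not part of the patch): the new COSINE / DICE weights (and their
# Jaccard counterpart, 'JS') treat each entity as the set of blocks it appears in; below,
# `shared` plays the role of `self._counters[neighbor_id]` and the set sizes play the role
# of `len(self._entity_index[...])`. The *_NORM_* variants (CNC/SNC, CND/SND, CNJ/SNJ)
# apply the same formulas but divide by per-entity comparison counts instead.
from math import sqrt

def cosine_weight(shared: int, blocks_e1: int, blocks_e2: int) -> float:
    return shared / sqrt(blocks_e1 * blocks_e2)

def dice_weight(shared: int, blocks_e1: int, blocks_e2: int) -> float:
    return 2 * shared / (blocks_e1 + blocks_e2)

def jaccard_weight(shared: int, blocks_e1: int, blocks_e2: int) -> float:
    return shared / (blocks_e1 + blocks_e2 - shared)

# Two entities co-occurring in 2 of their 3 and 4 blocks respectively:
assert abs(cosine_weight(2, 3, 4) - 2 / sqrt(12)) < 1e-9
assert abs(dice_weight(2, 3, 4) - 4 / 7) < 1e-9
assert abs(jaccard_weight(2, 3, 4) - 2 / 5) < 1e-9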
= False) -> dict: + + self._emit_all_tps_stop : bool = emit_all_tps_stop if(cc is None): - return super().process(blocks, data, tqdm_disable) + return super().process(blocks, data, tqdm_disable, store_weights) else: self._threshold = self._budget self._top_k_edges = PriorityQueue(int(2*self._threshold)) @@ -607,7 +651,7 @@ def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, cc: Abst for entity_id, neighbors in blocks.items(): for neighbor_id in neighbors: - weight = cc._get_weight(entity_id, neighbor_id) + weight = cc.get_precalculated_weight(entity_id, neighbor_id) if weight >= self._minimum_weight: self._top_k_edges.put( (weight, entity_id, neighbor_id) @@ -618,6 +662,8 @@ def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, cc: Abst while not self._top_k_edges.empty(): comparison = self._top_k_edges.get() self.trimmed_blocks[comparison[1]].add(comparison[2]) + if(self.store_weights): + self._stored_weights[canonical_swap(comparison[1], comparison[2])] = comparison[0] return self.trimmed_blocks @@ -627,21 +673,27 @@ def __init__(self, weighting_scheme: str = 'CBS', budget: int = 0) -> None: self._budget = budget def _set_threshold(self) -> None: - self._threshold = max(1, 2 * self._budget / self.data.num_of_entities) - - def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, cc: AbstractMetablocking = None) -> dict: - + self._threshold = max(1, 2 * self._budget / self.data.num_of_entities) if not self._emit_all_tps_stop else 2 * self._budget + + def process(self, blocks: dict, + data: Data, + tqdm_disable: bool = False, + store_weights: bool = True, + cc: AbstractMetablocking = None, + emit_all_tps_stop : bool = False) -> dict: + self._emit_all_tps_stop : bool = emit_all_tps_stop if(cc is None): - return super().process(blocks, data, tqdm_disable) + return super().process(blocks, data, tqdm_disable, store_weights) + else: - self._threshold = max(1, 2 * self._budget / data.num_of_entities) + self._threshold = max(1, 2 * self._budget / data.num_of_entities) if not self._emit_all_tps_stop else 2 * self._budget self.trimmed_blocks : dict = defaultdict(set) for entity_id, neighbors in blocks.items(): self._minimum_weight = sys.float_info.min self._top_k_edges = PriorityQueue(int(2*self._threshold)) for neighbor_id in neighbors: - weight = cc._get_weight(entity_id, neighbor_id) + weight = cc.get_precalculated_weight(entity_id, neighbor_id) if weight >= self._minimum_weight: self._top_k_edges.put( (weight, entity_id, neighbor_id) @@ -652,9 +704,431 @@ def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, cc: Abst while not self._top_k_edges.empty(): comparison = self._top_k_edges.get() self.trimmed_blocks[entity_id].add(comparison[2]) + if self.store_weights: + self._stored_weights[canonical_swap(entity_id, comparison[2])] = comparison[0] + + return self.trimmed_blocks + + +class ProgressiveSortedNeighborhood(AbstractMetablocking): + """Progressive Sorted Neighborhood""" + + _method_name = "Progressive Sorted Neighborhood" + _method_short_name: str = "PSN" + _method_info = "Sorts and iterates over entities in an incremental, windowed manner, compares the entities within defined windows " + \ + "and orders non-reduntant comparisons within the windows by decreasing frequency" + + def __init__(self, weighting_scheme: str = 'ACF', budget: int = 0) -> None: + self.weighting_scheme: str = weighting_scheme + self._budget : int = budget + super().__init__() + self._node_centric = False + + + def process( + self, + blocks: dict, + 
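# Illustrative sketch (not part of the patch): the budget-bounded top-k pattern used by the
# progressive CEP/CNP variants above. A min-oriented PriorityQueue keeps at most `budget`
# pairs; once it overflows, the smallest weight is evicted and becomes the admission
# threshold for later pairs, so only the globally heaviest comparisons survive.
import sys
from queue import PriorityQueue

def top_k_pairs(weighted_pairs, budget: int):
    minimum_weight = sys.float_info.min
    top_k = PriorityQueue(int(2 * budget))
    for weight, entity, neighbor in weighted_pairs:
        if weight >= minimum_weight:
            top_k.put((weight, entity, neighbor))
            if budget < top_k.qsize():
                minimum_weight = top_k.get()[0]
    pairs = []
    while not top_k.empty():
        pairs.append(top_k.get())
    return pairs[::-1]          # best-first

candidates = [(0.9, 0, 5), (0.1, 1, 6), (0.5, 2, 7), (0.7, 3, 8)]
assert [p[0] for p in top_k_pairs(candidates, budget=2)] == [0.9, 0.7]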
data: Data, + tqdm_disable: bool = False, + emit_all_tps_stop : bool = False + ) -> PriorityQueue: + """Calculates top comparisons for Progressive Matching + + Args: + blocks (dict): blocks creted from previous steps of pyjedai + data (Data): dataset module + tqdm_disable (bool, optional): Disables tqdm progress bars. Defaults to False. + + Returns: + PriorityQueue: Top Comparisons + """ + start_time = time() + self.tqdm_disable, self.data = tqdm_disable, data + self._limit = self.data.num_of_entities \ + if self.data.is_dirty_er or self._node_centric else self.data.dataset_limit + self._progress_bar = tqdm( + total=self._limit, + desc=self._method_name, + disable=self.tqdm_disable + ) + self._emit_all_tps_stop : bool = emit_all_tps_stop + self._num_of_blocks = len(blocks) + self._blocks: dict = blocks + + self._sorted_entity_ids = get_sorted_blocks_shuffled_entities(self.data.is_dirty_er, self._blocks) + self._total_sorted_entities = len(self._sorted_entity_ids) + self._position_index = PositionIndex(self.data.num_of_entities, self._sorted_entity_ids) + + self._counters = np.empty([self.data.num_of_entities], dtype=float) + self._flags = np.empty([self.data.num_of_entities], dtype=int) + self._counters[:] = 0 + self._flags[:] = -1 + self._pairs = self._apply_main_processing() + self.execution_time = time() - start_time + self._progress_bar.close() + + return self._pairs + + def _get_weight(self, entity_id: int, neighbor_id: int) -> float: + ws = self.weighting_scheme + + if ws == 'NCF': + denominator : float = len(self._position_index.get_positions(entity_id)) + len(self._position_index.get_positions(neighbor_id)) - self._counters[neighbor_id] + return self._counters[neighbor_id] / denominator + elif ws == 'ACF' or ws == 'ID': + return self._counters[neighbor_id] + else: + raise ValueError("This weighting scheme does not exist") + + def valid_entity_neighbor_index(self, entity: int, neighbor_index: int) -> bool: + """Verifies if the neighbor identifier at the specified index is valid for candidate (the pair hasn't been considered previously) - return self.trimmed_blocks + Args: + entity (int): Identifier of the current entity + neighbor_index (int): Index of the neighbor identifier within the list of sorted indices + + Returns: + bool: Valid / Not Valid candidate for pair + """ + neighbor = self._sorted_entity_ids[neighbor_index] + return self.data.dataset_limit <= neighbor if not self.data.is_dirty_er else neighbor < entity + + def _set_threshold(self): + pass + + def _prune_edges(self) -> dict: + pass + + def _configuration(self) -> dict: + pass + +class GlobalProgressiveSortedNeighborhood(ProgressiveSortedNeighborhood): + """Global Progressive Sorted Neighborhood""" + + _method_name = "Global Progressive Sorted Neighborhood" + _method_short_name: str = "GPSN" + _method_info = "For each sorted entity, conducts incrementally expanding window wise iteration over all the sorted entities, " + \ + "calculates local scores for the entities present within current window and stores the best comparisons on a global scale" + + def __init__(self, weighting_scheme: str = 'ACF', budget: int = 0) -> None: + super().__init__(weighting_scheme, budget) + + def _apply_main_processing(self) -> PriorityQueue: + self._max_window = 2 if self.data.num_of_entities <= 100 else int(2 ** (math.log10(self.data.num_of_entities) + 1) + 1) + # TO DO: budget taken as argument in prediction, not algorithm constructor + self._budget = float('inf') if self._emit_all_tps_stop else self._budget + self._top_pairs : 
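# Illustrative sketch (not part of the patch): a minimal stand-in for the PositionIndex
# used above, assuming it simply records, for every entity id, the positions at which that
# entity occurs in the sorted-and-shuffled entity list, so window neighbours can be looked
# up in O(1) per position.
class MiniPositionIndex:
    def __init__(self, num_of_entities: int, sorted_entity_ids: list):
        self._positions = [[] for _ in range(num_of_entities)]
        for position, entity_id in enumerate(sorted_entity_ids):
            self._positions[entity_id].append(position)

    def get_positions(self, entity_id: int) -> list:
        return self._positions[entity_id]

# Entity 2 appears at positions 1 and 3 of the sorted list:
index = MiniPositionIndex(4, [0, 2, 1, 2, 3])
assert index.get_positions(2) == [1, 3]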
PriorityQueue = PriorityQueue(2 * int(self._budget)) if not self._emit_all_tps_stop else PriorityQueue() + _top_unsorted_pairs: PriorityQueue = PriorityQueue(2 * int(self._budget)) if not self._emit_all_tps_stop else PriorityQueue() + + for entity in range(self.data.dataset_limit): + entity_positions = self._position_index.get_positions(entity) + self._neighbors.clear() + for current_window in range(1,self._max_window): + for entity_position in entity_positions: + right_neighbor = entity_position + current_window + left_neighbor = entity_position - current_window + + if(right_neighbor < self._total_sorted_entities): + if(self.valid_entity_neighbor_index(entity, right_neighbor)): + self._update_local_weight(current_window, entity, self._sorted_entity_ids[right_neighbor]) + if(left_neighbor >= 0): + if(self.valid_entity_neighbor_index(entity, left_neighbor)): + self._update_local_weight(current_window, entity, self._sorted_entity_ids[left_neighbor]) + + current_minimum_weight = -1 + for neighbor in self._neighbors: + self._flags[neighbor] = -1 + pair_weight = self._get_weight(entity, neighbor) + + if(pair_weight >= current_minimum_weight): + _top_unsorted_pairs.put( + (pair_weight, entity, neighbor) + ) + if self._budget < _top_unsorted_pairs.qsize(): + current_minimum_weight = _top_unsorted_pairs.get()[0] + + while(not _top_unsorted_pairs.empty()): + _score, _entity, _neighbor = _top_unsorted_pairs.get() + self._top_pairs.put((-_score, _entity, _neighbor)) + + return self._top_pairs + + def _update_local_weight(self, window : int, entity: int, neighbor: int): + """Updates the weight of the entity & neighbor pair for current window + Args: + window (int): Current Window Size + entity (int): Current Entity ID + neighbor (int): Current Neighbor ID + """ + + if(self._flags[neighbor] != entity): + self._counters[neighbor] = 0 + self._flags[neighbor] = entity + + pwScheme = self.weighting_scheme + if pwScheme == 'ID': + self._counters[neighbor] += 1.0 / window + else: + self._counters[neighbor] += 1.0 + + self._neighbors.add(neighbor) + +class LocalProgressiveSortedNeighborhood(ProgressiveSortedNeighborhood): + """Local Progressive Sorted Neighborhood""" + + _method_name = "Local Progressive Sorted Neighborhood" + _method_short_name: str = "LPSN" + _method_info = "Iteratively increments window size. 
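# Illustrative sketch (not part of the patch): how the sorted-neighbourhood windows
# accumulate pair weights. For every occurrence of an entity in the sorted list, the
# entities `window` positions to its left and right are candidate neighbours; the 'ID'
# scheme rewards closer neighbours with 1/window, while 'ACF' simply counts co-occurrences.
# (The real method additionally filters invalid neighbours via valid_entity_neighbor_index.)
from collections import defaultdict

def window_weights(sorted_entity_ids, positions_of_entity, max_window, scheme="ID"):
    weights = defaultdict(float)
    total = len(sorted_entity_ids)
    for window in range(1, max_window):
        for position in positions_of_entity:
            for neighbor_position in (position - window, position + window):
                if 0 <= neighbor_position < total:
                    neighbor = sorted_entity_ids[neighbor_position]
                    weights[neighbor] += 1.0 / window if scheme == "ID" else 1.0
    return dict(weights)

sorted_ids = [4, 1, 3, 1, 2]
# Entity 1 occurs at positions 1 and 3; with windows of size 1 and 2, entity 3
# (adjacent to both occurrences) scores 1.0 + 1.0 = 2.0 under 'ID':
assert window_weights(sorted_ids, positions_of_entity=[1, 3], max_window=3)[3] == 2.0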
For each one, derives the distinct neighbors for each entity, " + \ + "calculates their similarity, and emits the pairs in decreasing similarity score order" + + def __init__(self, weighting_scheme: str = 'ACF', budget: int = 0) -> None: + super().__init__(weighting_scheme, budget) + + def _has_next(self) -> bool: + """Validates if more pairs can be emitted + + Returns: + bool: Another pair can be emitted + """ + return self._emitted_comparisons < self._budget and self._current_window < self._total_sorted_entities + + def _apply_main_processing(self) -> List[Tuple[int, int]]: + self._emitted_comparisons = 0 + self._current_window = 1 + self._top_pairs: List[Tuple[int, int]] = [] + # TO DO: budget taken as argument in prediction, not algorithm constructor + self._budget = float('inf') if self._emit_all_tps_stop else self._budget + + while(self._has_next()): + _window_top_pairs = PriorityQueue() + for entity in range(self.data.dataset_limit): + entity_positions = self._position_index.get_positions(entity) + self._neighbors.clear() + + for entity_position in entity_positions: + right_neighbor = entity_position + self._current_window + left_neighbor = entity_position - self._current_window + + if(right_neighbor < self._total_sorted_entities): + if(self.valid_entity_neighbor_index(entity, right_neighbor)): + self._update_counters(entity, self._sorted_entity_ids[right_neighbor]) + if(left_neighbor >= 0): + if(self.valid_entity_neighbor_index(entity, left_neighbor)): + self._update_counters(entity, self._sorted_entity_ids[left_neighbor]) + + for neighbor in self._neighbors: + self._flags[neighbor] = -1 + pair_weight = self._get_weight(entity, neighbor) + + _window_top_pairs.put( + (-pair_weight, entity, neighbor) + ) + + while(len(self._top_pairs) < self._budget and not _window_top_pairs.empty()): + _, _entity, _neighbor = _window_top_pairs.get() + self._top_pairs.append((_entity, _neighbor)) + self._emitted_comparisons += 1 + + self._current_window += 1 + + return self._top_pairs + + def _update_counters(self, entity: int, neighbor: int): + """Updates the counters of the entity & neighbor pair for current window + + Args: + entity (int): Current Entity ID + neighbor (int): Current Neighbor ID + """ + + if(self._flags[neighbor] != entity): + self._counters[neighbor] = 0 + self._flags[neighbor] = entity + + self._counters[neighbor] += 1.0 + self._neighbors.add(neighbor) + + +class ProgressiveEntityScheduling(WeightedNodePruning): + """Progressive Entity Scheduling""" + + _method_name = "Progressive Entity Scheduling" + _method_short_name: str = "PES" + _method_info = "Sorts entities in descending order of their average weight, " + \ + "emits the top pair per entity. Finally, traverses the sorted " + \ + "entities and emits their comparisons in descending weight order " + \ + "within specified budget" + def __init__(self, weighting_scheme: str = 'CBS', budget: int = 0) -> None: + super().__init__(weighting_scheme) + self._budget = budget + + def _process_entity(self, entity_id: int) -> None: + """Calculates the counters for the neighbors of specified entity, + stores the weight for each neighbor and the top comparison for current entity. 
+ Finally, creates a prunned block for specified entity + + Args: + entity_id (int): Entity ID + """ + if entity_id not in self._entity_index: + self.blocks[entity_id] = set() + return + self._valid_entities.clear() + self._flags[:] = -1 + associated_blocks = self._entity_index[entity_id] + + for block_id in associated_blocks: + self._normalize_neighbor_entities(block_id, entity_id) + for neighbor_id in self._neighbors: + if self._flags[neighbor_id] != entity_id: + self._counters[neighbor_id] = 0 + self._flags[neighbor_id] = entity_id + if self.weighting_scheme == 'ARCS': + self._counters[neighbor_id] += 1 / self._blocks[block_id].get_cardinality(self.data.is_dirty_er) + else: + self._counters[neighbor_id] += 1 + self._valid_entities.add(neighbor_id) + + for valid_entity_id in self._valid_entities: + _current_neighbor_weight = self._get_weight(entity_id, valid_entity_id) + self._sorted_neighbors[entity_id].put((-_current_neighbor_weight, valid_entity_id)) + if(self.store_weights): + self._stored_weights[canonical_swap(entity_id, valid_entity_id)] = _current_neighbor_weight + + if(self.method == 'HB' and not self._sorted_neighbors[entity_id].empty()): + _top_entity_weight, _top_entity_neighbor = self._sorted_neighbors[entity_id].get() + self._to_emit_pairs.append((-_top_entity_weight, entity_id, _top_entity_neighbor)) + + self.blocks[entity_id] = self._valid_entities.copy() + + def _prune_edges(self) -> dict: + return None + + def process_raw_blocks(self, blocks: dict): + self._entity_index = create_entity_index(blocks, self.data.is_dirty_er) + self._apply_main_processing() + + def process_prunned_blocks(self, blocks : dict, cc : AbstractMetablocking): + self._average_weight = np.zeros(self._limit, dtype=float) + self.blocks = blocks + for entity in sorted(blocks.keys()): + neighbors = blocks[entity] + _neighbors_weigth_sum : float = 0.0 + for neighbor in neighbors: + _current_neighbor_weigth = cc.get_precalculated_weight(entity, neighbor) + _neighbors_weigth_sum += _current_neighbor_weigth + self._sorted_neighbors[entity].put((-_current_neighbor_weigth, neighbor)) + + self._average_weight[entity] = _neighbors_weigth_sum / len(neighbors) if len(neighbors) else 0.0 + if(self.method == 'HB' and not self._sorted_neighbors[entity].empty()): + _top_entity_weight, _top_entity_neighbor = self._sorted_neighbors[entity].get() + self._to_emit_pairs.append((-_top_entity_weight, entity, _top_entity_neighbor)) + + def successful_emission(self, pair : tuple) -> bool: + """Attempts to emit given pair, returns True / False on Success / Fail + In the case of full emission, it always emits given pair + + Args: + pair (tuple): Tuple in the form (score, entity1, entity2) + + Returns: + bool: Successful / Failed Emission + """ + _weigth, _entity, _neighbor = pair + + _budget = float('inf') if self._emit_all_tps_stop else self._budget + + if(self._emitted_comparisons < _budget): + self.pairs.append((_entity, _neighbor)) + self._emitted_comparisons += 1 + self._progress_bar.update(1) + return True + else: + self.execution_time = time() - self.start_time + self._progress_bar.close() + return False + + + def produce_pairs(self) -> List[Tuple[int, int]]: + """Emits the top pair for each entity in decreasing average weigth order. 
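# Illustrative sketch (not part of the patch): the scheduling idea behind Progressive
# Entity Scheduling. Entities are visited in decreasing order of the average weight of
# their retained comparisons, and each entity's own neighbours are emitted best-first.
# `sorted_enumerate` from pyjedai.utils is assumed to return entity ids ordered by
# descending value; the 'HB' mode additionally emits each entity's single best pair
# up front before this sweep.
import numpy as np

def schedule(per_entity_weights: dict, budget: int):
    limit = max(per_entity_weights) + 1
    average_weight = np.zeros(limit, dtype=float)
    for entity, neighbors in per_entity_weights.items():
        average_weight[entity] = sum(neighbors.values()) / len(neighbors) if neighbors else 0.0

    emitted = []
    for entity in np.argsort(-average_weight):          # decreasing average weight
        best_first = sorted(per_entity_weights.get(entity, {}).items(),
                            key=lambda item: -item[1])
        for neighbor, _weight in best_first:
            if len(emitted) == budget:
                return emitted
            emitted.append((int(entity), neighbor))
    return emitted

weights = {0: {5: 0.9, 6: 0.2}, 1: {7: 0.8, 8: 0.7}}
assert schedule(weights, budget=3) == [(1, 7), (1, 8), (0, 5)]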
+ Traverses the entities in decreasing average weigth order and emits its + pairs in decreasing weight order + + Returns: + List[Tuple[float, int, int]]: List of emitted pairs + """ + self._emitted_comparisons = 0 + checked_entity = np.zeros(self._limit, dtype=bool) + self.pairs = [] + + for pair in self._to_emit_pairs: + if(not self.successful_emission(pair)): + return self.pairs + + if(self.method == 'HB' or self.method == 'DFS'): + for entity in self._avg_weight_sorted_entities: + checked_entity[entity] = True + while(not self._sorted_neighbors[entity].empty()): + weight, neighbor = self._sorted_neighbors[entity].get() + pair = -weight, entity, neighbor + if(not checked_entity[neighbor]): + if(not self.successful_emission(pair)): + return self.pairs + else: + _available_emissions = True + while(_available_emissions): + _available_emissions = False + for entity in self._avg_weight_sorted_entities: + if(not self._sorted_neighbors[entity].empty()): + weight, neighbor = self._sorted_neighbors[entity].get() + pair = -weight, entity, neighbor + if canonical_swap(entity, neighbor) not in self._checked_pairs: + if(not self.successful_emission(pair)): return self.pairs + self._checked_pairs.add(canonical_swap(entity, neighbor)) + _available_emissions = True + + return self.pairs + + def process(self, blocks: dict, data: Data, tqdm_disable: bool = False, store_weigths : bool = True, cc: AbstractMetablocking = None, method : str = 'HB', emit_all_tps_stop : bool = False) -> None: + """Calculates the weights between entities, stores them in descending order of their average weight, + stores the top comparison per entity + + Args: + blocks (dict): Blocks to process + data (Data): Data Feature + tqdm_disable (bool, optional): Progress Bar. Defaults to False. + cc (AbstractMetablocking, optional): Comparison Cleaner used in previous step. Defaults to None. + + Returns: + None: None + """ + + self.start_time = time() + self.tqdm_disable, self.data, self.store_weights, self.method = tqdm_disable, data, store_weigths, method + self._limit = self.data.num_of_entities \ + if self.data.is_dirty_er or self._node_centric else self.data.dataset_limit + self._progress_bar = tqdm( + total=self._limit, + desc=self._method_name, + disable=self.tqdm_disable + ) + + self._emit_all_tps_stop : bool = emit_all_tps_stop + self._num_of_blocks = len(blocks) + self._blocks: dict = blocks + self._stored_weights : dict = defaultdict(float) + self._to_emit_pairs = [] + self._sorted_neighbors = [PriorityQueue() for _ in range(self._limit)] + if(self.method == 'BFS'): self._checked_pairs = set() + + if(cc is None): + self.process_raw_blocks(blocks) + else: + self.process_prunned_blocks(blocks, cc) + + self._avg_weight_sorted_entities = sorted_enumerate(self._average_weight) + def get_meta_blocking_approach(acronym: str, w_scheme: str, budget: int = 0) -> any: """Return method by acronym @@ -683,6 +1157,12 @@ def get_meta_blocking_approach(acronym: str, w_scheme: str, budget: int = 0) -> return ProgressiveCardinalityEdgePruning(w_scheme, budget) elif acronym == "PCNP": return ProgressiveCardinalityNodePruning(w_scheme, budget) + elif acronym == "GPSN": + return GlobalProgressiveSortedNeighborhood(w_scheme, budget) + elif acronym == "LPSN": + return LocalProgressiveSortedNeighborhood(w_scheme, budget) + elif acronym == "PES": + return ProgressiveEntityScheduling(w_scheme, budget) else: warnings.warn("Wrong meta-blocking approach selected. 
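# Illustrative sketch (not part of the patch): the two emission orders that produce_pairs
# supports. 'DFS' (and the tail of 'HB') drains each entity's neighbour queue before moving
# to the next entity, while 'BFS' repeatedly takes one pair per entity per sweep, both
# following the average-weight entity ordering computed above. (The real method also skips
# neighbours whose entity was already traversed and, in BFS mode, de-duplicates pairs.)
from queue import PriorityQueue

def emit_dfs(sorted_entities, neighbor_queues):
    pairs = []
    for entity in sorted_entities:
        while not neighbor_queues[entity].empty():
            _weight, neighbor = neighbor_queues[entity].get()
            pairs.append((entity, neighbor))
    return pairs

def emit_bfs(sorted_entities, neighbor_queues):
    pairs, emitted_something = [], True
    while emitted_something:
        emitted_something = False
        for entity in sorted_entities:
            if not neighbor_queues[entity].empty():
                _weight, neighbor = neighbor_queues[entity].get()
                pairs.append((entity, neighbor))
                emitted_something = True
    return pairs

queues = {0: PriorityQueue(), 1: PriorityQueue()}
for weight, neighbor in [(-0.9, 5), (-0.4, 6)]: queues[0].put((weight, neighbor))
for weight, neighbor in [(-0.8, 7)]: queues[1].put((weight, neighbor))
assert emit_dfs([0, 1], queues) == [(0, 5), (0, 6), (1, 7)]
# Re-filling the queues, emit_bfs([0, 1], queues) would instead yield [(0, 5), (1, 7), (0, 6)].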
Returning Comparison Propagation.") return ComparisonPropagation() diff --git a/src/pyjedai/datamodel.py b/src/pyjedai/datamodel.py index baaf927..92d725e 100644 --- a/src/pyjedai/datamodel.py +++ b/src/pyjedai/datamodel.py @@ -2,8 +2,14 @@ """ import pandas as pd from pandas import DataFrame, concat +import re +import nltk +nltk.download('stopwords') +from nltk.corpus import stopwords from abc import ABC, abstractmethod +from collections import defaultdict +from ordered_set import OrderedSet class PYJEDAIFeature(ABC): @@ -72,7 +78,8 @@ def __init__( attributes_2: list = None, id_column_name_2: str = None, dataset_name_2: str = None, - ground_truth: DataFrame = None + ground_truth: DataFrame = None, + inorder_gt: bool = True ) -> None: # Original Datasets as pd.DataFrame if isinstance(dataset_1, pd.DataFrame): @@ -98,6 +105,7 @@ def __init__( self.entities: DataFrame # Datasets specs + self.inorder_gt = inorder_gt self.is_dirty_er = dataset_2 is None self.dataset_limit = self.num_of_entities_1 = len(dataset_1) self.num_of_entities_2: int = len(dataset_2) if dataset_2 is not None else 0 @@ -105,6 +113,14 @@ def __init__( self.id_column_name_1 = id_column_name_1 self.id_column_name_2 = id_column_name_2 + + self.dataset_name_1 = dataset_name_1 + self.dataset_name_2 = dataset_name_2 + + # Fill NaN values with empty string + self.dataset_1.fillna("", inplace=True) + if not self.is_dirty_er: + self.dataset_2.fillna("", inplace=True) self.dataset_name_1 = dataset_name_1 self.dataset_name_2 = dataset_name_2 @@ -127,7 +143,6 @@ def __init__( self.attributes_1: list = attributes_1 if dataset_2 is not None: - if attributes_2 is None: if dataset_2.columns.values.tolist(): self.attributes_2 = dataset_2.columns.values.tolist() @@ -159,9 +174,22 @@ def __init__( if ground_truth is not None: self._create_gt_mapping() + self._store_pairs() else: self.ground_truth = None + def _store_pairs(self) -> None: + """Creates a mapping: + - pairs_of : ids of first dataset to ids of true matches from second dataset""" + + self.pairs_of = defaultdict(set) + d1_col_index, d2_col_index = (0, 1) if self.inorder_gt else (1,0) + + for _, row in self.ground_truth.iterrows(): + id1, id2 = (row[d1_col_index], row[d2_col_index]) + if id1 in self.pairs_of: self.pairs_of[id1].append(id2) + else: self.pairs_of[id1] = [id2] + def _create_gt_mapping(self) -> None: """Creates two mappings: - _ids_mapping_X: ids from initial dataset to index @@ -221,13 +249,77 @@ def print_specs(self) -> None: print("Number of matching pairs in ground-truth: ", len(self.ground_truth)) print(56*"-", "\n") + + # Functions that removes stopwords, punctuation, uni-codes, numbers from the dataset + def clean_dataset(self, + remove_stopwords: bool = True, + remove_punctuation: bool = True, + remove_numbers:bool = True, + remove_unicodes: bool = True) -> None: + """Removes stopwords, punctuation, uni-codes, numbers from the dataset. 
+ """ + + # Make self.dataset_1 and self.dataset_2 lowercase + self.dataset_1 = self.dataset_1.applymap(lambda x: x.lower()) + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.applymap(lambda x: x.lower()) + + if remove_numbers: + self.dataset_1 = self.dataset_1.applymap(lambda x: re.sub(r'\d+', '', x)) + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.applymap(lambda x: re.sub(r'\d+', '', x)) + + if remove_unicodes: + self.dataset_1 = self.dataset_1.applymap(lambda x: re.sub(r'[^\x00-\x7F]+', '', x)) + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.applymap(lambda x: re.sub(r'[^\x00-\x7F]+', '', x)) + + if remove_punctuation: + self.dataset_1 = self.dataset_1.applymap(lambda x: re.sub(r'[^\w\s]','',x)) + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.applymap(lambda x: re.sub(r'[^\w\s]','',x)) + + if remove_stopwords: + self.dataset_1 = self.dataset_1.applymap(lambda x: ' '.join([word for word in x.split() if word not in (stopwords.words('english'))])) + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.applymap(lambda x: ' '.join([word for word in x.split() if word not in (stopwords.words('english'))])) + + self.entities = self.dataset_1 = self.dataset_1.astype(str) + + # Concatenated columns into new dataframe + self.entities_d1 = self.dataset_1[self.attributes_1] + + if not self.is_dirty_er: + self.dataset_2 = self.dataset_2.astype(str) + self.entities_d2 = self.dataset_2[self.attributes_2] + self.entities = pd.concat([self.dataset_1, self.dataset_2], + ignore_index=True) + + def stats_about_data(self) -> None: + + stats_df = pd.DataFrame(columns=['word_count_1', 'word_count_2']) + + # Calculate the average number of words per line + stats_df['word_count_1'] = self.dataset_1.apply(lambda row: len(row.str.split()), axis=1) + print(stats_df['word_count_1']) + average_words_per_line_1 = stats_df['word_count_1'].mean() + print(average_words_per_line_1) + + if not self.is_dirty_er: + stats_df['word_count_2'] = self.dataset_2.apply(lambda row: len(row.str.split()), axis=1) + average_words_per_line_2 = stats_df['word_count_2'].mean() + print(average_words_per_line_2) + + return stats_df + + class Block: """The main module used for storing entities in the blocking steps of pyjedai module. \ Consists of 2 sets of profile entities 1 for Dirty ER and 2 for Clean-Clean ER. """ def __init__(self) -> None: - self.entities_D1: set = set() - self.entities_D2: set = set() + self.entities_D1: set = OrderedSet() + self.entities_D2: set = OrderedSet() def get_cardinality(self, is_dirty_er) -> int: """Returns block cardinality. @@ -257,7 +349,7 @@ def verbose(self, key: any, is_dirty_er: bool) -> None: key (any): Block key is_dirty_er (bool): Dirty or Clean-Clean ER. 
""" - print("\nBlock ", "\033[1;32m"+key+"\033[0m", " contains entities with ids: ") + print("\nBlock ", "\033[1;32m"+key+"\033[0m", " has cardinality ", str(self.get_cardinality(is_dirty_er)) ," and contains entities with ids: ") if is_dirty_er: print("Dirty dataset: " + "[\033[1;34m" + \ str(len(self.entities_D1)) + " entities\033[0m]") diff --git a/src/pyjedai/evaluation.py b/src/pyjedai/evaluation.py index fd00f45..de4a801 100644 --- a/src/pyjedai/evaluation.py +++ b/src/pyjedai/evaluation.py @@ -15,7 +15,9 @@ from .datamodel import Data from .utils import are_matching from .utils import batch_pairs - +from .utils import canonical_swap +from math import inf +from .utils import PredictionData import random import matplotlib.pyplot as plt @@ -50,34 +52,6 @@ def _set_true_positives(self, true_positives) -> None: def _set_total_matching_pairs(self, total_matching_pairs) -> None: self.total_matching_pairs = total_matching_pairs - - # if isinstance(prediction, dict) and isinstance(list(prediction.values())[0], set): - # # case of candidate pairs, entity-id -> {entity-id, ..} - # self.total_matching_pairs = sum([len(block) for block in prediction.values()]) - # for _, (id1, id2) in gt.iterrows(): - # id1 = self.data._ids_mapping_1[id1] - # id2 = self.data._ids_mapping_1[id2] if self.data.is_dirty_er else self.data._ids_mapping_2[id2] - # if (id1 in prediction and id2 in prediction[id1]) or \ - # (id2 in prediction and id1 in prediction[id2]): - # self.true_positives += 1 - # elif isinstance(prediction, nx.Graph): - # self.total_matching_pairs = prediction.number_of_edges() - # for _, (id1, id2) in gt.iterrows(): - # id1 = self.data._ids_mapping_1[id1] - # id2 = self.data._ids_mapping_1[id2] if self.data.is_dirty_er else self.data._ids_mapping_2[id2] - # if (id1 in prediction and id2 in prediction[id1]) or \ - # (id2 in prediction and id1 in prediction[id2]): - # self.true_positives += 1 - # else: # blocks, clusters evaluation - # entity_index: dict = self._create_entity_index(prediction, all_gt_ids) - # for _, (id1, id2) in gt.iterrows(): - # id1 = self.data._ids_mapping_1[id1] - # id2 = self.data._ids_mapping_1[id2] if self.data.is_dirty_er else self.data._ids_mapping_2[id2] - # if id1 in entity_index and \ - # id2 in entity_index and \ - # are_matching(entity_index, id1, id2): - # self.true_positives += 1 - def calculate_scores(self, true_positives=None, total_matching_pairs=None) -> None: if true_positives is not None: self.true_positives = true_positives @@ -96,13 +70,12 @@ def calculate_scores(self, true_positives=None, total_matching_pairs=None) -> No self.false_negatives = self.num_of_true_duplicates - self.true_positives self.false_positives = self.total_matching_pairs - self.true_positives cardinality = (self.data.num_of_entities_1*(self.data.num_of_entities_1-1))/2 \ - if self.data.is_dirty_er else self.data.num_of_entities_1 * self.data.num_of_entities_2 - self.true_negatives = cardinality - self.false_negatives - self.false_positives + if self.data.is_dirty_er else (self.data.num_of_entities_1 * self.data.num_of_entities_2) + self.true_negatives = cardinality - self.false_negatives - self.num_of_true_duplicates self.precision = self.true_positives / self.total_matching_pairs self.recall = self.true_positives / self.num_of_true_duplicates if self.precision == 0.0 or self.recall == 0.0: self.f1 = 0.0 - #raise DivisionByZero("Recall or Precision is equal to zero. 
Can't calculate F1 score.") else: self.f1 = 2*((self.precision*self.recall)/(self.precision+self.recall)) @@ -235,7 +208,7 @@ def visualize_roc(method_names : List[str], methods_data : List[Tuple[str, float normalized_aucs.append(normalized_auc) if proportional: sizes = [cr * 100 for cr in cumulative_recall] else: sizes = [10] * len(cumulative_recall) - ax.scatter(x_values, cumulative_recall, marker='o', s=sizes, color=color, label=method_name) + ax.scatter(x_values, cumulative_recall, marker='o', s=0.05, color=color, label=method_name) ax.plot(x_values, cumulative_recall, color=color) ax.set_xlabel('ec*', fontweight='bold', labelpad=10) @@ -252,7 +225,7 @@ def visualize_roc(method_names : List[str], methods_data : List[Tuple[str, float # add AUC score legend handles, _ = ax.get_legend_handles_labels() auc_legend_labels = ['AUC: {:.2f}'.format(nauc) for nauc in normalized_aucs] - auc_legend = ax.legend(handles, auc_legend_labels, loc='lower left', bbox_to_anchor=(0.5, -0.4), ncol=2, frameon=True, title='Normalized AUC', title_fontsize=12) + auc_legend = ax.legend(handles, auc_legend_labels, loc='lower left', bbox_to_anchor=(0.5, -0.4), ncol=2, frameon=True, title='AUC', title_fontsize=12) auc_legend.get_title().set_fontweight('bold') for i, text in enumerate(auc_legend.get_texts()): plt.setp(text, color=colors[i]) @@ -284,7 +257,36 @@ def calculate_ideal_auc(self, pairs_num : int, true_duplicates_num : int) -> flo return ideal_auc - def calculate_roc_auc_data(self, data: Data, pairs, batch_size : int = 1) -> List[Tuple[int, int]]: + def _till_full_tps_emission(self) -> bool: + """Checks if emission should be stopped once all TPs have been found (TPs dict supplied) + Returns: + bool: Stop emission on all TPs found / Emit all pairs + """ + return self._true_positive_checked is not None + + def _all_tps_emitted(self) -> bool: + """Checks if all TPs have been emitted (Defaults to False in the case of all pairs emission approach) + Returns: + bool: All TPs emitted / not emitted + """ + if(self._till_full_tps_emission()): return self._tps_found >= len(self._true_positive_checked) + else: False + + def _update_true_positive_entry(self, entity : int, candidate : int) -> None: + """Updates the checked status of the given true positive + + Args: + entity (int): Entity ID + candidate (int): Candidate ID + """ + if(self._till_full_tps_emission()): + if(not self._true_positive_checked[canonical_swap(entity, candidate)]): + self._true_positive_checked[canonical_swap(entity, candidate)] = True + self._tps_found += 1 + return + + + def calculate_roc_auc_data(self, data: Data, pairs, batch_size : int = 1, true_positive_checked : dict = None) -> List[Tuple[int, int]]: """Progressively calculates total recall, AUC for each batch of candidate pairs Args: data (Data): Data Module @@ -295,6 +297,9 @@ def calculate_roc_auc_data(self, data: Data, pairs, batch_size : int = 1) -> Li List[Tuple[int, int]]: List of ROC graph points information (recall up to e, normalized auc up to e) """ + if(true_positive_checked is not None): + for pair in true_positive_checked.keys(): + true_positive_checked[pair] = False if(data.ground_truth is None): raise AttributeError("Can calculate ROC AUC without a ground-truth file. 
\ @@ -302,34 +307,81 @@ def calculate_roc_auc_data(self, data: Data, pairs, batch_size : int = 1) -> Li if(len(data.ground_truth) == 0): raise AttributeError("Cannot calculate AUC score, number of true duplicates is equal to 0.") - + _true_positives: int = 0 _normalized_auc: int = 0 _current_recall: int = 0 _new_recall: int = 0 + self._tps_found : int = 0 + self._true_positive_checked : dict = true_positive_checked self.num_of_true_duplicates = len(data.ground_truth) _recall_progress = [0] batches = batch_pairs(pairs, batch_size) - ideal_auc = self.calculate_ideal_auc(len(pairs), self.num_of_true_duplicates) - + # ideal_auc = self.calculate_ideal_auc(len(pairs), self.num_of_true_duplicates) + self._total_emissions : int = 0 for batch in batches: _current_batch_size : int = 0 for entity, candidate in batch: - tdf = data.ground_truth - entity = data._gt_to_ids_reversed_1[entity] if entity < data.dataset_limit else data._gt_to_ids_reversed_2[entity] - candidate = data._gt_to_ids_reversed_1[candidate] if candidate < data.dataset_limit else data._gt_to_ids_reversed_2[candidate] - if not tdf[((tdf.iloc[:, 0] == entity) & (tdf.iloc[:, 1] == candidate)) | ((tdf.iloc[:, 0] == candidate) & (tdf.iloc[:, 1] == entity))].empty: - _true_positives += 1 + if(self._all_tps_emitted()): break + entity_id = data._gt_to_ids_reversed_1[entity] if entity < data.dataset_limit else data._gt_to_ids_reversed_2[entity] + candidate_id = data._gt_to_ids_reversed_1[candidate] if candidate < data.dataset_limit else data._gt_to_ids_reversed_2[candidate] + _d1_entity, _d2_entity = (entity_id, candidate_id) if entity < data.dataset_limit else (candidate_id, entity_id) + + if _d2_entity in self.data.pairs_of[_d1_entity]: + self._update_true_positive_entry(entity_id, candidate_id) + _true_positives += 1 _current_batch_size += 1 - + self._total_emissions += 1 _new_recall = _true_positives / self.num_of_true_duplicates - _normalized_auc += ((_new_recall + _current_recall) / 2) * (_current_batch_size / self.num_of_true_duplicates) + # _normalized_auc += ((_new_recall + _current_recall) / 2) * (_current_batch_size / self.num_of_true_duplicates) _current_recall = _new_recall _recall_progress.append(_current_recall) + if(self._all_tps_emitted()): break + - _normalized_auc = 0 if(ideal_auc == 0) else _normalized_auc / ideal_auc + # _normalized_auc = 0 if(ideal_auc == 0) else _normalized_auc / ideal_auc + _normalized_auc = sum(_recall_progress) / (len(pairs) + 1.0) return _recall_progress, _normalized_auc + + def evaluate_auc_roc(self, matchers_data : List[Tuple], batch_size : int = 1, proportional : bool = True) -> None: + """For each matcher, takes its prediction data, calculates cumulative recall and auc, plots the corresponding ROC curve, populates prediction data with performance info + Args: + matchers_data List[Tuple[str, ProgressiveMatching]]: Progressive Matchers and their names + data (Data) : Data Module + batch_size (int, optional): Emitted pairs step at which cumulative recall is recalculated. Defaults to 1. + proportional (bool) : Proportional Visualization + Raises: + AttributeError: No Data object + AttributeError: No Ground Truth file + """ + + if self.data is None: + raise AttributeError("Can not proceed to AUC ROC evaluation without data object.") + + if self.data.ground_truth is None: + raise AttributeError("Can not proceed to AUC ROC evaluation without a ground-truth file. 
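# Illustrative sketch (not part of the patch): how the progressive-recall curve and its
# normalised AUC are derived from an emission stream. The AUC is the mean of the recall
# values recorded after every emission (including the leading 0), matching
# sum(recall_progress) / (len(pairs) + 1) above.
def recall_curve(emitted_pairs, true_pairs):
    true_pairs = {tuple(sorted(p)) for p in true_pairs}
    found, progress = 0, [0.0]
    for pair in emitted_pairs:
        if tuple(sorted(pair)) in true_pairs:
            found += 1
        progress.append(found / len(true_pairs))
    normalized_auc = sum(progress) / (len(emitted_pairs) + 1.0)
    return progress, normalized_auc

progress, auc = recall_curve([(0, 9), (1, 8), (2, 7), (3, 6)],
                             true_pairs=[(9, 0), (7, 2)])
assert progress == [0.0, 0.5, 0.5, 1.0, 1.0] and auc == 0.6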
" + + "Data object has not been initialized with the ground-truth file") + + self._matchers_auc_roc_data = [] + + for matcher_data in matchers_data: + + matcher_name, progressive_matcher = matcher_data + matcher_prediction_data : PredictionData = PredictionData(matcher_name, progressive_matcher.pairs, progressive_matcher.true_pair_checked) + + matcher_predictions = matcher_prediction_data.get_predictions() + matcher_tps_checked = matcher_prediction_data.get_tps_checked() + + cumulative_recall, normalized_auc = self.calculate_roc_auc_data(self.data, matcher_predictions, batch_size, matcher_tps_checked) + + self._matchers_auc_roc_data.append((matcher_name, normalized_auc, cumulative_recall)) + matcher_prediction_data.set_total_emissions(self._total_emissions) + matcher_prediction_data.set_normalized_auc(normalized_auc) + matcher_prediction_data.set_cumulative_recall(cumulative_recall[-1]) + progressive_matcher.set_prediction_data(matcher_prediction_data) + + self.visualize_roc(methods_data = self._matchers_auc_roc_data, proportional = proportional) def write( prediction: any, diff --git a/src/pyjedai/joins.py b/src/pyjedai/joins.py index ebdc986..e739618 100644 --- a/src/pyjedai/joins.py +++ b/src/pyjedai/joins.py @@ -264,6 +264,9 @@ def evaluate(self, prediction=None, export_to_df: bool = False, export_to_dict, with_classification_report, verbose) + + def stats(self) -> None: + pass def stats(self) -> None: pass @@ -276,12 +279,12 @@ def _configuration(self) -> dict: "qgrams": self.qgrams } -class ΕJoin(AbstractJoin): +class EJoin(AbstractJoin): """ - Ε Join algorithm + E Join algorithm """ _method_name = "EJoin" - _method_info = " ΕJoin algorithm" + _method_info = " EJoin algorithm" _method_short_name = "EJ" def __init__( diff --git a/src/pyjedai/matching.py b/src/pyjedai/matching.py index 8a90997..73945b6 100644 --- a/src/pyjedai/matching.py +++ b/src/pyjedai/matching.py @@ -43,12 +43,15 @@ WhitespaceTokenizer from sklearn.feature_extraction.text import TfidfVectorizer # from scipy.spatial.distance import cosine -from sklearn.metrics.pairwise import cosine_similarity +from sklearn.metrics.pairwise import cosine_similarity, pairwise_distances from tqdm.autonotebook import tqdm +from sklearn.metrics import jaccard_score +from scipy.spatial.distance import dice, jaccard from .datamodel import Data, PYJEDAIFeature from .evaluation import Evaluation from .utils import WordQgrammsTokenizer +from whoosh.scoring import TF_IDF, Frequency, PL2, BM25F # Package import from https://anhaidgroup.github.io/py_stringmatching/v0.4.2/index.html @@ -82,7 +85,11 @@ def cosine(x, y): 'dice': Dice(), 'overlap_coefficient' : OverlapCoefficient(), 'token_sort': TokenSort(), - 'cosine_vector_similarity': cosine + 'cosine_vector_similarity': cosine, + 'TF-IDF' : TF_IDF(), + 'Frequency' : Frequency(), + 'PL2' : PL2(), + 'BM25F' : BM25F() } string_metrics = [ @@ -102,7 +109,11 @@ def cosine(x, y): 'cosine_vector_similarity' ] -available_metrics = string_metrics + set_metrics + bag_metrics + vector_metrics +index_metrics = [ + 'TF-IDF', 'Frequency', 'PL2', 'BM25F' +] + +available_metrics = string_metrics + set_metrics + bag_metrics + vector_metrics + index_metrics class EntityMatching(PYJEDAIFeature): @@ -120,6 +131,7 @@ def __init__( qgram: int = 2, # for jaccard tokenizer_return_set = False, # unique values or not, attributes: any = None, + tfidf_similarity_metric: str = 'cosine', delim_set: list = None, # DelimiterTokenizer padding: bool = True, # QgramTokenizer prefix_pad: str = '#', # QgramTokenizer (if 
padding=True) @@ -134,6 +146,7 @@ def __init__( self.vectors_d2 = None self.tokenizer = tokenizer self.execution_time = 0 + self.tfidf_similarity_metric = tfidf_similarity_metric # # Selecting tokenizer # @@ -198,10 +211,9 @@ def predict(self, self.vectors_d2 = vectors_d2 if self.metric in vector_metrics: - if(vectors_d2 is not None and vectors_d1 is None): + if(vectors_d1 is None): raise ValueError("Embeddings of the first dataset not given") - - if(vectors_d1 is not None): + else: self.vectors = vectors_d1 if(not data.is_dirty_er): if(vectors_d2 is None): @@ -214,9 +226,9 @@ def predict(self, self.pairs = Graph() all_blocks = list(blocks.values()) self._progress_bar = tqdm(total=len(blocks), - desc=self._method_name+" ("+self.metric+")", + desc=self._method_name+" ("+self.metric+ ", " + str(self.tokenizer) + ")", disable=self.tqdm_disable) - + if self.metric == 'tf-idf': self._calculate_tfidf() @@ -279,6 +291,49 @@ def _calculate_vector_similarity(self, entity_id1: int, entity_id2: int) -> floa self.vectors[entity_id2]) else: raise AttributeError("Please select one vector similarity metric from the given: " + ','.join(vector_metrics)) + + def _calculate_tfidf(self) -> None: + + analyzer = 'char' if self.tokenizer == 'char_qgram_tokenizer' else 'word' + vectorizer = TfidfVectorizer(analyzer='') if self.qgram is None else TfidfVectorizer(analyzer=analyzer, ngram_range=(self.qgram, self.qgram)) + + d1 = self.data.dataset_1[self.attributes] if self.attributes else self.data.dataset_1 + self._entities_d1 = d1 \ + .apply(" ".join, axis=1) \ + .apply(lambda x: x.lower()) \ + .values.tolist() + + d2 = self.data.dataset_2[self.attributes] if self.attributes and not self.data.is_dirty_er else self.data.dataset_2 + self._entities_d2 = d2 \ + .apply(" ".join, axis=1) \ + .apply(lambda x: x.lower()) \ + .values.tolist() if not self.data.is_dirty_er else None + + if self.data.is_dirty_er: + pass + else: + self.corpus = self._entities_d1 + self._entities_d2 + self.tfidf_vectorizer = vectorizer.fit(self.corpus) + + if self.tfidf_similarity_metric == 'cosine': + self.tfidf_matrix = vectorizer.transform(self.corpus) + self.tfidf_similarity_matrix = cosine_similarity(self.tfidf_matrix) + elif self.tfidf_similarity_metric == 'jaccard': + self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.corpus) + self.tfidf_similarity_matrix = 1 - pairwise_distances( self.tfidf_matrix.toarray(), metric="jaccard") + elif self.tfidf_similarity_metric == 'dice': + self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.corpus).toarray() + + def _calculate_tfidf_similarity(self, entity_id1: int, entity_id2: int) -> float: + + if self.tfidf_similarity_metric == 'cosine': + return self.tfidf_similarity_matrix[entity_id1][entity_id2] + elif self.tfidf_similarity_metric == 'jaccard': + return self.tfidf_similarity_matrix[entity_id1][entity_id2] + elif self.tfidf_similarity_metric == 'dice': + return 1-dice(self.tfidf_matrix[entity_id1], self.tfidf_matrix[entity_id2]) + else: + raise AttributeError("Please select one tf-idf similarity metric from the given: cosine, jaccard, dice") def _calculate_tfidf(self) -> None: @@ -302,14 +357,26 @@ def _calculate_tfidf(self) -> None: else: self.corpus = self._entities_d1 + self._entities_d2 self.tfidf_vectorizer = vectorizer.fit(self.corpus) - self.tfidf_matrix = vectorizer.transform(self.corpus) - self.tfidf_similarity_matrix = cosine_similarity(self.tfidf_matrix) - # feature_names = self.tfidf_vectorizer.get_feature_names() - # tfidf_df = 
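# Illustrative sketch (not part of the patch): building the TF-IDF similarity matrix that
# `tfidf_similarity_metric='cosine'` relies on. Both datasets are concatenated into one
# corpus so that entity ids index the same matrix; a word or char analyzer with a q-gram
# range mirrors the tokenizer choice above. The toy corpus is purely illustrative.
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

corpus = ["thomas richards london",      # dataset 1, entity 0
          "maria lopez madrid",          # dataset 1, entity 1
          "tom richards london uk",      # dataset 2, entity 2
          "m lopez madrid spain"]        # dataset 2, entity 3

vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 1))
tfidf_matrix = vectorizer.fit_transform(corpus)
similarity = cosine_similarity(tfidf_matrix)

# similarity[i][j] is what _calculate_tfidf_similarity(i, j) returns under 'cosine':
assert similarity[0][2] > similarity[0][3]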
pd.DataFrame(self.tfidf_matrix.toarray(), columns=feature_names) - # self.tfidf_df = tfidf_df + + if self.tfidf_similarity_metric == 'cosine': + self.tfidf_matrix = vectorizer.transform(self.corpus) + self.tfidf_similarity_matrix = cosine_similarity(self.tfidf_matrix) + elif self.tfidf_similarity_metric == 'jaccard': + self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.corpus) + self.tfidf_similarity_matrix = 1 - pairwise_distances( self.tfidf_matrix.toarray(), metric="jaccard") + elif self.tfidf_similarity_metric == 'dice': + self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.corpus).toarray() def _calculate_tfidf_similarity(self, entity_id1: int, entity_id2: int) -> float: - return self.tfidf_similarity_matrix[entity_id1][entity_id2] + + if self.tfidf_similarity_metric == 'cosine': + return self.tfidf_similarity_matrix[entity_id1][entity_id2] + elif self.tfidf_similarity_metric == 'jaccard': + return self.tfidf_similarity_matrix[entity_id1][entity_id2] + elif self.tfidf_similarity_metric == 'dice': + return 1-dice(self.tfidf_matrix[entity_id1], self.tfidf_matrix[entity_id2]) + else: + raise AttributeError("Please select one tf-idf similarity metric from the given: cosine, jaccard, dice") def _similarity(self, entity_id1: int, entity_id2: int) -> float: @@ -360,12 +427,20 @@ def report(self) -> None: ) def _configuration(self) -> dict: - return { + conf = { "Metric" : self.metric, "Attributes" : self.attributes, - "Similarity threshold" : self.similarity_threshold + "Similarity threshold" : self.similarity_threshold, + "Tokenizer" : self.tokenizer } + if self.metric == 'tf-idf': + conf["Similarity metric"] = self.tfidf_similarity_metric + if self.tokenizer == 'word_qgram_tokenizer' or self.tokenizer == 'char_qgram_tokenizer': + conf["QGram size"] = self.qgram + + return conf + def get_weights_avg(self) -> float: return sum([w for _, _, w in self.pairs.edges(data='weight')])/len(self.pairs.edges(data='weight')) @@ -516,13 +591,6 @@ def evaluate(self, export_to_dict, with_classification_report, verbose) - - def _configuration(self) -> dict: - return { - "Tokenizer" : self.tokenizer, - "Metric" : self.metric, - "Similarity Threshold" : self.similarity_threshold - } def stats(self) -> None: pass diff --git a/src/pyjedai/prioritization.py b/src/pyjedai/prioritization.py new file mode 100644 index 0000000..4ce6f62 --- /dev/null +++ b/src/pyjedai/prioritization.py @@ -0,0 +1,1028 @@ +"""Entity Matching Prioritization Module +""" +import numpy as np +from time import time +import matplotlib.pyplot as plt +from .matching import EntityMatching +from .comparison_cleaning import ( + ComparisonPropagation, + ProgressiveCardinalityEdgePruning, + ProgressiveCardinalityNodePruning, + GlobalProgressiveSortedNeighborhood, + LocalProgressiveSortedNeighborhood, + ProgressiveEntityScheduling) +from .vector_based_blocking import EmbeddingsNNBlockBuilding +from sklearn.metrics.pairwise import ( + cosine_similarity +) +from networkx import Graph +from py_stringmatching.similarity_measure.affine import Affine +from py_stringmatching.similarity_measure.bag_distance import BagDistance +from py_stringmatching.similarity_measure.cosine import Cosine +from py_stringmatching.similarity_measure.dice import Dice +from py_stringmatching.similarity_measure.editex import Editex +from py_stringmatching.similarity_measure.generalized_jaccard import \ + GeneralizedJaccard +from py_stringmatching.similarity_measure.hamming_distance import \ + HammingDistance +from py_stringmatching.similarity_measure.jaccard 
import Jaccard +from py_stringmatching.similarity_measure.jaro import Jaro +from py_stringmatching.similarity_measure.jaro_winkler import JaroWinkler +from py_stringmatching.similarity_measure.levenshtein import Levenshtein +from py_stringmatching.similarity_measure.monge_elkan import MongeElkan +from py_stringmatching.similarity_measure.needleman_wunsch import \ + NeedlemanWunsch +from py_stringmatching.similarity_measure.overlap_coefficient import \ + OverlapCoefficient +from py_stringmatching.similarity_measure.partial_ratio import PartialRatio +from py_stringmatching.similarity_measure.token_sort import TokenSort +from py_stringmatching.similarity_measure.partial_token_sort import \ + PartialTokenSort +from py_stringmatching.similarity_measure.ratio import Ratio +from py_stringmatching.similarity_measure.smith_waterman import SmithWaterman +from py_stringmatching.similarity_measure.soundex import Soundex +from py_stringmatching.similarity_measure.tfidf import TfIdf +from py_stringmatching.similarity_measure.tversky_index import TverskyIndex +from py_stringmatching.tokenizer.alphabetic_tokenizer import \ + AlphabeticTokenizer +from py_stringmatching.tokenizer.alphanumeric_tokenizer import \ + AlphanumericTokenizer +from py_stringmatching.tokenizer.delimiter_tokenizer import DelimiterTokenizer +from py_stringmatching.tokenizer.qgram_tokenizer import QgramTokenizer +from py_stringmatching.tokenizer.whitespace_tokenizer import \ + WhitespaceTokenizer +from tqdm.autonotebook import tqdm + +from .evaluation import Evaluation +from .datamodel import Data, PYJEDAIFeature +from .matching import EntityMatching +from .comparison_cleaning import AbstractMetablocking +from queue import PriorityQueue +from random import sample +from .utils import sorted_enumerate, canonical_swap +from abc import abstractmethod +from typing import Tuple, List +from .utils import SubsetIndexer, WhooshDataset, WhooshNeighborhood, is_infinite, PredictionData +import pandas as pd +import os +from whoosh.fields import TEXT, Schema, ID +from whoosh.index import create_in +from whoosh import qparser +from whoosh.scoring import TF_IDF, Frequency, PL2, BM25F + + +# Directory where the whoosh index is stored +INDEXER_DIR='.indexer' + +# Package import from https://anhaidgroup.github.io/py_stringmatching/v0.4.2/index.html + +available_tokenizers = [ + 'white_space_tokenizer', 'qgram_tokenizer', 'delimiter_tokenizer', + 'alphabetic_tokenizer', 'alphanumeric_tokenizer' +] + +metrics_mapping = { + 'levenshtein' : Levenshtein(), + 'edit_distance': Levenshtein(), + 'jaro_winkler' : JaroWinkler(), + 'bag_distance' : BagDistance(), + 'editex' : Editex(), + 'cosine' : Cosine(), + 'jaro' : Jaro(), + 'soundex' : Soundex(), + 'tfidf' : TfIdf(), + 'tversky_index':TverskyIndex(), + 'ratio' : Ratio(), + 'partial_token_sort' : PartialTokenSort(), + 'partial_ratio' : PartialRatio(), + 'hamming_distance' : HammingDistance(), + 'jaccard' : Jaccard(), + 'generalized_jaccard' : GeneralizedJaccard(), + 'dice': Dice(), + 'overlap_coefficient' : OverlapCoefficient(), + 'token_sort': TokenSort(), + 'cosine_vector_similarity': cosine_similarity, + 'TF-IDF' : TF_IDF(), + 'Frequency' : Frequency(), + 'PL2' : PL2(), + 'BM25F' : BM25F() +} + +whoosh_similarity_function = { + 'TF-IDF' : TF_IDF(), + 'Frequency' : Frequency(), + 'PL2' : PL2(), + 'BM25F' : BM25F() +} + +string_metrics = [ + 'bag_distance', 'editex', 'hamming_distance', 'jaro', 'jaro_winkler', 'levenshtein', + 'edit_distance', 'partial_ratio', 'partial_token_sort', 'ratio', 'soundex', 
'token_sort' +] + +set_metrics = [ + 'cosine', 'dice', 'generalized_jaccard', 'jaccard', 'overlap_coefficient', 'tversky_index' +] + +bag_metrics = [ + 'tfidf' +] + +index_metrics = [ + 'TF-IDF', 'Frequency', 'PL2', 'BM25F' +] + +vector_metrics = [ + 'cosine_vector_similarity' +] + +available_metrics = string_metrics + set_metrics + bag_metrics + vector_metrics + index_metrics + +class ProgressiveMatching(EntityMatching): + """Applies the matching process to a subset of available pairs progressively + """ + + _method_name: str = "Progressive Matching" + _method_info: str = "Applies the matching process to a subset of available pairs progressively " + + def __init__( + self, + budget: int = 0, + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + self._budget : int = budget + + def predict(self, + blocks: dict, + data: Data, + comparison_cleaner: AbstractMetablocking = None, + tqdm_disable: bool = False, + method : str = 'HB', + emit_all_tps_stop : bool = False) -> Graph: + """Main method of progressive entity matching. Inputs a set of blocks and outputs a graph \ + that contains of the entity ids (nodes) and the similarity scores between them (edges). + Args: + blocks (dict): blocks of entities + data (Data): dataset module + tqdm_disable (bool, optional): Disables progress bar. Defaults to False. + method (str) : DFS/BFS/Hybrid approach for specified algorithm + emit_all_tps_stop (bool) : Stop emission once all true positives are found + Returns: + networkx.Graph: entity ids (nodes) and similarity scores between them (edges) + """ + start_time = time() + self.tqdm_disable = tqdm_disable + self._comparison_cleaner: AbstractMetablocking = comparison_cleaner + self._method = method + self._emit_all_tps_stop = emit_all_tps_stop + self.true_pair_checked = None + self._prediction_data : PredictionData = None + + if not blocks: + raise ValueError("Empty blocks structure") + self.data = data + self.pairs = Graph() + all_blocks = list(blocks.values()) + self._progress_bar = tqdm(total=len(blocks), + desc=self._method_name+" ("+self.metric+")", + disable=self.tqdm_disable) + if 'Block' in str(type(all_blocks[0])): + self._predict_raw_blocks(blocks) + elif isinstance(all_blocks[0], set): + if(self._comparison_cleaner == None): + raise AttributeError("No precalculated weights were given from the CC step") + self._predict_prunned_blocks(blocks) + else: + raise AttributeError("Wrong type of Blocks") + self.execution_time = time() - start_time + self._progress_bar.close() + + return self.pairs + + def evaluate(self, + prediction, + export_to_df: bool = False, + export_to_dict: bool = False, + with_classification_report: bool = False, + verbose: bool = True) -> any: + + if self.data is None: + raise AttributeError("Can not proceed to evaluation without data object.") + + if self.data.ground_truth is None: + raise AttributeError("Can not proceed to evaluation without a ground-truth file. 
" + + "Data object has not been initialized with the ground-truth file") + + eval_obj = Evaluation(self.data) + true_positives = 0 + total_matching_pairs = prediction.number_of_edges() + for _, (id1, id2) in self.data.ground_truth.iterrows(): + id1 = self.data._ids_mapping_1[id1] + id2 = self.data._ids_mapping_1[id2] if self.data.is_dirty_er \ + else self.data._ids_mapping_2[id2] + if (id1 in prediction and id2 in prediction[id1]) or \ + (id2 in prediction and id1 in prediction[id2]): + true_positives += 1 + + eval_obj.calculate_scores(true_positives=true_positives, + total_matching_pairs=total_matching_pairs) + return eval_obj.report(self.method_configuration(), + export_to_df, + export_to_dict, + with_classification_report, + verbose) + + def get_true_pair_checked(self): + if(self.true_pair_checked is None): + raise AttributeError("True positive pairs not defined in specified workflow.") + else: return self.true_pair_checked + + + @abstractmethod + def extract_tps_checked(self, **kwargs) -> dict: + """Constructs a dictionary of the form [true positive pair] -> emitted status, + containing all the true positive pairs that are emittable from the current subset of the dataset + + Returns: + dict: Dictionary that shows whether a TP pair (key) has been emitted (value) + """ + pass + + def get_prediction_data(self) -> PredictionData: + if(self._prediction_data is None): + raise ValueError("Pairs not emitted yet - No Data to show") + return self._prediction_data + + def get_total_emissions(self) -> int: + return self.get_prediction_data().get_total_emissions() + + def get_cumulative_recall(self) -> float: + return self.get_prediction_data().get_cumulative_recall() + + def get_normalized_auc(self) -> float: + return self.get_prediction_data().get_normalized_auc() + + def set_prediction_data(self, prediction_data : PredictionData): + self._prediction_data : PredictionData = prediction_data + + +class HashBasedProgressiveMatching(ProgressiveMatching): + """Applies hash based candidate graph prunning, sorts retained comparisons and applies Progressive Matching + """ + + _method_name: str = "Hash Based Progressive Matching" + _method_info: str = "Applies hash based candidate graph prunning, sorts retained comparisons and applies Progressive Matching" + + def __init__( + self, + budget: int = 0, + w_scheme: str = 'X2', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + self._w_scheme : str = w_scheme + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + for entity, neighbors in self.blocks.items(): + for neighbor in neighbors: + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + + if _d2_entity in 
self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + return _tps_checked + + +class GlobalTopPM(HashBasedProgressiveMatching): + """Applies Progressive CEP, sorts retained comparisons and applies Progressive Matching + """ + + _method_name: str = "Global Top Progressive Matching" + _method_info: str = "Applies Progressive CEP, sorts retained comparisons and applies Progressive Matching" + + def __init__( + self, + budget: int = 0, + w_scheme: str = 'X2', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, w_scheme, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + def _predict_raw_blocks(self, blocks: dict) -> None: + pcep : ProgressiveCardinalityEdgePruning = ProgressiveCardinalityEdgePruning(self._w_scheme, self._budget) + candidates : dict = pcep.process(blocks=blocks, data=self.data, tqdm_disable=True, cc=None, emit_all_tps_stop=self._emit_all_tps_stop) + self.blocks = candidates + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked() + + for entity_id, candidate_ids in candidates.items(): + for candidate_id in candidate_ids: + self._insert_to_graph(entity_id, candidate_id, pcep.get_precalculated_weight(entity_id, candidate_id)) + + self.pairs.edges = sorted(self.pairs.edges(data=True), key=lambda x: x[2]['weight'], reverse=True) + return self.pairs.edges + + + def _predict_prunned_blocks(self, blocks: dict) -> None: + pcep : ProgressiveCardinalityEdgePruning = ProgressiveCardinalityEdgePruning(self._w_scheme, self._budget) + candidates : dict = pcep.process(blocks=blocks, data=self.data, tqdm_disable=True, cc=self._comparison_cleaner, emit_all_tps_stop=self._emit_all_tps_stop) + self.blocks = candidates + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked() + + for entity_id, candidate_ids in candidates.items(): + for candidate_id in candidate_ids: + self._insert_to_graph(entity_id, candidate_id, self._comparison_cleaner.get_precalculated_weight(entity_id, candidate_id)) + + self.pairs.edges = sorted(self.pairs.edges(data=True), key=lambda x: x[2]['weight'], reverse=True) + return self.pairs.edges + +class LocalTopPM(HashBasedProgressiveMatching): + """Applies Progressive CNP, sorts retained comparisons and applies Progressive Matching + """ + + _method_name: str = "Global Top Progressive Matching" + _method_info: str = "Applies Progressive CNP, sorts retained comparisons and applies Progressive Matching" + + def __init__( + self, + budget: int = 0, + w_scheme: str = 'X2', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, w_scheme, metric, tokenizer, similarity_threshold, qgram, 
tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + + def _predict_raw_blocks(self, blocks: dict) -> None: + pcnp : ProgressiveCardinalityNodePruning = ProgressiveCardinalityNodePruning(self._w_scheme, self._budget) + candidates : dict = pcnp.process(blocks=blocks, data=self.data, tqdm_disable=True, cc=None, emit_all_tps_stop=self._emit_all_tps_stop) + self.blocks = candidates + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked() + + for entity_id, candidate_ids in candidates.items(): + for candidate_id in candidate_ids: + self._insert_to_graph(entity_id, candidate_id, pcnp.get_precalculated_weight(entity_id, candidate_id)) + + self.pairs.edges = sorted(self.pairs.edges(data=True), key=lambda x: x[2]['weight'], reverse=True) + return self.pairs.edges + + def _predict_prunned_blocks(self, blocks: dict) -> None: + + pcnp : ProgressiveCardinalityNodePruning = ProgressiveCardinalityNodePruning(self._w_scheme, self._budget) + candidates : dict = pcnp.process(blocks=blocks, data=self.data, tqdm_disable=True, cc=self._comparison_cleaner, emit_all_tps_stop=self._emit_all_tps_stop) + self.blocks = candidates + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked() + + for entity_id, candidate_ids in candidates.items(): + for candidate_id in candidate_ids: + self._insert_to_graph(entity_id, candidate_id, self._comparison_cleaner.get_precalculated_weight(entity_id, candidate_id)) + + self.pairs.edges = sorted(self.pairs.edges(data=True), key=lambda x: x[2]['weight'], reverse=True) + return self.pairs.edges + + +class EmbeddingsNNBPM(ProgressiveMatching): + """Utilizes/Creates entity embeddings, constructs neighborhoods via NN Approach and applies Progressive Matching + """ + + _method_name: str = "Embeddings NN Blocking Based Progressive Matching" + _method_info: str = "Utilizes/Creates entity embeddings, constructs neighborhoods via NN Approach and applies Progressive Matching" + + def __init__( + self, + budget: int = 0, + vectorizer: str = 'bert', + similarity_search: str = 'faiss', + vector_size: int = 200, + num_of_clusters: int = 5, + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + self._vectorizer = vectorizer + self._similarity_search = similarity_search + self._vector_size = vector_size + self._num_of_clusters = num_of_clusters + + + def predict(self, + data: Data, + blocks: dict = None, + comparison_cleaner: AbstractMetablocking = None, + tqdm_disable: bool = False, + method : str = 'HB', + emit_all_tps_stop : bool = False) -> Graph: + """Main method of progressive entity matching. Inputs a set of blocks and outputs a graph \ + that contains of the entity ids (nodes) and the similarity scores between them (edges). + Args: + blocks (dict): blocks of entities + data (Data): dataset module + tqdm_disable (bool, optional): Disables progress bar. Defaults to False. 
+ method (str) : DFS/BFS/Hybrid approach for specified algorithm + emit_all_tps_stop (bool) : Stop emission once all true positives are found + Returns: + networkx.Graph: entity ids (nodes) and similarity scores between them (edges) + """ + start_time = time() + self.tqdm_disable = tqdm_disable + self._comparison_cleaner: AbstractMetablocking = comparison_cleaner + self._method = method + self._emit_all_tps_stop = emit_all_tps_stop + self.true_pair_checked = None + self._prediction_data : PredictionData = None + self.data = data + self.pairs = Graph() + + if blocks is None: + # applying the process to the whole dataset + self._predict_raw_blocks(blocks) + else: + all_blocks = list(blocks.values()) + self._progress_bar = tqdm(total=len(blocks), + desc=self._method_name+" ("+self.metric+")", + disable=self.tqdm_disable) + if 'Block' in str(type(all_blocks[0])): + self._predict_raw_blocks(blocks) + elif isinstance(all_blocks[0], set): + if(self._comparison_cleaner == None): + raise AttributeError("No precalculated weights were given from the CC step") + self._predict_prunned_blocks(blocks) + else: + raise AttributeError("Wrong type of Blocks") + self._progress_bar.close() + + self.execution_time = time() - start_time + return self.pairs + + def _top_pair_emission(self) -> None: + """Applies global sorting to all entity pairs produced by NN, + and returns pairs based on distance in ascending order + """ + self.pairs = [] + n, k = self.neighbors.shape + + for i in range(n): + entity_id = self.ennbb._si.d1_retained_ids[i] if self.data.is_dirty_er else self.ennbb._si.d2_retained_ids[i] + for j in range(k): + if self.neighbors[i][j] != -1: + candidate_id = self.ennbb._si.d1_retained_ids[self.neighbors[i][j]] + self.pairs.append((entity_id, candidate_id, self.scores[i][j])) + + self.pairs = sorted(self.pairs, key=lambda x: x[2], reverse=True) + self.pairs = [(x[0], x[1]) for x in self.pairs] + + def _dfs_pair_emission(self) -> None: + """Sorts NN neighborhoods in ascending average distance from their query entity, + iterate over each neighborhoods' entities in ascending distance to query entity + """ + self.pairs = [] + + average_neighborhood_distances = np.mean(self.scores, axis=1) + sorted_neighborhoods = sorted_enumerate(average_neighborhood_distances) + + for sorted_neighborhood in sorted_neighborhoods: + neighbor_scores = self.scores[sorted_neighborhood] + neighbors = self.neighbors[sorted_neighborhood] + entity_id = self.ennbb._si.d1_retained_ids[sorted_neighborhood] \ + if self.data.is_dirty_er \ + else self.ennbb._si.d2_retained_ids[sorted_neighborhood] + + for neighbor_index, neighbor in enumerate(neighbors): + if(neighbor != -1): + neighbor_id = self.ennbb._si.d1_retained_ids[neighbor] + self.pairs.append((entity_id, neighbor_id, neighbor_scores[neighbor_index])) + + self.pairs = [(x[0], x[1]) for x in self.pairs] + + def _hb_pair_emission(self) -> None: + """Sorts NN neighborhoods in ascending average distance from their query entity, + emits the top entity for each neighborhood, then iterates over the sorte neighborhoods, + and emits the pairs in descending weight order + """ + self.pairs = [] + _first_emissions = [] + _remaining_emissions = [] + + average_neighborhood_distances = np.mean(self.scores, axis=1) + sorted_neighborhoods = sorted_enumerate(average_neighborhood_distances) + + for sorted_neighborhood in sorted_neighborhoods: + neighbor_scores = self.scores[sorted_neighborhood] + neighbors = self.neighbors[sorted_neighborhood] + entity_id = 
self.ennbb._si.d1_retained_ids[sorted_neighborhood] \ + if self.data.is_dirty_er \ + else self.ennbb._si.d2_retained_ids[sorted_neighborhood] + for neighbor_index, neighbor in enumerate(neighbors): + if(neighbor != -1): + neighbor_id = self.ennbb._si.d1_retained_ids[neighbor] + _current_emissions = _remaining_emissions if neighbor_index else _first_emissions + _current_emissions.append((entity_id, neighbor_id, neighbor_scores[neighbor_index])) + + self.pairs = [(x[0], x[1]) for x in _first_emissions] + [(x[0], x[1]) for x in _remaining_emissions] + + def _bfs_pair_emission(self) -> None: + """Sorts NN neighborhoods in ascending average distance from their query entity, + and iteratively emits the current top pair per neighborhood + """ + self.pairs = [] + average_neighborhood_distances = np.mean(self.scores, axis=1) + sorted_neighborhoods = sorted_enumerate(average_neighborhood_distances) + + _emissions_per_pair = self.neighbors.shape[1] + for current_emission_per_pair in range(_emissions_per_pair): + for sorted_neighborhood in sorted_neighborhoods: + neighbor = self.neighbors[sorted_neighborhood][current_emission_per_pair] + if(neighbor != -1): + neighbor_id = self.ennbb._si.d1_retained_ids[neighbor] + entity_id = self.ennbb._si.d1_retained_ids[sorted_neighborhood] \ + if self.data.is_dirty_er \ + else self.ennbb._si.d2_retained_ids[sorted_neighborhood] + self.pairs.append((entity_id, neighbor_id, self.scores[sorted_neighborhood][current_emission_per_pair])) + + self.pairs = [(x[0], x[1]) for x in self.pairs] + + def _produce_pairs(self): + """Calls pairs emission based on the requested approach + Raises: + AttributeError: Given emission technique hasn't been defined + """ + if(self._method == 'DFS'): + self._dfs_pair_emission() + elif(self._method == 'HB'): + self._hb_pair_emission() + elif(self._method == 'BFS'): + self._bfs_pair_emission() + elif(self._method == 'TOP'): + self._top_pair_emission() + else: + raise AttributeError(self._method + ' emission technique is undefined!') + + def _predict_raw_blocks(self, blocks: dict = None) -> None: + self.ennbb : EmbeddingsNNBlockBuilding = EmbeddingsNNBlockBuilding(self._vectorizer, self._similarity_search) + self.final_blocks = self.ennbb.build_blocks(data = self.data, + num_of_clusters = self._num_of_clusters, + top_k = int(max(1, int(self._budget / self.data.num_of_entities) + (self._budget % self.data.num_of_entities > 0))) + if not self._emit_all_tps_stop else self._budget, + return_vectors = False, + tqdm_disable = False, + save_embeddings = True, + load_embeddings_if_exist = True, + with_entity_matching = False, + input_cleaned_blocks = blocks) + + self.scores = self.ennbb.distances + self.neighbors = self.ennbb.neighbors + self.final_vectors = (self.ennbb.vectors_1, self.ennbb.vectors_2) + + self._produce_pairs() + if(self._emit_all_tps_stop): + self.true_pair_checked = self.extract_tps_checked() + return self.pairs + + def _predict_prunned_blocks(self, blocks: dict = None) -> None: + return self._predict_raw_blocks(blocks) + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + _neighbors = self.neighbors + + for row in range(_neighbors.shape[0]): + entity = self.ennbb._si.d1_retained_ids[row] \ + if self.data.is_dirty_er \ + else self.ennbb._si.d2_retained_ids[row] + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + for column in range(_neighbors.shape[1]): + if(_neighbors[row][column] != -1): + neighbor = 
self.ennbb._si.d1_retained_ids[_neighbors[row][column]] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + + return _tps_checked + +class SimilarityBasedProgressiveMatching(ProgressiveMatching): + """Applies similarity based candidate graph prunning, sorts retained comparisons and applies Progressive Matching + """ + + _method_name: str = "Similarity Based Progressive Matching" + _method_info: str = "Applies similarity based candidate graph prunning, sorts retained comparisons and applies Progressive Matching" + + def __init__( + self, + budget: int = 0, + pwScheme: str = 'ACF', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + self._pwScheme : str = pwScheme + + def extract_tps_checked(self, **kwargs) -> dict: + pass + +class GlobalPSNM(SimilarityBasedProgressiveMatching): + """Applies Global Progressive Sorted Neighborhood Matching + """ + + _method_name: str = "Global Progressive Sorted Neighborhood Matching" + _method_info: str = "For each entity sorted accordingly to its block's tokens, " + \ + "evaluates its neighborhood pairs defined within shifting windows of incremental size" + \ + " and retains the globally best candidate pairs" + + def __init__( + self, + budget: int = 0, + pwScheme: str = 'ACF', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, pwScheme, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + def _predict_raw_blocks(self, blocks: dict): + gpsn : GlobalProgressiveSortedNeighborhood = GlobalProgressiveSortedNeighborhood(self._pwScheme, self._budget) + candidates : PriorityQueue = gpsn.process(blocks=blocks, data=self.data, tqdm_disable=True, emit_all_tps_stop=self._emit_all_tps_stop) + self.pairs = [] + while(not candidates.empty()): + _, entity_id, candidate_id = candidates.get() + self.pairs.append((entity_id, candidate_id)) + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked(entity=entity_id, neighbor=candidate_id) + + return self.pairs + + def _predict_prunned_blocks(self, blocks: dict): + raise NotImplementedError("Sorter Neighborhood Algorithms don't support prunned blocks") + + def extract_tps_checked(self, **kwargs) -> dict: + self.true_pair_checked = dict() if self.true_pair_checked is None 
else self.true_pair_checked + entity = kwargs['entity'] + neighbor = kwargs['neighbor'] + + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + self.true_pair_checked[canonical_swap(_d1_entity, _d2_entity)] = False + + return self.true_pair_checked + +class LocalPSNM(SimilarityBasedProgressiveMatching): + """Applies Local Progressive Sorted Neighborhood Matching + """ + + _method_name: str = "Global Progressive Sorted Neighborhood Matching" + _method_info: str = "For each entity sorted accordingly to its block's tokens, " + \ + "evaluates its neighborhood pairs defined within shifting windows of incremental size" + \ + " and retains the globally best candidate pairs" + + def __init__( + self, + budget: int = 0, + pwScheme: str = 'ACF', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, pwScheme, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + def _predict_raw_blocks(self, blocks: dict): + lpsn : LocalProgressiveSortedNeighborhood = LocalProgressiveSortedNeighborhood(self._pwScheme, self._budget) + candidates : list = lpsn.process(blocks=blocks, data=self.data, tqdm_disable=True, emit_all_tps_stop=self._emit_all_tps_stop) + self.pairs = candidates + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked(candidates=candidates) + return self.pairs + + def _predict_prunned_blocks(self, blocks: dict): + raise NotImplementedError("Sorter Neighborhood Algorithms don't support prunned blocks " + \ + "(pre comparison-cleaning entities per block distribution required") + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + _candidates = kwargs['candidates'] + + for entity, neighbor in _candidates: + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + return _tps_checked +class RandomPM(ProgressiveMatching): + """Picks a number of random comparisons equal to the available budget + """ + + _method_name: str = "Random Progressive Matching" + _method_info: str = "Picks a number of random comparisons equal to the available budget" + + def __init__( + self, + budget: int = 0, + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = 
True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + def _predict_raw_blocks(self, blocks: dict) -> None: + cp : ComparisonPropagation = ComparisonPropagation() + cleaned_blocks = cp.process(blocks=blocks, data=self.data, tqdm_disable=True) + self._predict_prunned_blocks(cleaned_blocks) + + def _predict_prunned_blocks(self, blocks: dict) -> None: + _all_pairs = [(id1, id2) for id1 in blocks for id2 in blocks[id1]] + _total_pairs = len(_all_pairs) + random_pairs = sample(_all_pairs, self._budget) if self._budget <= _total_pairs and not self._emit_all_tps_stop else _all_pairs + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked(candidates=random_pairs) + self.pairs.add_edges_from(random_pairs) + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + _candidates = kwargs['candidates'] + + for entity, neighbor in _candidates: + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + return _tps_checked + +class PESM(HashBasedProgressiveMatching): + """Applies Progressive Entity Scheduling Matching + """ + + _method_name: str = "Progressive Entity Scheduling Matching" + _method_info: str = "Applies Progressive Entity Scheduling - Sorts entities in descending order of their average weight, " + \ + "emits the top pair per entity. Finally, traverses the sorted " + \ + "entities and emits their comparisons in descending weight order " + \ + "within specified budget." 
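# --- Editor's illustrative sketch (not part of this patch) --------------------
# The progressive matchers defined above share one driver API: construct the
# matcher with a comparison budget, then call predict() on raw blocks to obtain
# candidate pairs in emission order. The sketch assumes the usual pyjedai
# Data / StandardBlocking entry points; the CSV paths, column names and budget
# are placeholders, not values taken from this patch.
import pandas as pd
from pyjedai.datamodel import Data
from pyjedai.block_building import StandardBlocking
from pyjedai.prioritization import GlobalPSNM

data = Data(dataset_1=pd.read_csv("d1.csv"), id_column_name_1="id",
            dataset_2=pd.read_csv("d2.csv"), id_column_name_2="id",
            ground_truth=pd.read_csv("gt.csv"))
blocks = StandardBlocking().build_blocks(data)               # raw token blocks
matcher = GlobalPSNM(budget=10_000, pwScheme="ACF")          # class defined above
emitted_pairs = matcher.predict(blocks=blocks, data=data)    # pairs in emission order
print(emitted_pairs[:5])
# ------------------------------------------------------------------------------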
+ def __init__( + self, + budget: int = 0, + w_scheme: str = 'X2', + metric: str = 'dice', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + + super().__init__(budget, w_scheme, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + + def _predict_raw_blocks(self, blocks: dict) -> None: + + pes : ProgressiveEntityScheduling = ProgressiveEntityScheduling(self._w_scheme, self._budget) + pes.process(blocks=blocks, data=self.data, tqdm_disable=True, cc=None, method=self._method, emit_all_tps_stop=self._emit_all_tps_stop) + self.pairs = pes.produce_pairs() + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked(candidates=self.pairs) + + def _predict_prunned_blocks(self, blocks: dict): + return self._predict_raw_blocks(blocks) + # raise NotImplementedError("Sorter Neighborhood Algorithms doesn't support prunned blocks (lack of precalculated weights)") + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + _candidates = kwargs['candidates'] + + for entity, neighbor in _candidates: + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + return _tps_checked + + +class WhooshPM(ProgressiveMatching): + """Applies progressive index based matching using whoosh library + """ + + _method_name: str = "Whoosh Progressive Matching" + _method_info: str = "Applies Whoosh Progressive Matching - Indexes the entities of the second dataset, " + \ + "stores their specified attributes, " + \ + "defines a query for each entity of the first dataset, " + \ + "and retrieves its pair candidates from the indexer within specified budget" + + def __init__( + self, + budget: int = 0, + metric: str = 'TF-IDF', + tokenizer: str = 'white_space_tokenizer', + similarity_threshold: float = 0.5, + qgram: int = 2, # for jaccard + tokenizer_return_set = True, # unique values or not + attributes: any = None, + delim_set: list = None, # DelimiterTokenizer + padding: bool = True, # QgramTokenizer + prefix_pad: str = '#', # QgramTokenizer (if padding=True) + suffix_pad: str = '$' # QgramTokenizer (if padding=True) + ) -> None: + # budget set to float('inf') implies unlimited budget + super().__init__(budget, metric, tokenizer, similarity_threshold, qgram, tokenizer_return_set, attributes, delim_set, padding, prefix_pad, suffix_pad) + + def _set_whoosh_datasets(self) -> None: + """Saves the rows of both datasets corresponding to the indices of the entities that have been retained after comparison cleaning + """ + + self._whoosh_d1 = self.data.dataset_1[self.attributes + [self.data.id_column_name_1]] if self.attributes else self.data.dataset_1 + self._whoosh_d1 = 
self._whoosh_d1[self._whoosh_d1[self.data.id_column_name_1].isin(self._whoosh_d1_retained_index)] + if(not self.data.is_dirty_er): + self._whoosh_d2 = self.data.dataset_2[self.attributes + [self.data.id_column_name_2]] if self.attributes else self.data.dataset_2 + self._whoosh_d2 = self._whoosh_d2[self._whoosh_d2[self.data.id_column_name_2].isin(self._whoosh_d2_retained_index)] + + + def _set_retained_entries(self) -> None: + """Saves the indices of entities of both datasets that have been retained after comparison cleaning + """ + self._whoosh_d1_retained_index = pd.Index([self.data._gt_to_ids_reversed_1[id] + for id in self._si.d1_retained_ids]) + + if(not self.data.is_dirty_er): + self._whoosh_d2_retained_index = pd.Index([self.data._gt_to_ids_reversed_2[id] + for id in self._si.d2_retained_ids]) + + + def _initialize_index_path(self): + """Creates index directory if non-existent, constructs the absolute path to the current whoosh index + """ + global INDEXER_DIR + INDEXER_DIR = os.path.abspath(INDEXER_DIR) + _d1_name = self.data.dataset_name_1 if self.data.dataset_name_1 is not None else 'd3' + self._index_path = os.path.join(INDEXER_DIR, _d1_name if self.data.is_dirty_er else (_d1_name + (self.data.dataset_name_2 if self.data.dataset_name_2 is not None else 'd4'))) + if not os.path.exists(self._index_path): + print('Created index directory at: ' + self._index_path) + os.makedirs(self._index_path, exist_ok=True) + + + def _create_index(self): + """Defines the schema [ID, CONTENT], creates the index in the defined path + and populates it with all the entities of the target dataset (first - Dirty ER, second - Clean ER) + """ + self._schema = Schema(ID=ID(stored=True), content=TEXT(stored=True)) + self._index = create_in(self._index_path, self._schema) + writer = self._index.writer() + + _target_dataset = self._whoosh_d1 if self.data.is_dirty_er else self._whoosh_d2 + _id_column_name = self.data.id_column_name_1 if self.data.is_dirty_er else self.data.id_column_name_2 + + for _, entity in _target_dataset.iterrows(): + entity_values = [str(entity[column]) for column in _target_dataset.columns if column != _id_column_name] + writer.add_document(ID=entity[_id_column_name], content=' '.join(entity_values)) + writer.commit() + + def _populate_whoosh_dataset(self) -> None: + """For each retained entity in the first dataset, construct a query with its text content, + parses it to the indexers, retrieves best candidates and stores them in entity's neighborhood. 
+ Finally, neighborhoods are sorted in descending order of their average weight + """ + # None value for budget implies unlimited budget in whoosh + _query_budget = None if is_infinite(self._budget) else max(1, 2 * self._budget / len(self._whoosh_d1)) + + if(self.metric not in whoosh_similarity_function): + print(f'{self.metric} Similarity Function is Undefined') + self.metric = 'Frequency' + print(f'Applying {self.metric} Similarity Function') + _scorer = whoosh_similarity_function[self.metric] + + with self._index.searcher(weighting=_scorer) as searcher: + self._parser = qparser.QueryParser('content', schema=self._index.schema, group=qparser.OrGroup) + for _, entity in self._whoosh_d1.iterrows(): + entity_values = [str(entity[column]) for column in self._whoosh_d1.columns if column != self.data.id_column_name_1] + entity_string = ' '.join(entity_values) + entity_id = entity[self.data.id_column_name_1] + entity_query = self._parser.parse(entity_string) + query_results = searcher.search(entity_query, limit = _query_budget) + + for neighbor in query_results: + _score = neighbor.score + _neighbor_id = neighbor['ID'] + self._sorted_dataset._insert_entity_neighbor(entity=entity_id, neighbor=_neighbor_id, weight=_score) + + self._sorted_dataset._sort_neighborhoods_by_avg_weight() + + def _emit_pairs(self) -> None: + """Returns a list of candidate pairs that have been emitted following the requested method""" + self.pairs = self._sorted_dataset._emit_pairs(method=self._method, data=self.data) + + def _predict_raw_blocks(self, blocks: dict) -> None: + self._start_time = time() + self._si = SubsetIndexer(blocks=blocks, data=self.data, subset=False) + self._set_retained_entries() + self._set_whoosh_datasets() + self._initialize_index_path() + self._create_index() + self._to_emit_pairs : List[Tuple[int, int]] = [] + self._budget = float('inf') if self._emit_all_tps_stop else self._budget + self._sorted_dataset = WhooshDataset(list(self._whoosh_d1_retained_index), self._budget) + self._populate_whoosh_dataset() + self._emit_pairs() + self.execution_time = time() - self._start_time + if(self._emit_all_tps_stop): self.true_pair_checked = self.extract_tps_checked(candidates=self.pairs) + + def _predict_prunned_blocks(self, blocks: dict) -> None: + self._predict_raw_blocks(blocks) + + def extract_tps_checked(self, **kwargs) -> dict: + _tps_checked = dict() + _candidates = kwargs['candidates'] + + for entity, neighbor in _candidates: + entity_id = self.data._gt_to_ids_reversed_1[entity] if entity < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[entity] + neighbor_id = self.data._gt_to_ids_reversed_1[neighbor] if neighbor < self.data.dataset_limit else self.data._gt_to_ids_reversed_2[neighbor] + _d1_entity, _d2_entity = (entity_id, neighbor_id) if entity < self.data.dataset_limit else (neighbor_id, entity_id) + if _d2_entity in self.data.pairs_of[_d1_entity]: + _tps_checked[canonical_swap(_d1_entity, _d2_entity)] = False + return _tps_checked + + + diff --git a/src/pyjedai/utils.py b/src/pyjedai/utils.py index a997b39..24326e0 100644 --- a/src/pyjedai/utils.py +++ b/src/pyjedai/utils.py @@ -4,9 +4,16 @@ import numpy as np from nltk import ngrams from nltk.tokenize import word_tokenize - from pyjedai.datamodel import Block, Data - +from typing import List, Tuple +import random +from queue import PriorityQueue +import math +import sys +from time import time +from networkx import Graph +from ordered_set import OrderedSet +from math import floor # ----------------------- # # Constants # 
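# --- Editor's illustrative sketch (not part of this patch) --------------------
# WhooshPM above indexes the target dataset once and then fires one OR-group
# query per source entity, ranked by a whoosh scorer. The standalone toy below
# exercises the same whoosh calls used in the patch (Schema/create_in, writer,
# QueryParser with OrGroup, searcher with TF_IDF weighting); the documents and
# the query text are made up.
import tempfile
from whoosh import qparser
from whoosh.fields import ID, TEXT, Schema
from whoosh.index import create_in
from whoosh.scoring import TF_IDF

index_dir = tempfile.mkdtemp()
schema = Schema(ID=ID(stored=True), content=TEXT(stored=True))
ix = create_in(index_dir, schema)

writer = ix.writer()
writer.add_document(ID="0", content="apple iphone 13 128gb black")
writer.add_document(ID="1", content="samsung galaxy s21 5g 128gb")
writer.commit()

with ix.searcher(weighting=TF_IDF()) as searcher:
    parser = qparser.QueryParser("content", schema=ix.schema, group=qparser.OrGroup)
    query = parser.parse("iphone 13 black")          # any matching term contributes
    for hit in searcher.search(query, limit=2):
        print(hit["ID"], hit.score)                  # candidate id and its weight
# ------------------------------------------------------------------------------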
----------------------- # @@ -24,12 +31,12 @@ def create_entity_index(blocks: dict, is_dirty_er: bool) -> dict: entity_index = {} for key, block in blocks.items(): for entity_id in block.entities_D1: - entity_index.setdefault(entity_id, set()) + entity_index.setdefault(entity_id, OrderedSet()) entity_index[entity_id].add(key) if not is_dirty_er: for entity_id in block.entities_D2: - entity_index.setdefault(entity_id, set()) + entity_index.setdefault(entity_id, OrderedSet()) entity_index[entity_id].add(key) return entity_index @@ -47,6 +54,17 @@ def are_matching(entity_index, id1, id2) -> bool: return len(entity_index[id1].intersection(entity_index[id2])) > 0 return entity_index[id1] == entity_index[id2] # Clusters case +def get_blocks_cardinality(blocks: dict, is_dirty_er: bool) -> int: + """Returns the cardinality of the blocks. + + Args: + blocks (dict): Blocks. + + Returns: + int: Cardinality. + """ + return sum([block.get_cardinality(is_dirty_er) for block in blocks.values()]) + def drop_big_blocks_by_size(blocks: dict, max_block_size: int, is_dirty_er: bool) -> dict: """Drops blocks if: - Contain only one entity @@ -146,6 +164,8 @@ def chi_square(in_array: np.array) -> float: sum_sq += ((in_array[r][c]-expected)**2)/expected return sum_sq +def java_math_round(value): + return int(value + 0.5) def batch_pairs(iterable, batch_size: int = 1): """ @@ -155,6 +175,41 @@ def batch_pairs(iterable, batch_size: int = 1): """ return (iterable[pos:pos + batch_size] for pos in range(0, len(iterable), batch_size)) +def get_sorted_blocks_shuffled_entities(dirty_er: bool, blocks: dict) -> List[int]: + """Sorts blocks in alphabetical order based on their token, shuffles the entities of each block, concatenates the result in a list + + Args: + blocks (Dict[Block]): Dictionary of type token -> Block Instance + + Returns: + List[Int]: List of shuffled entities of alphabetically, token sorted blocks + """ + sorted_entities = [] + for _, block in sorted(blocks.items()): + _shuffled_neighbors = list(block.entities_D1 | block.entities_D2 if not dirty_er else block.entities_D1) + random.shuffle(_shuffled_neighbors) + sorted_entities += _shuffled_neighbors + + return sorted_entities + +class Tokenizer(ABC): + + def __init__(self) -> None: + super().__init__() + + @abstractmethod + def tokenize(self, text: str) -> list: + pass + +class WordQgrammsTokenizer(Tokenizer): + + def __init__(self, q: int = 3) -> None: + super().__init__() + self.q = q + + def tokenize(self, text: str) -> list: + return [' '.join(gram) for gram in list(ngrams(word_tokenize(text), self.q))] + class Tokenizer(ABC): @@ -178,14 +233,10 @@ class SubsetIndexer(ABC): """Stores the indices of retained entities of the initial datasets, calculates and stores the mapping of element indices from new to old dataset (id in subset -> id in original) """ - - def __init__(self): - self.d1_retained_ids: list[int] = None - self.d2_retained_ids : list[int] = None - - def __init__(self, blocks: dict, data: Data): + def __init__(self, blocks: dict, data: Data, subset : bool): self.d1_retained_ids: list[int] = None self.d2_retained_ids : list[int] = None + self.subset : bool = subset self.store_retained_ids(blocks, data) def from_source_dataset(self, entity_id : int, data: Data) -> bool: @@ -199,7 +250,7 @@ def store_retained_ids(self, blocks: dict, data: Data) -> None: data (Data): Dataset Module """ - if(blocks is None): + if(not self.subset): self.d1_retained_ids = list(range(data.num_of_entities_1)) if(not data.is_dirty_er): @@ -227,5 +278,343 @@ def 
store_retained_ids(self, blocks: dict, data: Data) -> None: self.d1_retained_ids = sorted(list(_d1_retained_ids_set)) self.d2_retained_ids = sorted(list(_d2_retained_ids_set)) + +class PositionIndex(ABC): + """For each entity identifier stores a list of index it appears in, within the list of shuffled entities of sorted blocks + + Args: + ABC (ABC): ABC Module + """ + + def __init__(self, num_of_entities: int, sorted_entities: List[int]) -> None: + self._num_of_entities = num_of_entities + self._counters = self._num_of_entities * [0] + self._entity_positions = [[] for _ in range(self._num_of_entities)] + + for entity in sorted_entities: + self._counters[entity]+=1 + + for i in range(self._num_of_entities): + self._entity_positions[i] = [0] * self._counters[i] + self._counters[i] = 0 + + for index, entity in enumerate(sorted_entities): + self._entity_positions[entity][self._counters[entity]] = index + self._counters[entity] += 1 + + def get_positions(self, entity: int): + return self._entity_positions[entity] + +class WhooshNeighborhood(ABC): + """Stores information about the neighborhood of a given entity ID: + - ID : The identifier of the entity as it is defined within the original dataframe + - Total Weight : The total weight of entity's neighbors + - Number of Neighbors : The total number of Neighbors + - Neighbors : Entity's neighbors sorted in descending order of weight + - Stage : Insert / Pop stage (entities stored in ascending / descending weight order) + + Args: + ABC (ABC): ABC Module + """ + + def __init__(self, id : int, budget : float) -> None: + self._id : int = id + self._budget : float = budget + self._neighbors : PriorityQueue = PriorityQueue(self._budget) if not is_infinite(self._budget) else PriorityQueue() + self._insert_stage : bool = True + self._minimum_weight : float = sys.float_info.min + self._neighbors_num : int = 0 + self._total_weight : float = 0.0 + self._average_weight : float = None + + def _insert(self, neighbor_id: int, weight : float) -> None: + if(not self._insert_stage): self._change_state() + + if weight >= self._minimum_weight: + self._neighbors.put((weight, neighbor_id)) + if self._neighbors.qsize() > self._budget: + self._minimum_weight = self._neighbors.get()[0] + + self._update_neighbors_counter_by(1) + self._update_total_weight_by(weight) + + def _pop(self) -> None: + if(self._insert_stage): self._change_state() + + if(self._empty()): + raise ValueError("No neighbors to pop!") + + _weight, _neighbor_id = self._neighbors.get() + return -_weight, _neighbor_id + + def _empty(self) -> bool: + return self._neighbors.empty() + + def _change_state(self) -> None: + "Neighborhood can either be accepting or emitting neighbors" + \ + "Accepting Stage - Neighbors stored in ascending weight order" + \ + "Emitting Stage - Neighbors stored in descending weight order" + _neighbors_resorted : PriorityQueue = PriorityQueue(int(self._budget)) if not is_infinite(self._budget) else PriorityQueue() + while(not self._neighbors.empty()): + _weight, _neighbor_id = self._neighbors.get() + _neighbors_resorted.put((-_weight, _neighbor_id)) + + self._neighbors = _neighbors_resorted + self._insert_stage = not self._insert_stage + + def _update_total_weight_by(self, weight) -> None: + self._total_weight = self._total_weight + weight + + def _update_neighbors_counter_by(self, count) -> None: + self._neighbors_num = self._neighbors_num + count + + def _get_neighbors_num(self) -> int: + return self._neighbors_num + + def _get_total_weight(self) -> float: + return 
self._total_weight + + def _get_average_weight(self) -> float: + if(self._average_weight is None): + self._average_weight = 0.0 if not self._get_neighbors_num() else (float(self._get_total_weight()) / float(self._get_neighbors_num())) + return self._average_weight + else: + return self._average_weight + + def __eq__(self, other): + if isinstance(other, WhooshNeighborhood): + return self._get_average_weight() == other._get_average_weight() + return NotImplemented + + def __lt__(self, other): + if isinstance(other, WhooshNeighborhood): + return self._get_average_weight() < other._get_average_weight() + return NotImplemented + + def __gt__(self, other): + if isinstance(other, WhooshNeighborhood): + return self._get_average_weight() > other._get_average_weight() + return NotImplemented + + def __le__(self, other): + if isinstance(other, WhooshNeighborhood): + return self._get_average_weight() <= other._get_average_weight() + return NotImplemented + + def __ge__(self, other): + if isinstance(other, WhooshNeighborhood): + return self._get_average_weight() >= other._get_average_weight() + return NotImplemented + +class WhooshDataset(ABC): + """Stores a dictionary [Entity -> Entity's Neighborhood Data (Whoosh Neighborhood)] + Supplies auxiliarry functions for information retrieval from the sorted dataset + + Args: + ABC (ABC): ABC Module + """ + + def __init__(self, entity_ids : List[int], budget : float) -> None: + self._budget : float = budget + self._total_entities : int = len(entity_ids) + self._entity_budget : float = budget if is_infinite(self._budget) else max(1, 2 * self._budget / self._total_entities) + self._neighborhoods : dict = {} + for entity_id in entity_ids: + self._neighborhoods[entity_id] = WhooshNeighborhood(id=entity_id, budget=self._entity_budget) + # used in defining proper emission strategy + self._sorted_entities : List[int] = None + self._current_neighborhood_index : int = 0 + self._current_entity : int = None + self._current_neighborhood : WhooshNeighborhood = None + + def _insert_entity_neighbor(self, entity : int, neighbor : int, weight : float) -> None: + self._neighborhoods[entity]._insert(neighbor, weight) + + def _pop_entity_neighbor(self, entity : int) -> Tuple[float, int]: + return self._neighborhoods[entity]._pop() + + def _get_entity_neighborhood(self, entity : int) -> WhooshNeighborhood: + return self._neighborhoods[entity] + + def _entity_has_neighbors(self, entity : int) -> bool: + return not self._neighborhoods[entity]._empty() + + def _sort_neighborhoods_by_avg_weight(self) -> None: + """Store a list of entity ids sorted in descending order of the average weight of their corresponding neighborhood""" + self._sorted_entities : List = sorted(self._neighborhoods, key=lambda entity: self._neighborhoods[entity]._get_average_weight(), reverse=True) + + def _get_current_neighborhood(self) -> WhooshNeighborhood: + return self._neighborhoods[self._current_entity] + + def _enter_next_neighborhood(self) -> None: + """Sets the next in descending average weight order neighborhood + """ + _curr_nei_idx : int = self._current_neighborhood_index + self._current_neighborhood_index = _curr_nei_idx + 1 if _curr_nei_idx + 1 < self._total_entities else 0 + self._current_entity = self._sorted_entities[self._current_neighborhood_index] + self._current_neighborhood = self._neighborhoods[self._current_entity] + + def _successful_emission(self, pair : Tuple[int, int]) -> bool: + + _entity, _neighbor = pair + _entity_id = self._data._ids_mapping_1[_entity] + _neighbor_id = 
self._data._ids_mapping_1[_neighbor] if self._data.is_dirty_er else self._data._ids_mapping_2[_neighbor] + + if(self._emitted_comparisons < self._budget): + self._emitted_pairs.append((_entity_id, _neighbor_id)) + self._emitted_comparisons += 1 + return True + else: + return False + + def _emit_pairs(self, method : str, data : Data) -> List[Tuple[int, int]]: + """Emits candidate pairs according to specified method + + Args: + method (str): Emission Method + data (Data): Dataset Module + + Returns: + List[Tuple[int, int]]: List of candidate pairs + """ + + self._method : str = method + self._data : Data = data + + self._emitted_pairs = [] + self._emitted_comparisons = 0 + + if(self._method == 'HB'): + for sorted_entity in self._sorted_entities: + if(self._entity_has_neighbors(sorted_entity)): + _, neighbor = self._pop_entity_neighbor(sorted_entity) + if(not self._successful_emission(pair=(sorted_entity, neighbor))): + return self._emitted_pairs + + if(self._method == 'HB' or self._method == 'DFS'): + _checked_entity = np.zeros(self._total_entities, dtype=bool) + _sorted_entity_to_index = dict(zip(self._sorted_entities, range(0, self._total_entities))) + + for index, sorted_entity in enumerate(self._sorted_entities): + _checked_entity[index] = True + while(self._entity_has_neighbors(sorted_entity)): + _, neighbor = self._pop_entity_neighbor(sorted_entity) + if(neighbor not in _sorted_entity_to_index or _checked_entity[_sorted_entity_to_index[neighbor]]): + if(not self._successful_emission(pair=(sorted_entity, neighbor))): + return self._emitted_pairs + else: + _emissions_left = True + _checked_entities = set() + while(_emissions_left): + _emissions_left = False + for sorted_entity in self._sorted_entities: + if(self._entity_has_neighbors(sorted_entity)): + _, neighbor = self._pop_entity_neighbor(sorted_entity) + if(canonical_swap(sorted_entity, neighbor) not in _checked_entities): + if(not self._successful_emission(pair=(sorted_entity, neighbor))): + return self._emitted_pairs + _checked_entities.add(canonical_swap(sorted_entity, neighbor)) + _emissions_left = True + return self._emitted_pairs + +class PredictionData(ABC): + """Auxiliarry module used to store basic information about the to-emit, predicted pairs + It is used to retrieve that data efficiently during the evaluation phase, and subsequent storage of emission data (e.x. 
total emissions) + + Args: + ABC (ABC): ABC Module + """ + def __init__(self, name : str, predictions, tps_checked = dict) -> None: + self.set_name(name) + self.set_tps_checked(tps_checked) + self.set_predictions(self._format_predictions(predictions)) + # Pairs have not been emitted yet - Data Module has not been populated with performance data + self.set_total_emissions(None) + self.set_normalized_auc(None) + self.set_cumulative_recall(None) + + def _format_predictions(self, predictions) -> List[Tuple[int, int]]: + """Transforms given predictions into a list of duplets (candidate pairs) + Currently Graph and Default input is supported + + Args: + predictions (Graph / List[Tuple[int, int]]): Progressive Matcher predictions + + Returns: + List[Tuple[int, int]]: Formatted Predictions + """ + return [edge[:2] for edge in predictions.edges] if isinstance(predictions, Graph) else predictions + + def get_name(self) -> str: + return self._name + + def get_predictions(self) -> List[Tuple[int, int]]: + return self._predictions + + def get_tps_checked(self) -> dict: + return self._tps_checked + + def get_total_emissions(self) -> int: + if(self._total_emissions is None): raise ValueError("Pairs not emitted yet - Total Emissions are undefined") + return self._total_emissions + + def get_normalized_auc(self) -> float: + if(self._normalized_auc is None): raise ValueError("Pairs not emitted yet - Normalized AUC is undefined") + return self._normalized_auc + + def get_cumulative_recall(self) -> float: + if(self._cumulative_recall is None): raise ValueError("Pairs not emitted yet - Cumulative Recall is undefined") + return self._cumulative_recall + + def set_name(self, name : str): + self._name : str = name + + def set_predictions(self, predictions : List[Tuple[int, int]]) -> None: + self._predictions : List[Tuple[int, int]] = predictions + + def set_tps_checked(self, tps_checked : dict) -> None: + self._tps_checked : dict = tps_checked + + def set_total_emissions(self, total_emissions : int) -> None: + self._total_emissions : int = total_emissions + + def set_normalized_auc(self, normalized_auc : float) -> None: + self._normalized_auc : float = normalized_auc + + def set_cumulative_recall(self, cumulative_recall : float) -> None: + self._cumulative_recall : float = cumulative_recall + + +def canonical_swap(id1: int, id2: int) -> Tuple[int, int]: + """Returns the identifiers in canonical order + + Args: + id1 (int): ID1 + id2 (int): ID2 + + Returns: + Tuple[int, int]: IDs tuple in canonical order (ID1 < ID2) + """ + if id2 > id1: + return id1, id2 + else: + return id2, id1 + +def sorted_enumerate(seq, reverse=True): + return [i for (v, i) in sorted(((v, i) for (i, v) in enumerate(seq)), reverse=reverse)] + + +def is_infinite(value : float): + return math.isinf(value) and value > 0 + + + + + + + + + + \ No newline at end of file diff --git a/src/pyjedai/vector_based_blocking.py b/src/pyjedai/vector_based_blocking.py index 118af2e..6cee5e2 100644 --- a/src/pyjedai/vector_based_blocking.py +++ b/src/pyjedai/vector_based_blocking.py @@ -24,6 +24,7 @@ XLNetTokenizer) transformers.logging.set_verbosity_error() +from faiss import normalize_L2 from .datamodel import Data, PYJEDAIFeature from .evaluation import Evaluation @@ -34,7 +35,7 @@ os.makedirs(EMBEDDINGS_DIR) EMBEDDINGS_DIR = os.path.abspath(EMBEDDINGS_DIR) print('Created embeddings directory at: ' + EMBEDDINGS_DIR) - + LINUX_ENV=False # try: # if 'linux' in sys.platform: @@ -103,7 +104,9 @@ def build_blocks(self, tqdm_disable: bool = False, 
save_embeddings: bool = True, load_embeddings_if_exist: bool = False, - with_entity_matching: bool = False + with_entity_matching: bool = False, + input_cleaned_blocks: dict = None, + similarity_distance: str = 'cosine' ) -> any: """Main method of the vector based approach. Contains two steps. First an embedding method. \ And afterwards a similarity search upon the vectors created in the previous step. @@ -132,82 +135,113 @@ def build_blocks(self, dict: Entity ids to sets of top-K candidate ids. OR Tuple(np.array, np.array): vectors from d1 and vectors from d2 """ + print('Building blocks via Embeddings-NN Block Building [' + self.vectorizer + ', ' + self.similarity_search + ']') _start_time = time() self.blocks = dict() self.with_entity_matching = with_entity_matching self.save_embeddings, self.load_embeddings_if_exist = save_embeddings, load_embeddings_if_exist self.max_word_embeddings_size = max_word_embeddings_size - self.data, self.attributes_1, self.attributes_2, self.vector_size, self.num_of_clusters, self.top_k \ - = data, attributes_1, attributes_2, vector_size, num_of_clusters, top_k + self.simiarity_distance = similarity_distance + self.data, self.attributes_1, self.attributes_2, self.vector_size, self.num_of_clusters, self.top_k, self.input_cleaned_blocks \ + = data, attributes_1, attributes_2, vector_size, num_of_clusters, top_k, input_cleaned_blocks self._progress_bar = tqdm(total=data.num_of_entities, desc=(self._method_name + ' [' + self.vectorizer + ', ' + self.similarity_search + ']'), disable=tqdm_disable) - self._si = SubsetIndexer(None, self.data) + + if(input_cleaned_blocks == None): + self._applied_to_subset = False + else: + _all_blocks = list(input_cleaned_blocks.values()) + if 'Block' in str(type(_all_blocks[0])): + self._applied_to_subset = False + elif isinstance(_all_blocks[0], set): + self._applied_to_subset = True + else: + raise AttributeError("Wrong type of blocks given") + + self._si = SubsetIndexer(self.input_cleaned_blocks, self.data, self._applied_to_subset) + self._d1_valid_indices: list[int] = self._si.d1_retained_ids + self._d2_valid_indices: list[int] = [x - self.data.dataset_limit for x in self._si.d2_retained_ids] # print(data.attributes_1, data.attributes_2) + print(attributes_1 if attributes_1 else data.attributes_1) self._entities_d1 = data.dataset_1[attributes_1 if attributes_1 else data.attributes_1] \ .apply(" ".join, axis=1) \ .apply(self._tokenize_entity) \ .values.tolist() - + self._entities_d1 = [self._entities_d1[x] for x in self._d1_valid_indices] self._entities_d2 = data.dataset_2[attributes_2 if attributes_2 else data.attributes_2] \ .apply(" ".join, axis=1) \ .apply(self._tokenize_entity) \ .values.tolist() if not data.is_dirty_er else None + self._entities_d2 = [self._entities_d2[x] for x in self._d2_valid_indices] if not data.is_dirty_er else None - vectors_1 = [] + self.vectors_1 = None + self.vectors_2 = None self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print("Device selected: ", self.device) - + if self.with_entity_matching: self.graph = nx.Graph() + self._d1_loaded : bool = False + self._d2_loaded : bool = False if load_embeddings_if_exist: - try: print("Loading embeddings from file...") - p1 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + self.data.dataset_name_1 \ - if self.data.dataset_name_1 is not None else "d1" +'_1.npy') + p1 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + (self.data.dataset_name_1 \ + if self.data.dataset_name_1 is not None else "d1") +'.npy') print("Loading 
file: ", p1) - self.vectors_1 = vectors_1 = np.load(p1) - self._progress_bar.update(data.num_of_entities_1) - - p2 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + self.data.dataset_name_2 \ - if self.data.dataset_name_2 is not None else "d2" +'_2.npy') - print("Loading file: ", p2) - self.vectors_2 = vectors_2 = np.load(p2) - self._progress_bar.update(data.num_of_entities_2) + if os.path.exists(p1): + self.vectors_1 = vectors_1 = np.load(p1) + self.vectors_1 = vectors_1 = vectors_1[self._d1_valid_indices] + self._progress_bar.update(data.num_of_entities_1) + self._d1_loaded = True + else: + print("Embeddings not found. Creating new ones.") + p2 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + (self.data.dataset_name_2 \ + if self.data.dataset_name_2 is not None else "d2") +'.npy') + print("Loading file: ", p2) + if os.path.exists(p2): + self.vectors_2 = vectors_2 = np.load(p2) + self.vectors_2 = vectors_2 = vectors_2[self._d2_valid_indices] + self._progress_bar.update(data.num_of_entities_2) + self._d2_loaded = True + else: + print("Embeddings not found. Creating new ones.") print("Loading embeddings from file finished") - except: - print("Embeddings not found. Creating new ones.") - raise ValueError("Embeddings not found.") - else: + if not self._d1_loaded or not self._d2_loaded: if self.vectorizer in ['word2vec', 'fasttext', 'doc2vec', 'glove']: - vectors_1, vectors_2 = self._create_gensim_embeddings() + self.vectors_1, self.vectors_2 = self._create_gensim_embeddings() elif self.vectorizer in ['bert', 'distilbert', 'roberta', 'xlnet', 'albert']: - vectors_1, vectors_2 = self._create_pretrained_word_embeddings() + self.vectors_1, self.vectors_2 = self._create_pretrained_word_embeddings() elif self.vectorizer in ['smpnet', 'st5', 'sent_glove', 'sdistilroberta', 'sminilm']: - vectors_1, vectors_2 = self._create_pretrained_sentence_embeddings() + self.vectors_1, self.vectors_2 = self._create_pretrained_sentence_embeddings() else: raise AttributeError("Not available vectorizer") - + if save_embeddings: print("Saving embeddings...") - p1 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + self.data.dataset_name_1 \ - if self.data.dataset_name_1 is not None else "d1" +'_1.npy') - print("Saving file: ", p1) - np.save(p1, self.vectors_1) - - p2 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + self.data.dataset_name_2 \ - if self.data.dataset_name_2 is not None else "d2" +'_2.npy') - print("Saving file: ", p2) - np.save(p2, self.vectors_2) + if self._applied_to_subset: + print("Cannot save embeddings, subset embeddings storing not supported.") + else: + if not self._d1_loaded: + p1 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + (self.data.dataset_name_1 \ + if self.data.dataset_name_1 is not None else "d1") +'.npy') + print("Saving file: ", p1) + np.save(p1, self.vectors_1) + + if not self._d2_loaded: + p2 = os.path.join(EMBEDDINGS_DIR, self.vectorizer + '_' + (self.data.dataset_name_2 \ + if self.data.dataset_name_2 is not None else "d2") +'.npy') + print("Saving file: ", p2) + np.save(p2, self.vectors_2) if return_vectors: - return (vectors_1, _) if data.is_dirty_er else (vectors_1, vectors_2) + return (self.vectors_1, _) if data.is_dirty_er else (self.vectors_1, self.vectors_2) if self.similarity_search == 'faiss': self._faiss_metric_type = faiss.METRIC_L2 @@ -220,11 +254,11 @@ def build_blocks(self, raise AttributeError("Not available method") self._progress_bar.close() self.execution_time = time() - _start_time - + if self.with_entity_matching: return 
self.blocks, self.graph - - return self.blocks + else: + return self.blocks def _create_gensim_embeddings(self) -> Tuple[np.array, np.array]: """Embeddings with Gensim. More on https://github.com/RaRe-Technologies/gensim-data @@ -238,17 +272,19 @@ def _create_gensim_embeddings(self) -> Tuple[np.array, np.array]: """ vectors_1 = [] vocabulary = api.load(self._gensim_mapping_download[self.vectorizer]) - for e1 in self._entities_d1: - vectors_1.append(self._create_vector(e1, vocabulary)) - self._progress_bar.update(1) - self.vectors_1 = np.array(vectors_1).astype('float32') + + if not self._d1_loaded: + for e1 in self._entities_d1: + vectors_1.append(self._create_vector(e1, vocabulary)) + self._progress_bar.update(1) + vectors_1 = np.vstack(vectors_1).astype('float32') vectors_2 = [] - if not self.data.is_dirty_er: + if not self.data.is_dirty_er and not self._d2_loaded: for e2 in self._entities_d2: vectors_2.append(self._create_vector(e2, vocabulary)) self._progress_bar.update(1) - self.vectors_2 = np.array(vectors_2).astype('float32') + vectors_2 = np.vstack(vectors_2).astype('float32') return vectors_1, vectors_2 @@ -272,15 +308,14 @@ def _create_pretrained_word_embeddings(self) -> Tuple[np.array, np.array]: model = model.to(self.device) self.vectors_1 = self._transform_entities_to_word_embeddings(self._entities_d1, model, - tokenizer) + tokenizer) if not self._d1_loaded else self.vectors_1 self.vector_size = self.vectors_1[0].shape[0] print("Vector size: ", self.vectors_1.shape) self.vectors_2 = self._transform_entities_to_word_embeddings(self._entities_d2, model, - tokenizer) if not self.data.is_dirty_er else None + tokenizer) if not self.data.is_dirty_er and not self._d2_loaded else self.vectors_2 return self.vectors_1, self.vectors_2 - def _transform_entities_to_word_embeddings(self, entities, model, tokenizer) -> np.array: model = model.to(self.device) @@ -312,60 +347,78 @@ def _create_pretrained_sentence_embeddings(self): model = SentenceTransformer(self._sentence_transformer_mapping[self.vectorizer], device=self.device) vectors_1 = [] - for e1 in self._entities_d1: - vector = model.encode(e1) - vectors_1.append(vector) - self._progress_bar.update(1) - self.vector_size = len(vectors_1[0]) - self.vectors_1 = np.array(vectors_1).astype('float32') + if not self._d1_loaded: + for e1 in self._entities_d1: + vector = model.encode(e1) + vectors_1.append(vector) + self._progress_bar.update(1) + self.vector_size = len(vectors_1[0]) + vectors_1 = np.vstack(vectors_1).astype('float32') vectors_2 = [] - if not self.data.is_dirty_er: + if not self.data.is_dirty_er and not self._d2_loaded: for e2 in self._entities_d2: + # print("e2: ", e2) vector = model.encode(e2) vectors_2.append(vector) self._progress_bar.update(1) self.vector_size = len(vectors_2[0]) - self.vectors_2 = np.array(vectors_2).astype('float32') - - return vectors_1, vectors_2 + vectors_2 = np.vstack(vectors_2).astype('float32') + + return vectors_1, vectors_2 def _similarity_search_with_FAISS(self): index = faiss.IndexFlatL2(self.vectors_1.shape[1]) - index.metric_type = faiss.METRIC_INNER_PRODUCT + + if self.simiarity_distance == 'cosine' or self.simiarity_distance == 'cosine_without_normalization': + index.metric_type = faiss.METRIC_INNER_PRODUCT + elif self.simiarity_distance == 'euclidean': + index.metric_type = faiss.METRIC_L2 + else: + raise ValueError("Invalid similarity distance: ", self.simiarity_distance) + + if self.simiarity_distance == 'cosine': + faiss.normalize_L2(self.vectors_1) + faiss.normalize_L2(self.vectors_2) + 
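# Example (illustrative, not part of the patch): cosine search in faiss is normally
# expressed as inner-product search over L2-normalised vectors, which is what the
# 'cosine' branch above does. IndexFlatIP is used here for brevity, while the patch
# reuses an IndexFlatL2 and switches its metric_type instead.
import faiss
import numpy as np

d, top_k = 64, 3
rng = np.random.default_rng(42)
xb = rng.standard_normal((100, d)).astype('float32')   # indexed side (dataset 1)
xq = rng.standard_normal((10, d)).astype('float32')    # query side (dataset 2)

faiss.normalize_L2(xb)      # in place; afterwards inner product == cosine similarity
faiss.normalize_L2(xq)

index = faiss.IndexFlatIP(d)
index.add(xb)
distances, neighbors = index.search(xq, top_k)          # both of shape (10, 3)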
index.train(self.vectors_1) # train on the vectors of dataset 1 + + if self.simiarity_distance == 'cosine': + faiss.normalize_L2(self.vectors_1) + faiss.normalize_L2(self.vectors_2) + index.add(self.vectors_1) # add the vectors and update the index + if self.simiarity_distance == 'cosine': + faiss.normalize_L2(self.vectors_1) + faiss.normalize_L2(self.vectors_2) + self.distances, self.neighbors = index.search(self.vectors_1 if self.data.is_dirty_er else self.vectors_2, - self.top_k) + self.top_k) self.blocks = dict() - for i in range(0, self.neighbors.shape[0]): - if self.data.is_dirty_er: - entity_id_d2 = i - else: - entity_id_d2 = i + self.data.dataset_limit + for _entity in range(0, self.neighbors.shape[0]): - if entity_id_d2 not in self.blocks.keys(): - self.blocks[entity_id_d2] = set() + _entity_id = self._si.d1_retained_ids[_entity] if self.data.is_dirty_er else self._si.d2_retained_ids[_entity] - j = 0 - for entity_id_d1 in self.neighbors[i]: + if _entity_id not in self.blocks: + self.blocks[_entity_id] = set() + + for _neighbor_index, _neighbor in enumerate(self.neighbors[_entity]): - if entity_id_d1 == -1: + if _neighbor == -1: continue - if entity_id_d1 not in self.blocks.keys(): - self.blocks[entity_id_d1] = set() + _neighbor_id = self._si.d1_retained_ids[_neighbor] + + if _neighbor_id not in self.blocks: + self.blocks[_neighbor_id] = set() - self.blocks[entity_id_d1].add(entity_id_d2) - self.blocks[entity_id_d2].add(entity_id_d1) + self.blocks[_neighbor_id].add(_entity_id) + self.blocks[_entity_id].add(_neighbor_id) if self.with_entity_matching: - self.graph.add_edge(entity_id_d2, entity_id_d1, weight=self.distances[i][j]) - - j += 1 - + self.graph.add_edge(_entity_id, _neighbor_id, weight=self.distances[_entity][_neighbor_index]) def _similarity_search_with_FALCONN(self): pass @@ -442,6 +495,11 @@ def evaluate(self, export_to_dict, with_classification_report, verbose) + + if with_stats: + self.stats() + + return evaluation if with_stats: self.stats() @@ -468,145 +526,4 @@ def stats(self) -> None: elif self.similarity_search == 'scann' and LINUX_ENV: pass - print(u'\u2500' * 123) - -class PREmbeddingsNNBlockBuilding(EmbeddingsNNBlockBuilding): - """Block building via creation of embeddings and a Nearest Neighbor Approach with specified budget - """ - - _method_name = "Progressive Embeddings-NN Block Building" - _method_info = "Creates a set of candidate pairs for every entity id " + \ - "based on Progresssive Similarity Search over the entity embeddings neighborhood" - - def __init__( - self, - vectorizer: str, - similarity_search: str, - budget: int - ) -> None: - super().__init__(vectorizer, similarity_search) - self._budget = budget - self.vectorizer, self.similarity_search = vectorizer, similarity_search - self.embeddings: np.array - self.vectors_1: np.array - self.vectors_2: np.array = None - self.vector_size: int - self.num_of_clusters: int - self.top_k: int - - def precomputed_vectors(self, - data: Data, - vectors_1: np.array = None, - vectors_2: np.array = None - ) -> bool: - - if(not vectors_1): - return False - - if(not data.is_dirty_er): - return vectors_2 is not None - - return True - - def build_blocks(self, - data: Data, - cc_blocks: dict = None, - vectors_1: np.array = None, - vectors_2: np.array = None, - vector_size: int = 300, - num_of_clusters: int = 5, - top_k: int = 30, - max_word_embeddings_size: int = 256, - attributes_1: list = None, - attributes_2: list = None, - return_vectors: bool = False, - tqdm_disable: bool = True - ) -> any: - """Retrieves the 
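# Example (illustrative, not part of the patch): faiss returns row positions into the
# filtered vector matrices, and the loop above maps them back to original entity ids
# through the SubsetIndexer's retained-id lists before filling self.blocks. A toy
# version of that remapping, with made-up retained ids and a made-up neighbour matrix:
import numpy as np

d1_retained_ids = [0, 2, 3]              # row i of vectors_1 -> original id in dataset 1
d2_retained_ids = [5, 7]                 # row i of vectors_2 -> original (offset) id in dataset 2
neighbors = np.array([[0, 2, -1],        # top-k rows per query row; -1 pads missing hits
                      [1, -1, -1]])

blocks: dict = {}
for row, nn_row in enumerate(neighbors):
    entity_id = d2_retained_ids[row]
    blocks.setdefault(entity_id, set())
    for nn in nn_row:
        if nn == -1:
            continue
        neighbor_id = d1_retained_ids[nn]
        blocks.setdefault(neighbor_id, set()).add(entity_id)
        blocks[entity_id].add(neighbor_id)

assert blocks == {5: {0, 3}, 0: {5}, 3: {5}, 7: {2}, 2: {7}}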
entities retained after the last step, produces/retrieves their embeddings and applies popular NN neighbor techniques - to produce pair candidates - Args: - data (Data): Data Module - cc_blocks (dict, optional): Blocks, subset of the initial dataset retrieved from the last CC step. Defaults to None. - vectors_1 (np.array, optional): Precalculated embeddings of entities of dataset 1. Defaults to None. - vectors_2 (np.array, optional): Precalculated embeddings of entities of dataset 2. Defaults to None. - vector_size (int, optional): The size of the embeddings. Defaults to 300. - num_of_clusters (int, optional): The number of clusters. Defaults to 5. - top_k (int, optional): The number of candidates that will be produced per entity. Defaults to 30. - attributes_1 (list, optional): Attributes of entities in dataset 1 that we want to take into consideration. Defaults to None. - attributes_2 (list, optional): Attributes of entities in dataset 2 that we want to take into consideration. Defaults to None. - return_vectors (bool, optional): If true, returns the entities vector embeddings. Defaults to True. - tqdm_disable (bool, optional): Disable progress bar. For experiment purposes. Defaults to False. - Returns: - any: Blocks produced by Vector Based BB, vector distances and entity embeddings - """ - _start_time = time() - self.blocks = dict() - self.max_word_embeddings_size = max_word_embeddings_size - self.data, self.attributes_1, self.attributes_2, self.vector_size, self.num_of_clusters, self.top_k \ - = data, attributes_1, attributes_2, vector_size, num_of_clusters, top_k - self._progress_bar = tqdm(total=data.num_of_entities, - desc=self._method_name, - disable=tqdm_disable) - - self._si = SubsetIndexer(cc_blocks, self.data) - self._d1_valid_indices: list[int] = self._si.d1_retained_ids - self._d2_valid_indices: list[int] = [x - self.data.dataset_limit for x in self._si.d2_retained_ids] - - - self._entities_d1 = data.dataset_1[attributes_1 if attributes_1 else data.attributes_1] \ - .apply(" ".join, axis=1) \ - .apply(self._tokenize_entity) \ - .values.tolist() - self._entities_d1 = [self._entities_d1[x] for x in self._d1_valid_indices] - - - self._entities_d2 = data.dataset_2[attributes_2 if attributes_2 else data.attributes_2] \ - .apply(" ".join, axis=1) \ - .apply(self._tokenize_entity) \ - .values.tolist() if not data.is_dirty_er else None - self._entities_d2 = [self._entities_d2[x] for x in self._d2_valid_indices] if not data.is_dirty_er else None - - - self.vectors_1 = vectors_1 - self.vectors_2 = vectors_2 - self.create_vectors() - if(return_vectors): return self.vectors_1, self.vectors_2 - self.blocks = self.create_blocks() - self._progress_bar.close() - self.execution_time = time() - _start_time - - return self.blocks - - def create_vectors(self): - - if(self.precomputed_vectors(data = self.data)): - self.vectors_1 = self.vectors_1[self._d1_valid_indices] - - if(not self.data.is_dirty_er): - self.vectors_2 = self.vectors_2[self._d2_valid_indices] - else: - self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - print("Device selected: ", self.device) - - if self.vectorizer in ['word2vec', 'fasttext', 'doc2vec', 'glove']: - vectors_1, vectors_2 = self._create_gensim_embeddings() - elif self.vectorizer in ['bert', 'distilbert', 'roberta', 'xlnet', 'albert']: - vectors_1, vectors_2 = self._create_pretrained_word_embeddings() - elif self.vectorizer in ['smpnet', 'st5', 'glove', 'sdistilroberta', 'sminilm']: - vectors_1, vectors_2 = 
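# Example (illustrative, not part of the patch): both code paths restrict the work to
# the entities that survived the previous cleaning step. Dataset-2 ids are shifted back
# by data.dataset_limit to become row positions, and the tokenised texts as well as any
# precomputed embedding matrix are sliced with those positions. Values are made up.
import numpy as np

dataset_limit = 4                                   # ids 0..3 belong to d1, 4.. to d2
d2_retained_ids = [4, 6]                            # global ids kept by block cleaning

d2_valid_indices = [x - dataset_limit for x in d2_retained_ids]     # -> [0, 2]

entities_d2 = ["sony a6000 body", "sony a6000 16-50mm kit",
               "canon eos m50", "canon m50 mark ii body"]
entities_d2 = [entities_d2[i] for i in d2_valid_indices]            # keep retained rows only

vectors_2 = np.arange(12, dtype='float32').reshape(4, 3)            # 4 cached embeddings
vectors_2 = vectors_2[d2_valid_indices]                             # same rows as the texts
assert len(entities_d2) == vectors_2.shape[0] == 2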
self._create_pretrained_sentence_embeddings()
-            else:
-                raise AttributeError("Not available vectorizer")
-
-            self.vectors_1 = vectors_1
-            self.vectors_2 = vectors_2
-
-    def create_blocks(self):
-        if self.similarity_search == 'faiss':
-            self._similarity_search_with_FAISS()
-        elif self.similarity_search == 'falconn':
-            raise NotImplementedError("FALCONN")
-        elif self.similarity_search == 'scann' and LINUX_ENV:
-            self._similarity_search_with_SCANN()
-        else:
-            raise AttributeError("Not available method")
-        return self.blocks
+        print(u'\u2500' * 123)
\ No newline at end of file
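# Example (illustrative, not part of the patch): a hedged end-to-end usage sketch of the
# new keyword arguments. It assumes the class lives in pyjedai.vector_based_blocking and
# that Data accepts the usual dataset/id-column arguments; the CSV file names are placeholders.
import pandas as pd
from pyjedai.datamodel import Data
from pyjedai.vector_based_blocking import EmbeddingsNNBlockBuilding

data = Data(dataset_1=pd.read_csv("abt.csv", dtype=str), id_column_name_1="id",
            dataset_2=pd.read_csv("buy.csv", dtype=str), id_column_name_2="id")

emb = EmbeddingsNNBlockBuilding(vectorizer='sminilm', similarity_search='faiss')
blocks = emb.build_blocks(
    data,
    top_k=30,
    similarity_distance='cosine',      # or 'euclidean' / 'cosine_without_normalization'
    input_cleaned_blocks=None,         # pass block-cleaning output to restrict the search space
    load_embeddings_if_exist=True,     # reuse <vectorizer>_<dataset_name>.npy when present
    save_embeddings=True,
    with_entity_matching=False)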