From e4c6e815c986bf193915387b36def16d8d71aba1 Mon Sep 17 00:00:00 2001 From: Sean MacAvaney Date: Sat, 23 Nov 2024 21:33:36 +0000 Subject: [PATCH] drop_query_vec (#24) * drop_query_vec for np_retriever * faiss retrievers * the rest * remove unused import * drop_query_vec for gar and ladr * column sort order --- pyterrier_dr/flex/faiss_retr.py | 52 ++++++++++----------- pyterrier_dr/flex/gar.py | 26 ++++++++--- pyterrier_dr/flex/ladr.py | 77 +++++++++++++++++-------------- pyterrier_dr/flex/np_retr.py | 40 ++++++++-------- pyterrier_dr/flex/scann_retr.py | 38 +++++++-------- pyterrier_dr/flex/torch_retr.py | 38 ++++++++------- pyterrier_dr/flex/voyager_retr.py | 38 +++++++-------- requirements.txt | 2 +- tests/test_flexindex.py | 17 ++++++- 9 files changed, 182 insertions(+), 146 deletions(-) diff --git a/pyterrier_dr/flex/faiss_retr.py b/pyterrier_dr/flex/faiss_retr.py index 0ad207d..1a0a802 100644 --- a/pyterrier_dr/flex/faiss_retr.py +++ b/pyterrier_dr/flex/faiss_retr.py @@ -1,36 +1,34 @@ import json -import pandas as pd import math import struct import os import pyterrier as pt -import itertools import numpy as np import tempfile import ir_datasets import pyterrier_dr +import pyterrier_alpha as pta from . import FlexIndex logger = ir_datasets.log.easy() class FaissRetriever(pt.Indexer): - def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64): + def __init__(self, flex_index, faiss_index, n_probe=None, ef_search=None, search_bounded_queue=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.faiss_index = faiss_index self.n_probe = n_probe self.ef_search = ef_search self.search_bounded_queue = search_bounded_queue self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} num_q = query_vecs.shape[0] QBATCH = self.qbatch if self.n_probe is not None: @@ -42,25 +40,27 @@ def transform(self, inp): it = range(0, num_q, QBATCH) if self.flex_index.verbose: it = logger.pbar(it, unit='qbatch') + + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) for qidx in it: scores, dids = self.faiss_index.search(query_vecs[qidx:qidx+QBATCH], self.flex_index.num_results) - for i, (s, d) in enumerate(zip(scores, dids)): + for s, d in zip(scores, dids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) - - -def _faiss_flat_retriever(self, gpu=False, qbatch=64): + result.extend({ + 'docno': docnos.fwd[d], + 'docid': d, + 'score': s, + 'rank': np.arange(d.shape[0]), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) + + +def _faiss_flat_retriever(self, gpu=False, qbatch=64, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss if 'faiss_flat' not in self._cache: @@ -80,12 +80,12 @@ def _faiss_flat_retriever(self, gpu=False, qbatch=64): co = 
faiss.GpuMultipleClonerOptions() co.shard = True self._cache['faiss_flat_gpu'] = faiss.index_cpu_to_all_gpus(self._faiss_flat, co=co) - return FaissRetriever(self, self._cache['faiss_flat_gpu']) - return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch) + return FaissRetriever(self, self._cache['faiss_flat_gpu'], drop_query_vec=drop_query_vec) + return FaissRetriever(self, self._cache['faiss_flat'], qbatch=qbatch, drop_query_vec=drop_query_vec) FlexIndex.faiss_flat_retriever = _faiss_flat_retriever -def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64): +def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, cache=True, search_bounded_queue=True, qbatch=64, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -107,7 +107,7 @@ def _faiss_hnsw_retriever(self, neighbours=32, ef_construction=40, ef_search=16, with logger.duration('reading hnsw table'): self._cache[key] = faiss.read_index(str(self.index_path/index_name)) self._cache[key].storage = self.faiss_flat_retriever().faiss_index - return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch) + return FaissRetriever(self, self._cache[key], ef_search=ef_search, search_bounded_queue=search_bounded_queue, qbatch=qbatch, drop_query_vec=drop_query_vec) FlexIndex.faiss_hnsw_retriever = _faiss_hnsw_retriever @@ -154,7 +154,7 @@ def _sample_train(index, count=None): idxs = np.random.RandomState(0).choice(dvecs.shape[0], size=count, replace=False) return dvecs[idxs] -def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1): +def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_probe=1, drop_query_vec=False): pyterrier_dr.util.assert_faiss() import faiss meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -197,5 +197,5 @@ def _faiss_ivf_retriever(self, train_sample=None, n_list=None, cache=True, n_pro else: with logger.duration('reading index'): self._cache[key] = faiss.read_index(str(self.index_path/index_name)) - return FaissRetriever(self, self._cache[key], n_probe=n_probe) + return FaissRetriever(self, self._cache[key], n_probe=n_probe, drop_query_vec=drop_query_vec) FlexIndex.faiss_ivf_retriever = _faiss_ivf_retriever diff --git a/pyterrier_dr/flex/gar.py b/pyterrier_dr/flex/gar.py index 7381faf..a9356c8 100644 --- a/pyterrier_dr/flex/gar.py +++ b/pyterrier_dr/flex/gar.py @@ -1,24 +1,31 @@ -import pandas as pd import pyterrier as pt import heapq +import pyterrier_alpha as pta from . 
import FlexIndex import numpy as np from pyterrier_dr import SimFn class FlexGar(pt.Transformer): - def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000): + def __init__(self, flex_index, graph, score_fn, batch_size=128, num_results=1000, drop_query_vec=False): self.flex_index = flex_index self.docnos, self.dvecs, _ = flex_index.payload() self.score_fn = score_fn self.graph = graph self.batch_size = batch_size self.num_results = num_results + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'qid' in inp.columns and 'query_vec' in inp.columns and 'docno' in inp.columns and 'score' in inp.columns - all_results = [] + pta.validate.result_frame(inp, extra_columns=['query_vec', 'score']) + + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + for qid, inp in inp.groupby('qid'): qvec = inp['query_vec'].iloc[0].reshape(1, -1) + qdata = {col: [inp[col].iloc[0]] for col in qcols} initial_heap = list(zip(-inp['score'], self.docnos.inv[inp['docno']])) heapq.heapify(initial_heap) results = {} @@ -47,10 +54,15 @@ def transform(self, inp): for did, score in zip(batch, scores): results[did] = score heapq.heappush(frontier_heap, (-score, did)) - for rank, (did, score) in enumerate(sorted(results.items(), key=lambda x: (-x[1], x[0]))): - all_results.append({'qid': qid, 'docno': self.docnos.fwd[did], 'score': score, 'rank': rank}) i += 1 - return pd.DataFrame(all_results) + d, s = zip(*sorted(results.items(), key=lambda x: (-x[1], x[0]))) + all_results.extend(dict( + **qdata, + docno=self.docnos.fwd[d], + score=s, + rank=np.arange(len(s)), + )) + return all_results.to_df() diff --git a/pyterrier_dr/flex/ladr.py b/pyterrier_dr/flex/ladr.py index ee83ba5..b84c50c 100644 --- a/pyterrier_dr/flex/ladr.py +++ b/pyterrier_dr/flex/ladr.py @@ -1,32 +1,34 @@ -import pandas as pd -import itertools import numpy as np import pyterrier as pt -from .. import SimFn +import pyterrier_alpha as pta from . 
import FlexIndex import ir_datasets logger = ir_datasets.log.easy() class LadrPreemptive(pt.Transformer): - def __init__(self, flex_index, graph, dense_scorer, hops=1): + def __init__(self, flex_index, graph, dense_scorer, hops=1, drop_query_vec=False): self.flex_index = flex_index self.graph = graph self.dense_scorer = dense_scorer self.hops = hops + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'query_vec' in inp.columns and 'qid' in inp.columns - assert 'docno' in inp.columns + pta.validate.result_frame(inp, extra_columns=['query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) - res = {'qid': [], 'docid': [], 'score': []} + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + it = iter(inp.groupby('qid')) if self.flex_index.verbose: it = logger.pbar(it) for qid, df in it: + qdata = {col: [df[col].iloc[0]] for col in qcols} docids = docnos.inv[df['docno'].values] - lx_docids = docids ext_docids = [docids] for _ in range(self.hops): docids = self.graph.edges_data[docids].reshape(-1) @@ -40,40 +42,46 @@ def transform(self, inp): else: idxs = np.arange(scores.shape[0]) docids, scores = ext_docids[idxs], scores[idxs] - res['qid'].extend(itertools.repeat(qid, len(docids))) - res['docid'].append(docids) - res['score'].append(scores) - res['docid'] = np.concatenate(res['docid']) - res['score'] = np.concatenate(res['score']) - res['docno'] = docnos.fwd[res['docid']] - res = pd.DataFrame(res) - res = pt.model.add_ranks(res) - return res + idxs = np.argsort(-scores) + docids, scores = docids[idxs], scores[idxs] + all_results.extend(dict( + **qdata, + docno=docnos.fwd[docids], + score=scores, + rank=np.arange(len(scores)), + )) + return all_results.to_df() + -def _pre_ladr(self, k=16, hops=1, dense_scorer=None): +def _pre_ladr(self, k=16, hops=1, dense_scorer=None, drop_query_vec=False): graph = self.corpus_graph(k) if isinstance(k, int) else k - return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer()) + return LadrPreemptive(self, graph, hops=hops, dense_scorer=dense_scorer or self.scorer(), drop_query_vec=drop_query_vec) FlexIndex.ladr = _pre_ladr # TODO: remove this alias later FlexIndex.pre_ladr = _pre_ladr class LadrAdaptive(pt.Transformer): - def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None): + def __init__(self, flex_index, graph, dense_scorer, depth=100, max_hops=None, drop_query_vec=False): self.flex_index = flex_index self.graph = graph self.dense_scorer = dense_scorer self.depth = depth self.max_hops = max_hops + self.drop_query_vec = drop_query_vec def transform(self, inp): - assert 'query_vec' in inp.columns and 'qid' in inp.columns - assert 'docno' in inp.columns + pta.validate.result_frame(inp, extra_columns=['query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) - res = {'qid': [], 'docid': [], 'score': []} + qcols = [col for col in inp.columns if col.startswith('q') and col != 'query_vec'] + if not self.drop_query_vec: + qcols += ['query_vec'] + all_results = pta.DataFrameBuilder(qcols + ['docno', 'score', 'rank']) + it = iter(inp.groupby('qid')) if self.flex_index.verbose: it = logger.pbar(it) for qid, df in it: + qdata = {col: [df[col].iloc[0]] for col in qcols} query_vecs = df['query_vec'].iloc[0].reshape(1, -1) docids = np.unique(docnos.inv[df['docno'].values]) scores = 
self.dense_scorer.score(query_vecs, docids).reshape(-1) @@ -98,17 +106,18 @@ def transform(self, inp): idxs = np.argpartition(scores, -self.flex_index.num_results)[-self.flex_index.num_results:] else: idxs = np.arange(scores.shape[0]) - res['qid'].extend(itertools.repeat(qid, len(idxs))) - res['docid'].append(docids[idxs]) - res['score'].append(scores[idxs]) - res['docid'] = np.concatenate(res['docid']) - res['score'] = np.concatenate(res['score']) - res['docno'] = docnos.fwd[res['docid']] - res = pd.DataFrame(res) - res = pt.model.add_ranks(res) - return res + docids, scores = docids[idxs], scores[idxs] + idxs = np.argsort(-scores) + docids, scores = docids[idxs], scores[idxs] + all_results.extend(dict( + **qdata, + docno=docnos.fwd[docids], + score=scores, + rank=np.arange(len(scores)), + )) + return all_results.to_df() -def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None): +def _ada_ladr(self, k=16, dense_scorer=None, depth=100, max_hops=None, drop_query_vec=False): graph = self.corpus_graph(k) if isinstance(k, int) else k - return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops) + return LadrAdaptive(self, graph, dense_scorer=dense_scorer or self.scorer(), depth=depth, max_hops=max_hops, drop_query_vec=drop_query_vec) FlexIndex.ada_ladr = _ada_ladr diff --git a/pyterrier_dr/flex/np_retr.py b/pyterrier_dr/flex/np_retr.py index ff9a21d..6d9fb9a 100644 --- a/pyterrier_dr/flex/np_retr.py +++ b/pyterrier_dr/flex/np_retr.py @@ -1,4 +1,3 @@ -import itertools import pyterrier as pt import numpy as np import pandas as pd @@ -6,16 +5,19 @@ from ..indexes import RankedLists from . import FlexIndex import ir_datasets +import pyterrier_alpha as pta logger = ir_datasets.log.easy() class NumpyRetriever(pt.Transformer): - def __init__(self, flex_index, num_results=1000, batch_size=None): + def __init__(self, flex_index, num_results=1000, batch_size=None, drop_query_vec=False): self.flex_index = flex_index self.num_results = num_results self.batch_size = batch_size or 4096 + self.drop_query_vec = drop_query_vec def transform(self, inp: pd.DataFrame) -> pd.DataFrame: + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) query_vecs = np.stack(inp['query_vec']) docnos, dvecs, config = self.flex_index.payload() @@ -37,19 +39,19 @@ def transform(self, inp: pd.DataFrame) -> pd.DataFrame: scores = query_vecs @ doc_batch dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0) ranked_lists.update(scores, dids) - result_scores, result_dids = ranked_lists.results() - result_docnos = docnos.fwd[result_dids] - cols = { - 'score': np.concatenate(result_scores), - 'docno': np.concatenate(result_docnos), - 'docid': np.concatenate(result_dids), - 'rank': np.concatenate([np.arange(len(scores)) for scores in result_scores]), - } - idxs = list(itertools.chain(*(itertools.repeat(i, len(scores)) for i, scores in enumerate(result_scores)))) - for col in inp.columns: - if col != 'query_vec': - cols[col] = inp[col][idxs].values - return pd.DataFrame(cols) + + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) + for scores, dids in zip(*ranked_lists.results()): + result.extend({ + 'docno': docnos.fwd[dids], + 'docid': dids, + 'score': scores, + 'rank': np.arange(len(scores)), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) class NumpyVectorLoader(pt.Transformer): @@ -108,18 +110,18 @@ def transform(self, 
inp): return res -def _np_retriever(self, num_results=1000, batch_size=None): - return NumpyRetriever(self, num_results=num_results, batch_size=batch_size) +def _np_retriever(self, num_results=1000, batch_size=None, drop_query_vec=False): + return NumpyRetriever(self, num_results=num_results, batch_size=batch_size, drop_query_vec=drop_query_vec) FlexIndex.np_retriever = _np_retriever def _np_vec_loader(self): - return NumpyVectorLoader(self) + return NumpyVectorLoader(self) FlexIndex.np_vec_loader = _np_vec_loader FlexIndex.vec_loader = _np_vec_loader # default vec_loader def _np_scorer(self, num_results=None): - return NumpyScorer(self, num_results) + return NumpyScorer(self, num_results) FlexIndex.np_scorer = _np_scorer FlexIndex.scorer = _np_scorer # default scorer diff --git a/pyterrier_dr/flex/scann_retr.py b/pyterrier_dr/flex/scann_retr.py index 628d912..e55eb9a 100644 --- a/pyterrier_dr/flex/scann_retr.py +++ b/pyterrier_dr/flex/scann_retr.py @@ -1,10 +1,9 @@ -import pandas as pd import math import os import pyterrier as pt -import itertools import numpy as np import ir_datasets +import pyterrier_alpha as pta import pyterrier_dr from . import FlexIndex @@ -12,41 +11,42 @@ class ScannRetriever(pt.Indexer): - def __init__(self, flex_index, scann_index, leaves_to_search=None, qbatch=64): + def __init__(self, flex_index, scann_index, leaves_to_search=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.scann_index = scann_index self.leaves_to_search = leaves_to_search self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} + + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch for qidx in range(0, num_q, QBATCH): dids, scores = self.scann_index.search_batched(query_vecs[qidx:qidx+QBATCH], leaves_to_search=self.leaves_to_search, final_num_neighbors=self.flex_index.num_results) - for i, (s, d) in enumerate(zip(scores, dids)): + for s, d in zip(scores, dids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) + result.extend({ + 'docno': docnos.fwd[d], + 'docid': d, + 'score': s, + 'rank': np.arange(d.shape[0]), + }) + + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) -def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None): +def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None, drop_query_vec=False): pyterrier_dr.util.assert_scann() import scann dvecs, meta, = self.payload(return_docnos=False) @@ -79,5 +79,5 @@ def _scann_retriever(self, n_leaves=None, leaves_to_search=1, train_sample=None) else: with logger.duration('reading index'): self._cache[key] = scann.scann_ops_pybind.load_searcher(dvecs, str(self.index_path/index_name)) - return ScannRetriever(self, self._cache[key], 
leaves_to_search=leaves_to_search)
+    return ScannRetriever(self, self._cache[key], leaves_to_search=leaves_to_search, drop_query_vec=drop_query_vec)
 FlexIndex.scann_retriever = _scann_retriever
diff --git a/pyterrier_dr/flex/torch_retr.py b/pyterrier_dr/flex/torch_retr.py
index a8e32d7..da3e133 100644
--- a/pyterrier_dr/flex/torch_retr.py
+++ b/pyterrier_dr/flex/torch_retr.py
@@ -1,6 +1,6 @@
-import pandas as pd
 import numpy as np
 import torch
+import pyterrier_alpha as pta
 import pyterrier as pt
 from .. import SimFn, infer_device
 from . import FlexIndex
@@ -30,15 +30,16 @@ def score(self, query_vecs, docids):
 
 class TorchRetriever(pt.Transformer):
-    def __init__(self, flex_index, torch_vecs, num_results=None, qbatch=64):
+    def __init__(self, flex_index, torch_vecs, num_results=None, qbatch=64, drop_query_vec=False):
         self.flex_index = flex_index
         self.torch_vecs = torch_vecs
         self.num_results = num_results or 1000
         self.docnos, meta = flex_index.payload(return_dvecs=False)
         self.qbatch = qbatch
+        self.drop_query_vec = drop_query_vec
 
     def transform(self, inp):
-        assert 'query_vec' in inp.columns
+        pta.validate.query_frame(inp, extra_columns=['query_vec'])
         inp = inp.reset_index(drop=True)
         query_vecs = np.stack(inp['query_vec'])
         query_vecs = torch.from_numpy(query_vecs).to(self.torch_vecs)
@@ -47,10 +48,7 @@ def transform(self, inp):
         if self.flex_index.verbose:
             it = pt.tqdm(it, desc='TorchRetriever', unit='qbatch')
 
-        res_scores = []
-        res_docids = []
-        res_idxs = []
-        res_ranks = []
+        result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank'])
         for start_idx in it:
             end_idx = start_idx + self.qbatch
             batch = query_vecs[start_idx:end_idx]
@@ -63,17 +61,17 @@ def transform(self, inp):
             else:
                 docids = scores.argsort(descending=True, dim=1)
                 scores = torch.gather(scores, dim=1, index=docids)
-            res_scores.append(scores.cpu().numpy().reshape(-1))
-            res_docids.append(docids.cpu().numpy().reshape(-1))
-            res_idxs.append(np.arange(start_idx, start_idx+batch.shape[0]).reshape(-1, 1).repeat(scores.shape[1], axis=1).reshape(-1))
-            res_ranks.append(np.arange(scores.shape[1]).reshape(1, -1).repeat(batch.shape[0], axis=0).reshape(-1))
-        res_idxs = np.concatenate(res_idxs)
-        res = {k: inp[k][res_idxs] for k in inp.columns if k not in ['docid', 'docno', 'rank', 'score']}
-        res['score'] = np.concatenate(res_scores)
-        res['docid'] = np.concatenate(res_docids)
-        res['docno'] = self.docnos.fwd[res['docid']]
-        res['rank'] = np.concatenate(res_ranks)
-        return pd.DataFrame(res)
+            for s, d in zip(scores.cpu().numpy(), docids.cpu().numpy()):
+                result.extend({
+                    'docno': self.docnos.fwd[d],
+                    'docid': d,
+                    'score': s,
+                    'rank': np.arange(s.shape[0]),
+                })
+
+        if self.drop_query_vec:
+            inp = inp.drop(columns='query_vec')
+        return result.to_df(inp)
 
 
 def _torch_vecs(self, device=None, fp16=False):
@@ -94,6 +92,6 @@ def _torch_scorer(self, num_results=None, device=None, fp16=False):
 FlexIndex.torch_scorer = _torch_scorer
 
-def _torch_retriever(self, num_results=None, device=None, fp16=False, qbatch=64):
-    return TorchRetriever(self, self.torch_vecs(device=device, fp16=fp16), num_results=num_results, qbatch=qbatch)
+def _torch_retriever(self, num_results=None, device=None, fp16=False, qbatch=64, drop_query_vec=False):
+    return TorchRetriever(self, self.torch_vecs(device=device, fp16=fp16), num_results=num_results, qbatch=qbatch, drop_query_vec=drop_query_vec)
 FlexIndex.torch_retriever = _torch_retriever
diff --git a/pyterrier_dr/flex/voyager_retr.py b/pyterrier_dr/flex/voyager_retr.py
index 6e7197d..1890f2a 100644
--- 
a/pyterrier_dr/flex/voyager_retr.py +++ b/pyterrier_dr/flex/voyager_retr.py @@ -1,9 +1,8 @@ -import pandas as pd import os import pyterrier as pt -import itertools import numpy as np import ir_datasets +import pyterrier_alpha as pta import pyterrier_dr from . import FlexIndex @@ -11,20 +10,21 @@ class VoyagerRetriever(pt.Indexer): - def __init__(self, flex_index, voyager_index, query_ef=None, qbatch=64): + def __init__(self, flex_index, voyager_index, query_ef=None, qbatch=64, drop_query_vec=False): self.flex_index = flex_index self.voyager_index = voyager_index self.query_ef = query_ef self.qbatch = qbatch + self.drop_query_vec = drop_query_vec def transform(self, inp): + pta.validate.query_frame(inp, extra_columns=['query_vec']) inp = inp.reset_index(drop=True) - assert all(f in inp.columns for f in ['qid', 'query_vec']) docnos, config = self.flex_index.payload(return_dvecs=False) query_vecs = np.stack(inp['query_vec']) query_vecs = query_vecs.copy() - idxs = [] - res = {'docid': [], 'score': [], 'rank': []} + + result = pta.DataFrameBuilder(['docno', 'docid', 'score', 'rank']) num_q = query_vecs.shape[0] QBATCH = self.qbatch it = range(0, num_q, QBATCH) @@ -33,23 +33,23 @@ def transform(self, inp): for qidx in it: qvec_batch = query_vecs[qidx:qidx+QBATCH] neighbor_ids, distances = self.voyager_index.query(qvec_batch, self.flex_index.num_results, self.query_ef) - for i, (s, d) in enumerate(zip(distances, neighbor_ids)): + for s, d in zip(distances, neighbor_ids): mask = d != -1 d = d[mask] s = s[mask] - res['docid'].append(d) - res['score'].append(-s) - res['rank'].append(np.arange(d.shape[0])) - idxs.extend(itertools.repeat(qidx+i, d.shape[0])) - res = {k: np.concatenate(v) for k, v in res.items()} - res['docno'] = docnos.fwd[res['docid']] - for col in inp.columns: - if col != 'query_vec': - res[col] = inp[col][idxs].values - return pd.DataFrame(res) + result.extend({ + 'docno': docnos.fwd[d], + 'docid': d, + 'score': -s, + 'rank': np.arange(d.shape[0]), + }) + if self.drop_query_vec: + inp = inp.drop(columns='query_vec') + return result.to_df(inp) -def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, storage_data_type='float32', query_ef=10): + +def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, storage_data_type='float32', query_ef=10, drop_query_vec=False): pyterrier_dr.util.assert_voyager() import voyager meta, = self.payload(return_dvecs=False, return_docnos=False) @@ -83,5 +83,5 @@ def _voyager_retriever(self, neighbours=12, ef_construction=200, random_seed=1, else: with logger.duration('reading index'): self._cache[key] = voyager.Index.load(str(self.index_path/index_name)) - return VoyagerRetriever(self, self._cache[key], query_ef=query_ef) + return VoyagerRetriever(self, self._cache[key], query_ef=query_ef, drop_query_vec=drop_query_vec) FlexIndex.voyager_retriever = _voyager_retriever diff --git a/requirements.txt b/requirements.txt index 5697af2..3184823 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ transformers python-terrier>=0.11.0 -pyterrier-alpha>=0.2.0 +pyterrier-alpha>=0.9.3 torch numpy>=1.21.0, <2.0.0 npids diff --git a/tests/test_flexindex.py b/tests/test_flexindex.py index 9d5e99b..da043aa 100644 --- a/tests/test_flexindex.py +++ b/tests/test_flexindex.py @@ -73,7 +73,7 @@ def _test_retr(self, Retr, exact=True, test_smaller=True): {'qid': '0', 'query_vec': dataset[0]['doc_vec']}, {'qid': '1', 'query_vec': dataset[1]['doc_vec']}, ])) - self.assertTrue(all(c in res.columns) for c in ['qid', 
'docno', 'rank', 'score'])
+        self.assertTrue(all(c in res.columns for c in ['qid', 'docno', 'rank', 'score', 'query_vec']))
         if exact:
             self.assertEqual(len(res), 2000)
             self.assertEqual(len(res[res.qid=='0']), 1000)
@@ -85,6 +85,21 @@ def _test_retr(self, Retr, exact=True, test_smaller=True):
             self.assertTrue(len(res[res.qid=='0']) <= 1000)
             self.assertTrue(len(res[res.qid=='1']) <= 1000)
 
+        with self.subTest('drop_query_vec=True'):
+            destdir = tempfile.mkdtemp()
+            self.test_dirs.append(destdir)
+            index = FlexIndex(destdir+'/index')
+            dataset = self._generate_data(count=2000)
+            index.index(dataset)
+
+            retr = Retr(index, drop_query_vec=True)
+            res = retr(pd.DataFrame([
+                {'qid': '0', 'query_vec': dataset[0]['doc_vec']},
+                {'qid': '1', 'query_vec': dataset[1]['doc_vec']},
+            ]))
+            self.assertTrue(all(c in res.columns for c in ['qid', 'docno', 'rank', 'score']))
+            self.assertTrue(all(c not in res.columns for c in ['query_vec']))
+
         if test_smaller:
            with self.subTest('smaller'):
                destdir = tempfile.mkdtemp()
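
Usage sketch for the new drop_query_vec option (illustrative only: the index path and the
random query vectors below are hypothetical, and the vector width must match whatever
encoder built the index):

    import numpy as np
    import pandas as pd
    from pyterrier_dr import FlexIndex

    index = FlexIndex('./my_index.flex')  # assumes an index built previously

    queries = pd.DataFrame([
        # in practice, query_vec comes from the same bi-encoder used at indexing time
        {'qid': '0', 'query_vec': np.random.rand(768).astype(np.float32)},
    ])

    # Default (drop_query_vec=False): the query_vec column is carried through to the
    # result frame alongside docno, docid, score, and rank.
    res = index.np_retriever()(queries)
    assert 'query_vec' in res.columns

    # drop_query_vec=True removes query_vec from the output, avoiding copies of
    # large vectors through downstream pipeline stages.
    res = index.np_retriever(drop_query_vec=True)(queries)
    assert 'query_vec' not in res.columns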