From 534709576e61d600b4d185f31be4d9feb74c4650 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Thu, 20 Nov 2025 16:15:03 -0800
Subject: [PATCH] Extend random access benchmarks

Replace the single test_random_access benchmark with two benchmarks,
test_simple_random_access and test_parallel_random_access, each
parametrized over the dataset and over rows_per_take (1, 10, 100).
Indices are now drawn with random.sample(range(num_rows), ...) in a
per-round setup function, so they are always in range and unique.

Add a "tpch-2.1" dataset written with data_storage_version="2.1", and
in-memory ("memory://tpch") variants generated at scale factor 1. The new
open_dataset() helper returns an opened dataset for any of these names.

Add drop_cache(), which calls posix_fadvise(POSIX_FADV_DONTNEED) on the
dataset's local data files before a benchmark starts. It is a no-op for
memory:// datasets and on platforms without posix_fadvise.

Instrument ProjectionPlan::project_batch with a debug-level tracing span.
---
Review notes (ignored by git-am):

* _create() now returns the dataset on every successful path. The
  already-populated branch previously hit a bare "return" and yielded
  None even though the function otherwise returns the dataset.
* POSIX_FADV_DONTNEED is taken from the os module when available instead
  of assuming the Linux value 4 unconditionally.
* NOTE(review): drop_cache() only touches paths for which
  os.path.exists(data_file.path) is true as given. Please confirm that
  DataFile.path is an absolute/local path; if it is relative to the
  dataset directory the cache drop is silently skipped.

 .../benchmarks/test_random_access.py          | 84 ++++++++++++++++---
 .../python/ci_benchmarks/datagen/lineitems.py | 33 +++++--
 python/python/ci_benchmarks/datasets.py       | 17 ++++
 rust/lance-datafusion/src/projection.rs       |  2 +
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/python/python/ci_benchmarks/benchmarks/test_random_access.py b/python/python/ci_benchmarks/benchmarks/test_random_access.py
index 62bbc8fe1cd..e5fea790224 100644
--- a/python/python/ci_benchmarks/benchmarks/test_random_access.py
+++ b/python/python/ci_benchmarks/benchmarks/test_random_access.py
@@ -1,24 +1,88 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright The Lance Authors
 
+import multiprocessing as mp
+import os
 import random
+from concurrent.futures import ThreadPoolExecutor
+from urllib.parse import urlparse
 
 import lance
 import pytest
-from ci_benchmarks.datasets import get_dataset_uri
+from ci_benchmarks.datasets import open_dataset
 
-DATASETS = ["tpch"]
+# POSIX fadvise flag to drop page cache (falls back to the Linux value)
+POSIX_FADV_DONTNEED = getattr(os, "POSIX_FADV_DONTNEED", 4)
+
+DATASETS = ["tpch", "tpch-2.1", "mem-tpch", "mem-tpch-2.1"]
+
+
+def drop_cache(ds: lance.LanceDataset):
+    """Drop page cache for all files in the dataset using posix_fadvise.
+
+    This only works for file-based datasets (not memory://).
+    """
+    # Skip cache dropping for in-memory datasets
+    parsed = urlparse(ds.uri)
+    if parsed.scheme == "memory":
+        return
+
+    # Get all data files from all fragments
+    for fragment in ds.get_fragments():
+        for data_file in fragment.data_files():
+            file_path = data_file.path
+
+            # Convert file:// URIs to local paths
+            if file_path.startswith("file://"):
+                file_path = urlparse(file_path).path
+
+            # Only process if it's a local file that exists
+            if os.path.exists(file_path):
+                try:
+                    with open(file_path, "rb") as f:
+                        os.posix_fadvise(f.fileno(), 0, 0, POSIX_FADV_DONTNEED)
+                except (OSError, AttributeError):
+                    # posix_fadvise might not be available on all systems
+                    pass
 
 
 @pytest.mark.parametrize("dataset", DATASETS)
-def test_random_access(benchmark, dataset):
-    NUM_INDICES = 10
-    dataset_uri = get_dataset_uri(dataset)
+@pytest.mark.parametrize("rows_per_take", [1, 10, 100])
+def test_simple_random_access(benchmark, dataset, rows_per_take):
+    ds = open_dataset(dataset)
+    num_rows = ds.count_rows()
+
+    def bench(indices):
+        return ds.take(indices)
+
+    def setup():
+        indices = random.sample(range(num_rows), rows_per_take)
+        return [indices], {}
+
+    drop_cache(ds)
+    benchmark.pedantic(bench, rounds=100, setup=setup, warmup_rounds=1)
+
+
+@pytest.mark.parametrize("dataset", DATASETS)
+@pytest.mark.parametrize("rows_per_take", [1, 10, 100])
+def test_parallel_random_access(benchmark, dataset, rows_per_take):
+    TAKES_PER_ITER = 100
+
+    ds = open_dataset(dataset)
+    num_rows = ds.count_rows()
 
-    ds = lance.dataset(dataset_uri)
-    random_indices = [random.randint(0, ds.count_rows()) for _ in range(NUM_INDICES)]
+    def bench(indices):
+        futures = []
+        with ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
+            for i in range(TAKES_PER_ITER):
+                iter_indices = indices[i * rows_per_take : (i + 1) * rows_per_take]
+                futures.append(executor.submit(ds.take, iter_indices))
+            for future in futures:
+                future.result()
 
-    def bench(random_indices):
-        ds.take(random_indices)
+    def setup():
+        indices = random.sample(range(num_rows), rows_per_take * TAKES_PER_ITER)
+        return [indices], {}
 
-    benchmark.pedantic(bench, args=(random_indices,), rounds=5)
+    drop_cache(ds)
+    benchmark.pedantic(bench, rounds=100, setup=setup, warmup_rounds=1)
diff --git a/python/python/ci_benchmarks/datagen/lineitems.py b/python/python/ci_benchmarks/datagen/lineitems.py
index 4e6d60c67b9..b91c1c3b422 100644
--- a/python/python/ci_benchmarks/datagen/lineitems.py
+++ b/python/python/ci_benchmarks/datagen/lineitems.py
@@ -12,24 +12,27 @@ NUM_ROWS = 59986052
 
 
 
-def _gen_data():
+def _gen_data(scale_factor: int):
     LOGGER.info("Using DuckDB to generate TPC-H dataset")
     con = duckdb.connect(database=":memory:")
     con.execute("INSTALL tpch; LOAD tpch")
-    con.execute("CALL dbgen(sf=10)")
+    con.execute(f"CALL dbgen(sf={scale_factor})")
     res = con.query("SELECT * FROM lineitem")
     return res.to_arrow_table()
 
 
-def _create(dataset_uri: str):
+def _create(dataset_uri: str, data_storage_version: str, scale_factor: int = 10):
     try:
         ds = lance.dataset(dataset_uri)
         print(ds.count_rows())
         if ds.count_rows() == NUM_ROWS:
-            return
+            return ds
         elif ds.count_rows() == 0:
-            lance.write_dataset(
-                _gen_data(), dataset_uri, mode="append", use_legacy_format=False
+            ds = lance.write_dataset(
+                _gen_data(scale_factor),
+                dataset_uri,
+                mode="append",
+                data_storage_version=data_storage_version,
             )
         else:
             raise Exception(
@@ -38,11 +41,24 @@ def _create(dataset_uri: str):
                 "same dataset"
             )
     except ValueError:
-        lance.write_dataset(
-            _gen_data(), dataset_uri, mode="create", use_legacy_format=False
+        ds = lance.write_dataset(
+            _gen_data(scale_factor),
+            dataset_uri,
+            mode="create",
+            data_storage_version=data_storage_version,
         )
+    return ds
 
 
 def gen_tcph():
     dataset_uri = get_dataset_uri("tpch")
-    _create(dataset_uri)
+    _create(dataset_uri, data_storage_version="2.0")
+    dataset_uri = get_dataset_uri("tpch-2.1")
+    _create(dataset_uri, data_storage_version="2.1")
+
+
+def gen_mem_tcph(data_storage_version: str):
+    dataset_uri = "memory://tpch"
+    return _create(
+        dataset_uri, data_storage_version=data_storage_version, scale_factor=1
+    )
diff --git a/python/python/ci_benchmarks/datasets.py b/python/python/ci_benchmarks/datasets.py
index f71da448df5..3fc901ff3c8 100644
--- a/python/python/ci_benchmarks/datasets.py
+++ b/python/python/ci_benchmarks/datasets.py
@@ -4,6 +4,7 @@
 from functools import cache
 from pathlib import Path
 
+import lance
 import requests
 from lance.log import LOGGER
 
@@ -41,3 +42,19 @@ def get_dataset_uri(name: str) -> str:
             raise ValueError("The image_eda dataset is only available on Google Cloud")
         return "gs://lance-benchmarks-ci-datasets/image_eda.lance"
     return f"{_get_base_uri()}{name}"
+
+
+def open_dataset(name: str) -> lance.LanceDataset:
+    if name.startswith("mem-"):
+        if name == "mem-tpch":
+            from ci_benchmarks.datagen.lineitems import gen_mem_tcph
+
+            return gen_mem_tcph(data_storage_version="2.0")
+        elif name == "mem-tpch-2.1":
+            from ci_benchmarks.datagen.lineitems import gen_mem_tcph
+
+            return gen_mem_tcph(data_storage_version="2.1")
+        else:
+            raise ValueError(f"Unknown memory dataset: {name}")
+    else:
+        return lance.dataset(get_dataset_uri(name))
diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs
index 86ca0b0707e..ed8d243ac35 100644
--- a/rust/lance-datafusion/src/projection.rs
+++ b/rust/lance-datafusion/src/projection.rs
@@ -12,6 +12,7 @@ use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };
+use tracing::instrument;
 
 use lance_core::{
     datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema},
@@ -422,6 +423,7 @@ impl ProjectionPlan {
         Ok(ArrowSchema::new(fields))
     }
 
+    #[instrument(skip_all, level = "debug")]
     pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
         let src = Arc::new(OneShotExec::from_batch(batch));
         let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;