Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 74 additions & 10 deletions python/python/ci_benchmarks/benchmarks/test_random_access.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,88 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import multiprocessing as mp
import os
import random
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

import lance
import pytest
from ci_benchmarks.datasets import get_dataset_uri
from ci_benchmarks.datasets import open_dataset

DATASETS = ["tpch"]
# POSIX fadvise flag used to drop a file's pages from the OS page cache.
# Prefer the stdlib constant when the platform exposes it (it is absent on
# e.g. macOS/Windows); fall back to the Linux value 4.
POSIX_FADV_DONTNEED = getattr(os, "POSIX_FADV_DONTNEED", 4)

# Benchmark dataset names resolvable by ci_benchmarks.datasets.open_dataset;
# the "mem-" variants are generated in-process on a memory:// store.
DATASETS = ["tpch", "tpch-2.1", "mem-tpch", "mem-tpch-2.1"]


def drop_cache(ds: lance.LanceDataset):
    """Evict the OS page cache for every data file backing *ds*.

    Issues ``posix_fadvise(..., POSIX_FADV_DONTNEED)`` against each
    fragment's data files so the next benchmark round reads cold storage.
    ``memory://`` datasets are skipped, as are paths that do not exist
    locally or platforms lacking ``os.posix_fadvise``.
    """
    if urlparse(ds.uri).scheme == "memory":
        # Nothing to evict for an in-memory dataset.
        return

    for frag in ds.get_fragments():
        for data_file in frag.data_files():
            path = data_file.path
            # Normalize file:// URIs down to plain filesystem paths.
            if path.startswith("file://"):
                path = urlparse(path).path
            if not os.path.exists(path):
                # Remote (e.g. gs://) or missing file — nothing to do.
                continue
            try:
                with open(path, "rb") as fh:
                    os.posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_DONTNEED)
            except (OSError, AttributeError):
                # posix_fadvise is unavailable on some platforms; best-effort.
                pass


@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("rows_per_take", [1, 10, 100])
def test_simple_random_access(benchmark, dataset, rows_per_take):
    """Benchmark single-threaded ``Dataset.take`` of random row indices.

    A fresh set of distinct indices is drawn per round via the ``setup``
    hook so no round benefits from reusing an identical index list.
    """
    ds = open_dataset(dataset)
    total_rows = ds.count_rows()

    def run_take(indices):
        return ds.take(indices)

    def draw_indices():
        # pedantic setup hook: returns (args, kwargs) for run_take.
        return [random.sample(range(total_rows), rows_per_take)], {}

    # Start from a cold page cache (no-op for memory:// datasets).
    # NOTE(review): the cache is dropped once, not per round, so rounds
    # after the first run warm — confirm that is the intent.
    drop_cache(ds)
    benchmark.pedantic(run_take, rounds=100, setup=draw_indices, warmup_rounds=1)


@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("rows_per_take", [1, 10, 100])
def test_parallel_random_access(benchmark, dataset, rows_per_take):
    """Benchmark many concurrent ``Dataset.take`` calls from a thread pool.

    Each round fans out 100 takes of ``rows_per_take`` rows across
    ``cpu_count()`` worker threads, drawing all indices up front in the
    ``setup`` hook and slicing the pool per submitted take.
    """
    TAKES_PER_ITER = 100

    ds = open_dataset(dataset)
    total_rows = ds.count_rows()

    def run_takes(indices):
        with ThreadPoolExecutor(max_workers=mp.cpu_count()) as pool:
            pending = [
                pool.submit(ds.take, indices[start : start + rows_per_take])
                for start in range(0, TAKES_PER_ITER * rows_per_take, rows_per_take)
            ]
            # Propagate any worker exception and wait for completion.
            for fut in pending:
                fut.result()

    def draw_indices():
        # pedantic setup hook: one distinct index pool per round.
        return [random.sample(range(total_rows), rows_per_take * TAKES_PER_ITER)], {}

    # Start from a cold page cache (no-op for memory:// datasets).
    drop_cache(ds)
    benchmark.pedantic(run_takes, rounds=100, setup=draw_indices, warmup_rounds=1)
32 changes: 24 additions & 8 deletions python/python/ci_benchmarks/datagen/lineitems.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,27 @@
NUM_ROWS = 59986052


def _gen_data(scale_factor: int):
    """Generate the TPC-H ``lineitem`` table at *scale_factor* via DuckDB.

    Returns the table as a pyarrow Table, built entirely in an in-memory
    DuckDB connection using its tpch extension.
    """
    LOGGER.info("Using DuckDB to generate TPC-H dataset")
    conn = duckdb.connect(database=":memory:")
    conn.execute("INSTALL tpch; LOAD tpch")
    conn.execute(f"CALL dbgen(sf={scale_factor})")
    return conn.query("SELECT * FROM lineitem").to_arrow_table()


def _create(dataset_uri: str):
def _create(dataset_uri: str, data_storage_version: str, scale_factor: int = 10):
try:
ds = lance.dataset(dataset_uri)
print(ds.count_rows())
if ds.count_rows() == NUM_ROWS:
return
elif ds.count_rows() == 0:
lance.write_dataset(
_gen_data(), dataset_uri, mode="append", use_legacy_format=False
ds = lance.write_dataset(
_gen_data(scale_factor),
dataset_uri,
mode="append",
data_storage_version=data_storage_version,
)
else:
raise Exception(
Expand All @@ -38,11 +41,24 @@ def _create(dataset_uri: str):
"same dataset"
)
except ValueError:
lance.write_dataset(
_gen_data(), dataset_uri, mode="create", use_legacy_format=False
ds = lance.write_dataset(
_gen_data(scale_factor),
dataset_uri,
mode="create",
data_storage_version=data_storage_version,
)
return ds


def gen_tcph():
    """Create (or verify) the persistent TPC-H benchmark datasets.

    Builds one dataset per Lance data-storage version: "tpch" on 2.0 and
    "tpch-2.1" on 2.1.
    """
    for name, version in (("tpch", "2.0"), ("tpch-2.1", "2.1")):
        _create(get_dataset_uri(name), data_storage_version=version)


def gen_mem_tcph(data_storage_version: str):
    """Build a small (scale factor 1) in-memory TPC-H dataset.

    Returns the dataset created by ``_create`` on a ``memory://`` store
    using the requested Lance *data_storage_version*.
    """
    return _create(
        "memory://tpch",
        data_storage_version=data_storage_version,
        scale_factor=1,
    )
17 changes: 17 additions & 0 deletions python/python/ci_benchmarks/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from functools import cache
from pathlib import Path

import lance
import requests
from lance.log import LOGGER

Expand Down Expand Up @@ -41,3 +42,19 @@ def get_dataset_uri(name: str) -> str:
raise ValueError("The image_eda dataset is only available on Google Cloud")
return "gs://lance-benchmarks-ci-datasets/image_eda.lance"
return f"{_get_base_uri()}{name}"


def open_dataset(name: str) -> lance.LanceDataset:
    """Open a benchmark dataset by name.

    Names prefixed with "mem-" are generated on demand as in-memory TPC-H
    datasets (one per supported data-storage version); anything else is
    opened from its registered URI.
    """
    if not name.startswith("mem-"):
        return lance.dataset(get_dataset_uri(name))

    # Map in-memory dataset names to the Lance data-storage version used.
    mem_versions = {"mem-tpch": "2.0", "mem-tpch-2.1": "2.1"}
    if name not in mem_versions:
        raise ValueError(f"Unknown memory dataset: {name}")

    # Imported lazily to avoid pulling in DuckDB for on-disk datasets.
    from ci_benchmarks.datagen.lineitems import gen_mem_tcph

    return gen_mem_tcph(data_storage_version=mem_versions[name])
2 changes: 2 additions & 0 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::instrument;

use lance_core::{
datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema},
Expand Down Expand Up @@ -422,6 +423,7 @@ impl ProjectionPlan {
Ok(ArrowSchema::new(fields))
}

#[instrument(skip_all, level = "debug")]
pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let src = Arc::new(OneShotExec::from_batch(batch));
let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
Expand Down