Commit 4ba25e0: merge main

2010YOUY01 committed Jul 11, 2023 (2 parents 947d71a + 06fe066)
Showing 152 changed files with 7,207 additions and 3,877 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev_pr.yml
@@ -46,7 +46,7 @@ jobs:
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'synchronize')
uses: actions/labeler@4.1.0
uses: actions/labeler@v4.3.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
configuration-path: .github/workflows/dev_pr/labeler.yml
37 changes: 12 additions & 25 deletions Cargo.toml
@@ -17,40 +17,27 @@

[workspace]
exclude = ["datafusion-cli"]
members = [
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/proto",
"datafusion/proto/gen",
"datafusion/row",
"datafusion/sql",
"datafusion/substrait",
"datafusion-examples",
"test-utils",
"benchmarks",
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/row", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks",
]
resolver = "2"

[workspace.package]
version = "27.0.0"
edition = "2021"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
homepage = "https://github.com/apache/arrow-datafusion"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.64"
version = "27.0.0"

[workspace.dependencies]
arrow = { version = "42.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-flight = { version = "42.0.0", features = ["flight-sql-experimental"] }
arrow-buffer = { version = "42.0.0", default-features = false }
arrow-schema = { version = "42.0.0", default-features = false }
arrow-array = { version = "42.0.0", default-features = false, features = ["chrono-tz"] }
parquet = { version = "42.0.0", features = ["arrow", "async", "object_store"] }
arrow = { version = "43.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-array = { version = "43.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "43.0.0", default-features = false }
arrow-flight = { version = "43.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "43.0.0", default-features = false }
parquet = { version = "43.0.0", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.35", features = ["visitor"] }

[profile.release]
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -37,6 +37,7 @@ arrow = { workspace = true }
datafusion = { path = "../datafusion/core", version = "27.0.0" }
env_logger = "0.10"
futures = "0.3"
log = "^0.4"
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
parquet = { workspace = true }
93 changes: 69 additions & 24 deletions benchmarks/bench.sh
@@ -66,6 +66,8 @@ compare: Compares results from benchmark runs
all(default): Data/Run/Compare for all benchmarks
tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table
tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
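
For example, with the two new entries the Scale Factor 10 variants would be generated and run like this (a sketch assuming the script's usual data/run subcommands; the exact invocation may differ):

    # generate ~10GB of TPCH data, then benchmark it from parquet and from memory
    ./bench.sh data tpch10
    ./bench.sh run tpch10
    ./bench.sh run tpch_mem10
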
@@ -124,14 +126,22 @@ main() {
echo "***************************"
case "$BENCHMARK" in
all)
data_tpch
data_tpch "1"
data_tpch "10"
;;
tpch)
data_tpch
data_tpch "1"
;;
tpch_mem)
# same data for tpch_mem
data_tpch
# same data as for tpch
data_tpch "1"
;;
tpch10)
data_tpch "10"
;;
tpch_mem10)
# same data as for tpch10
data_tpch "10"
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
@@ -162,16 +172,24 @@ main() {
mkdir -p "${RESULTS_DIR}"
case "$BENCHMARK" in
all)
run_tpch
run_tpch_mem
run_tpch "1"
run_tpch_mem "1"
run_tpch "10"
run_tpch_mem "10"
run_parquet
run_sort
;;
tpch)
run_tpch
run_tpch "1"
;;
tpch_mem)
run_tpch_mem
run_tpch_mem "1"
;;
tpch10)
run_tpch "10"
;;
tpch_mem10)
run_tpch_mem "10"
;;
parquet)
run_parquet
@@ -201,76 +219,103 @@ main() {



# Creates TPCH data if it doesn't already exist
# Creates TPCH data at a certain scale factor, if it doesn't already
# exist
#
# call like: data_tpch($scale_factor)
#
# Creates data in $DATA_DIR/tpch_sf1 for scale factor 1
# Creates data in $DATA_DIR/tpch_sf10 for scale factor 10
# etc
data_tpch() {
echo "Creating tpch dataset..."
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi

TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."

# Ensure the target data directory exists
mkdir -p "${DATA_DIR}"
mkdir -p "${TPCH_DIR}"

# Create 'tbl' (CSV format) data into $DATA_DIR if it does not already exist
SCALE_FACTOR=1
FILE="${DATA_DIR}/supplier.tbl"
FILE="${TPCH_DIR}/supplier.tbl"
if test -f "${FILE}"; then
echo " tbl files exist ($FILE exists)."
else
echo " creating tbl files with tpch_dbgen..."
docker run -v "${DATA_DIR}":/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s ${SCALE_FACTOR}
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s ${SCALE_FACTOR}
fi

# Copy expected answers into the ./data/answers directory if it does not already exist
FILE="${DATA_DIR}/answers/q1.out"
FILE="${TPCH_DIR}/answers/q1.out"
if test -f "${FILE}"; then
echo " Expected answers exist (${FILE} exists)."
else
echo " Copying answers to ${DATA_DIR}/answers"
mkdir -p "${DATA_DIR}/answers"
docker run -v "${DATA_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
echo " Copying answers to ${TPCH_DIR}/answers"
mkdir -p "${TPCH_DIR}/answers"
docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
fi

# Create 'parquet' files from tbl
FILE="${DATA_DIR}/supplier"
FILE="${TPCH_DIR}/supplier"
if test -d "${FILE}"; then
echo " parquet files exist ($FILE exists)."
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND --bin tpch -- convert --input "${DATA_DIR}" --output "${DATA_DIR}" --format parquet
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
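
As a usage sketch, each scale factor now gets its own self-contained directory; calling the function twice would populate (paths inferred from the code above):

    data_tpch "1"    # populates $DATA_DIR/tpch_sf1
    data_tpch "10"   # populates $DATA_DIR/tpch_sf10, roughly:
                     #   tpch_sf10/*.tbl      raw tpch_dbgen output
                     #   tpch_sf10/answers/   expected query answers
                     #   tpch_sf10/supplier/  per-table parquet directories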

# Runs the tpch benchmark
run_tpch() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${DATA_DIR}" --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark in memory
run_tpch_mem() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch_mem.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${DATA_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
}
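
A minimal sketch of the new call pattern, with the result paths the functions print; note that as written, both scale factors write to the same tpch.json and tpch_mem.json under ${RESULTS_DIR}:

    run_tpch "1"        # parquet-backed run -> ${RESULTS_DIR}/tpch.json
    run_tpch_mem "10"   # in-memory run (-m) -> ${RESULTS_DIR}/tpch_mem.json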

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
}
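
Unlike the tpch runners, these two take no scale-factor argument and read ${DATA_DIR} directly; a sketch of the pair:

    run_parquet   # filter benchmark -> ${RESULTS_DIR}/parquet.json
    run_sort      # sort benchmark   -> ${RESULTS_DIR}/sort.json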

compare_benchmarks() {
3 changes: 3 additions & 0 deletions benchmarks/src/bin/tpch.rs
@@ -16,7 +16,9 @@
// under the License.

//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
use log::info;

use arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
@@ -235,6 +237,7 @@ async fn benchmark_query(
let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);
info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
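
The added info! call only produces output when a logger is active; assuming the benchmark binary initializes env_logger (the env_logger dependency appears in benchmarks/Cargo.toml above), the per-iteration batches could be surfaced via the RUST_LOG environment variable, for example:

    # hypothetical direct invocation mirroring bench.sh's flags
    RUST_LOG=info cargo run --release --bin tpch -- benchmark datafusion \
        --iterations 10 --path ./data/tpch_sf1 --format parquet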