Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/sqllogictest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ export RUST_MIN_STACK=30485760;
PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests
```

To update the sqlite expected answers, use the `datafusion/sqllogictest/regenerate_sqlite_files.sh` script.

Note that this must be run against an empty postgres instance. For example:

```shell
PG_URI=postgresql://postgres@localhost:5432/postgres bash datafusion/sqllogictest/regenerate_sqlite_files.sh
```
Copy link
Copy Markdown
Contributor

@jayzhan211 jayzhan211 Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postgresql://postgres@localhost:5432/postgres doesn't work for me.

Run with $(whoami) to find your name, and the command should be

PG_URI=postgres://$(whoami)@localhost:5432/{database} bash datafusion/sqllogictest/regenerate_sqlite_files.sh

And run this to find database

➜  datafusion git:(count-schema-name) ✗ psql -U jayzhan -h localhost -p 5432 -l
                           List of databases
   Name    |  Owner  | Encoding | Collate | Ctype |  Access privileges  
-----------+---------+----------+---------+-------+---------------------
 postgres  | jayzhan | UTF8     | C       | C     | 
 template0 | jayzhan | UTF8     | C       | C     | =c/jayzhan         +
           |         |          |         |       | jayzhan=CTc/jayzhan
 template1 | jayzhan | UTF8     | C       | C     | =c/jayzhan         +
           |         |          |         |       | jayzhan=CTc/jayzhan
(3 rows)

This works

PG_URI=postgres://jayzhan@localhost:5432/postgres bash datafusion/sqllogictest/regenerate_sqlite_files.sh

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just run postgres in docker:

docker run --name df_postgres -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=test -d -p 5432:5432 postgres:latest
export PG_URI=postgres://test:test@host.docker.internal:5432/test
./datafusion/sqllogictest/regenerate_sqlite_files.sh


## Updating tests: Completion Mode

In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/regenerate/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

use clap::Parser;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion::common::instant::Instant;
use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result};
use datafusion::common::runtime::SpawnedTask;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use indicatif::{
Expand Down Expand Up @@ -378,7 +378,7 @@ async fn run_test_file_with_postgres(
_mp: MultiProgress,
_mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_common::plan_err;
use datafusion::common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}

Expand Down Expand Up @@ -512,7 +512,7 @@ async fn run_complete_file_with_postgres(
_mp: MultiProgress,
_mp_style: ProgressStyle,
) -> Result<()> {
use datafusion_common::plan_err;
use datafusion::common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pretty brutal -- I edited these files so they compiled and then copied them to the regenerate location, similarly to how it is done for the runner.rs

// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use std::{path::PathBuf, time::Duration};

use super::{error::Result, normalize, DFSqlLogicTestError};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use indicatif::ProgressBar;
use log::Level::{Debug, Info};
use log::{debug, log_enabled, warn};
use sqllogictest::DBOutput;
use tokio::time::Instant;

use crate::engines::output::{DFColumnType, DFOutput};

/// sqllogictest engine that executes statements against a DataFusion
/// [`SessionContext`] and reports progress on a [`ProgressBar`].
pub struct DataFusion {
    /// Session used to plan and execute every statement.
    ctx: SessionContext,
    /// Path of the test file being run; printed as a prefix in log messages.
    relative_path: PathBuf,
    /// Progress bar advanced once per executed statement; its message also
    /// carries the running count of slow statements.
    pb: ProgressBar,
}

impl DataFusion {
    /// Creates an engine that runs statements on `ctx`, labels its log output
    /// with `relative_path`, and reports progress on `pb`.
    pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
        Self {
            ctx,
            relative_path,
            pb,
        }
    }

    /// Increments the slow-statement counter embedded in the progress bar
    /// message.
    ///
    /// The current message is split on single spaces: the first token is kept
    /// as the label and the third token, when present, is read as the current
    /// count. The message is then rewritten as
    /// `<label> - <count + 1> took > 500 ms`.
    ///
    /// A third token that is not a valid `i32` is treated as a count of zero
    /// rather than panicking, so an unexpected message format cannot abort the
    /// test run.
    fn update_slow_count(&self) {
        let msg = self.pb.message();
        let mut tokens = msg.split(' ');

        // `split` always yields at least one (possibly empty) token.
        let label = tokens.next().unwrap_or("");

        // After consuming the label, `nth(1)` is the third token overall,
        // which holds the slow count once this method has run at least once.
        let previous_count = tokens
            .nth(1)
            .and_then(|token| token.parse::<i32>().ok())
            .unwrap_or(0);

        let current_count = previous_count.saturating_add(1);

        self.pb
            .set_message(format!("{} - {} took > 500 ms", label, current_count));
    }
}

#[async_trait]
impl sqllogictest::AsyncDB for DataFusion {
    type Error = DFSqlLogicTestError;
    type ColumnType = DFColumnType;

    /// Runs a single `sql` statement on the wrapped session and returns its
    /// output.
    ///
    /// Besides executing the statement this also:
    /// - logs the statement at debug level,
    /// - bumps the slow-statement counter when execution took over 500 ms,
    /// - advances the progress bar by one,
    /// - emits a warning when execution took over 2 seconds (only if info
    ///   level logging is enabled).
    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
        let slow_threshold = Duration::from_millis(500);
        let warn_threshold = Duration::from_secs(2);

        if log_enabled!(Debug) {
            debug!(
                "[{}] Running query: \"{}\"",
                self.relative_path.display(),
                sql
            );
        }

        // Time only the query itself, not the bookkeeping that follows.
        let started_at = Instant::now();
        let outcome = run_query(&self.ctx, sql).await;
        let duration = started_at.elapsed();

        if duration > slow_threshold {
            self.update_slow_count();
        }

        self.pb.inc(1);

        let is_very_slow = duration > warn_threshold;
        if log_enabled!(Info) && is_very_slow {
            warn!(
                "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"",
                self.relative_path.display()
            );
        }

        outcome
    }

    /// Name of the engine backing this database.
    fn engine_name(&self) -> &str {
        "DataFusion"
    }

    /// Sleeps for `dur` using `tokio::time::sleep`, so the wait does not
    /// block the current thread the way `std::thread::sleep` would.
    async fn sleep(dur: Duration) {
        tokio::time::sleep(dur).await;
    }
}

/// Plans and executes `sql` on `ctx`, converting the result into sqllogictest
/// output.
///
/// Returns `DBOutput::StatementComplete(0)` when the statement produced
/// neither columns nor rows, and `DBOutput::Rows` otherwise. Any error raised
/// while planning, executing, or converting the result is propagated.
async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
    let sql: String = sql.into();

    let df = ctx.sql(&sql).await?;
    let task_ctx = Arc::new(df.task_ctx());
    let plan = df.create_physical_plan().await?;

    let stream = execute_stream(plan, task_ctx)?;
    // The column types must be read before `collect` consumes the stream.
    let types = normalize::convert_schema_to_types(stream.schema().fields());
    let batches: Vec<RecordBatch> = collect(stream).await?;
    let rows = normalize::convert_batches(batches)?;

    if types.is_empty() && rows.is_empty() {
        return Ok(DBOutput::StatementComplete(0));
    }

    Ok(DBOutput::Rows { types, rows })
}
Loading