Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 116 additions & 173 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ serde_json = { version = "1", features = ["raw_value"] }
# Documentation: https://docs.rs/vercel_runtime/latest/vercel_runtime
vercel_runtime = { version = "1.1.4" }
datafusion = { version = "49.0.0" }
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", branch = "main" }
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", branch = "robtandy/move_chrono_to_main_dependencies" }
serde = { version = "1.0.203", features = ["derive"] }
futures = "0.3.31"
url = "2.5.7"
Expand Down
70 changes: 63 additions & 7 deletions api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::{displayable, execute_stream, ExecutionPlan};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use datafusion_distributed::{
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
display_plan_graphviz, ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver,
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext,
};
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
Expand Down Expand Up @@ -99,6 +99,18 @@ async fn main() -> Result<(), Error> {
/// Request payload whose fields `handler` forwards to `execute_statements`.
struct SqlRequest {
/// When true, `execute_statements` registers the distributed physical optimizer rule
/// and the distributed channel resolver on the session.
distributed: bool,
/// Statements to run, e.g. `SELECT * FROM weather LIMIT 10`.
stmts: Vec<String>,
/// Passed to `SessionConfig::with_target_partitions`.
/// Falls back to `default_partitions()` (4) when absent from the request.
#[serde(default = "default_partitions")]
partitions: usize,
/// Passed to `with_maximum_partitions_per_task` on the distributed optimizer rule;
/// only consulted when `distributed` is true.
/// Falls back to `default_partitions_per_task()` (2) when absent from the request.
#[serde(default = "default_partitions_per_task")]
partitions_per_task: usize,
}

/// Serde fallback for `SqlRequest::partitions`, used when the request omits the field.
fn default_partitions() -> usize {
    const FALLBACK_PARTITIONS: usize = 4;
    FALLBACK_PARTITIONS
}

/// Serde fallback for `SqlRequest::partitions_per_task`, used when the request omits the field.
fn default_partitions_per_task() -> usize {
    const FALLBACK_PARTITIONS_PER_TASK: usize = 2;
    FALLBACK_PARTITIONS_PER_TASK
}

pub async fn handler(req: Request) -> Result<Response<Body>, Error> {
Expand All @@ -107,7 +119,15 @@ pub async fn handler(req: Request) -> Result<Response<Body>, Error> {
None => return throw_error("No sql request was passed", None, StatusCode::BAD_REQUEST),
};

let res = match execute_statements(req.stmts, "api/parquet", req.distributed).await {
let res = match execute_statements(
req.stmts,
"api/parquet",
req.distributed,
req.partitions,
req.partitions_per_task,
)
.await
{
Ok(res) => res,
Err(err) => {
return throw_error(
Expand Down Expand Up @@ -152,23 +172,42 @@ struct SqlResult {
rows: Vec<Vec<String>>,
logical_plan: String,
physical_plan: String,
// if the plan is distributed, this contains the graphviz representation in dot format
graphviz: String,
// if the plan is distributed, this is replaced browser-side with the SVG markup of the graphviz plan
graphviz_plan: String,
}

async fn execute_statements(
stmts: Vec<String>,
path: impl Display,
distributed: bool,
partitions: usize,
partitions_per_task: usize,
) -> datafusion::error::Result<SqlResult> {
let options = FormatOptions::default().with_display_error(true);
let cfg = SessionConfig::new().with_information_schema(true);
let mut cfg = SessionConfig::new()
.with_information_schema(true)
.with_target_partitions(partitions);

let mut builder = SessionStateBuilder::new()
.with_default_features()
.with_config(cfg);
.with_config(cfg.clone());
if distributed {
cfg = cfg.set_str(
"datafusion.optimizer.hash_join_single_partition_threshold",
"0",
);
cfg = cfg.set_str(
"datafusion.optimizer.hash_join_single_partition_threshold.rows",
"0",
);

builder = builder
.with_config(cfg)
.with_physical_optimizer_rule(Arc::new(
DistributedPhysicalOptimizerRule::default().with_maximum_partitions_per_task(1),
DistributedPhysicalOptimizerRule::default()
.with_maximum_partitions_per_task(partitions_per_task),
))
.with_distributed_channel_resolver(CHANNEL_RESOLVER.clone());
}
Expand Down Expand Up @@ -222,11 +261,17 @@ async fn execute_statements(
rows.push(vec!["...".to_string(); columns.len()]);
}

let physical_plan_str =
display_physical_plan(&physical_plan).unwrap_or_else(|err| err.to_string());
let graphviz_str = display_graphviz_plan(&physical_plan).unwrap_or_else(|err| err.to_string());

Ok(SqlResult {
columns,
rows,
logical_plan: logical_plan_str,
physical_plan: display_physical_plan(&physical_plan).unwrap_or_else(|err| err.to_string()),
physical_plan: physical_plan_str,
graphviz_plan: "".to_owned(),
graphviz: graphviz_str,
})
}

Expand All @@ -238,6 +283,11 @@ fn display_physical_plan(physical_plan: &Arc<dyn ExecutionPlan>) -> Result<Strin
Ok(physical_plan_str)
}

/// Renders `physical_plan` through `display_plan_graphviz` and returns the resulting string.
///
/// # Errors
/// Any failure reported by `display_plan_graphviz` is converted into `Error` via `?`.
fn display_graphviz_plan(physical_plan: &Arc<dyn ExecutionPlan>) -> Result<String, Error> {
    // The renderer takes the plan by value, so hand it its own handle to the shared plan.
    let plan = Arc::clone(physical_plan);
    let dot = display_plan_graphviz(plan)?;
    Ok(dot)
}

async fn load_parquet_files(base: String, ctx: &SessionContext) -> Result<(), DataFusionError> {
let mut futures = vec![];
for entry in fs::read_dir(&base)? {
Expand Down Expand Up @@ -270,6 +320,8 @@ mod tests {
],
format!("{}/api/parquet", env!("CARGO_MANIFEST_DIR")),
false,
4,
2,
)
.await?;

Expand All @@ -289,6 +341,8 @@ mod tests {
vec!["SELECT * FROM weather LIMIT 10".to_string()],
format!("{}/api/parquet", env!("CARGO_MANIFEST_DIR")),
false,
4,
2,
)
.await?;

Expand Down Expand Up @@ -346,6 +400,8 @@ where
.to_string()],
format!("{}/api/parquet", env!("CARGO_MANIFEST_DIR")),
true,
4,
2,
)
.await?;

Expand Down
53 changes: 53 additions & 0 deletions components/PartitionsPerTaskSelect.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import React from 'react';
import * as Select from '@radix-ui/react-select';
import { ChevronDownIcon, ChevronUpIcon } from '@radix-ui/react-icons';

/** Props for the `PartitionsPerTaskSelect` dropdown. */
export interface PartitionsPerTaskSelectProps {
/** Currently selected value; the component falls back to 1 when this is undefined. */
partitionsPerTask: number;
/** Highest selectable value; the component renders one option for each integer from 1 up to this number. */
maxPartitions: number;
/** Called with the chosen option, parsed to a number, whenever the selection changes. */
onChange: (partitionsPerTask: number) => void;
}

/**
 * Dropdown labelled "Per Task" that lets the user pick a partitions-per-task value.
 *
 * Renders one option for every integer from 1 to `maxPartitions`. At least one option
 * is always offered: a `maxPartitions` that is zero, negative, fractional, NaN or
 * non-finite is normalised so the list is never empty and never unbounded.
 *
 * @param partitionsPerTask currently selected value (defaults to 1)
 * @param maxPartitions     highest selectable value (defaults to 1)
 * @param onChange          called with the chosen value parsed as a base-10 integer
 */
export function PartitionsPerTaskSelect({ partitionsPerTask = 1, maxPartitions = 1, onChange }: PartitionsPerTaskSelectProps) {
  // Guard the option count: Array.from would silently yield [] for 0/negative/NaN
  // and try to allocate a gigantic array for Infinity.
  const optionCount = Number.isFinite(maxPartitions) ? Math.max(1, Math.floor(maxPartitions)) : 1;
  const options = Array.from({ length: optionCount }, (_, i) => i + 1);
  const selected = partitionsPerTask?.toString() || '1';

  return (
    <div className="flex items-center gap-2">
      <label htmlFor="partitions-per-task-select" className="text-sm text-text-secondary">
        Per Task
      </label>
      {/* Option values are decimal strings, so parse them with an explicit radix. */}
      <Select.Root value={selected} onValueChange={(value) => onChange(Number.parseInt(value, 10))}>
        <Select.Trigger
          id="partitions-per-task-select"
          className="inline-flex items-center justify-between gap-1 px-3 py-1 bg-secondary-surface text-sm text-text-primary rounded border border-gray-600 hover:border-gray-500 focus:outline-none focus:ring-2 focus:ring-blue-500 cursor-pointer min-w-[60px]"
        >
          <Select.Value />
          <Select.Icon>
            <ChevronDownIcon />
          </Select.Icon>
        </Select.Trigger>
        <Select.Portal>
          <Select.Content className="overflow-hidden bg-secondary-surface rounded-md shadow-lg border border-gray-600 z-50">
            <Select.ScrollUpButton className="flex items-center justify-center h-6 bg-secondary-surface text-text-secondary cursor-default">
              <ChevronUpIcon />
            </Select.ScrollUpButton>
            <Select.Viewport className="p-1">
              {options.map((num) => (
                <Select.Item
                  key={num}
                  value={num.toString()}
                  className="text-sm text-text-primary rounded-sm flex items-center px-6 py-2 relative select-none hover:bg-blue-600 focus:bg-blue-600 cursor-pointer outline-none"
                >
                  <Select.ItemText>{num}</Select.ItemText>
                </Select.Item>
              ))}
            </Select.Viewport>
            <Select.ScrollDownButton className="flex items-center justify-center h-6 bg-secondary-surface text-text-secondary cursor-default">
              <ChevronDownIcon />
            </Select.ScrollDownButton>
          </Select.Content>
        </Select.Portal>
      </Select.Root>
    </div>
  );
}
50 changes: 50 additions & 0 deletions components/PartitionsSelect.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import React from 'react';
import * as Select from '@radix-ui/react-select';
import { ChevronDownIcon, ChevronUpIcon } from '@radix-ui/react-icons';

/** Props for the `PartitionsSelect` dropdown. */
export interface PartitionsSelectProps {
/** Currently selected partition count; the component falls back to 4 when this is undefined. */
partitions: number;
/** Called with the chosen option, parsed to a number, whenever the selection changes. */
onChange: (partitions: number) => void;
}

/** Selectable partition counts (1 through 10); built once instead of on every render. */
const PARTITION_OPTIONS: readonly number[] = Array.from({ length: 10 }, (_, i) => i + 1);

/**
 * Dropdown labelled "Partitions" that lets the user pick a partition count from 1 to 10.
 *
 * @param partitions currently selected value (defaults to 4)
 * @param onChange   called with the chosen value parsed as a base-10 integer
 */
export function PartitionsSelect({ partitions = 4, onChange }: PartitionsSelectProps) {
  const selected = partitions?.toString() || '4';

  return (
    <div className="flex items-center gap-2">
      <label htmlFor="partitions-select" className="text-sm text-text-secondary">
        Partitions
      </label>
      {/* Option values are decimal strings, so parse them with an explicit radix. */}
      <Select.Root value={selected} onValueChange={(value) => onChange(Number.parseInt(value, 10))}>
        <Select.Trigger
          id="partitions-select"
          className="inline-flex items-center justify-between gap-1 px-3 py-1 bg-secondary-surface text-sm text-text-primary rounded border border-gray-600 hover:border-gray-500 focus:outline-none focus:ring-2 focus:ring-blue-500 cursor-pointer min-w-[60px]"
        >
          <Select.Value />
          <Select.Icon>
            <ChevronDownIcon />
          </Select.Icon>
        </Select.Trigger>
        <Select.Portal>
          <Select.Content className="overflow-hidden bg-secondary-surface rounded-md shadow-lg border border-gray-600 z-50">
            <Select.ScrollUpButton className="flex items-center justify-center h-6 bg-secondary-surface text-text-secondary cursor-default">
              <ChevronUpIcon />
            </Select.ScrollUpButton>
            <Select.Viewport className="p-1">
              {PARTITION_OPTIONS.map((num) => (
                <Select.Item
                  key={num}
                  value={num.toString()}
                  className="text-sm text-text-primary rounded-sm flex items-center px-6 py-2 relative select-none hover:bg-blue-600 focus:bg-blue-600 cursor-pointer outline-none"
                >
                  <Select.ItemText>{num}</Select.ItemText>
                </Select.Item>
              ))}
            </Select.Viewport>
            <Select.ScrollDownButton className="flex items-center justify-center h-6 bg-secondary-surface text-text-secondary cursor-default">
              <ChevronDownIcon />
            </Select.ScrollDownButton>
          </Select.Content>
        </Select.Portal>
      </Select.Root>
    </div>
  );
}
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
},
"dependencies": {
"@monaco-editor/react": "^4.7.0",
"@radix-ui/react-icons": "^1.3.2",
"@radix-ui/react-select": "^2.2.6",
"@radix-ui/react-switch": "^1.2.6",
"@radix-ui/react-tabs": "^1.1.13",
"@types/react-syntax-highlighter": "^15.5.13",
"@viz-js/viz": "^3.17.0",
"monaco-editor": "^0.52.2",
"monaco-sql-languages": "^0.15.1",
"monaco-vim": "^0.4.2",
Expand Down
Loading