revert "split huge batch into smaller ones during projection (#448)", This reverts commit 7d33df1.
 fix issue "Nondeterministic expression xxx should be initialized before eval".
 fix issue "IO error: Bad file descriptor" while accessing spill file.
 fix jvm coredump by removing chrono dependency for printing logs.

Co-authored-by: zhangli20 <[email protected]>
richox and zhangli20 authored May 9, 2024
1 parent ab89e5f commit d9272a6
Showing 9 changed files with 53 additions and 77 deletions.
24 changes: 10 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -63,15 +63,15 @@ parquet = { version = "50.0.0" }
 serde_json = { version = "1.0.96" }
 
 [patch.crates-io]
-# datafusion: branch=v30-blaze
-datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
-datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
-datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
-datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
-datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
-datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9d1f7f2fc"}
+# datafusion: branch=v36-blaze
+datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
+datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
+datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
+datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
+datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
+datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "8cd557f32"}
 
-# arrow: branch=v45-blaze
+# arrow: branch=v50-blaze
 arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "b1e0762ba4"}
 arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "b1e0762ba4"}
 arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "b1e0762ba4"}
1 change: 0 additions & 1 deletion native-engine/blaze-serde/Cargo.toml
@@ -9,7 +9,6 @@ default = ["prost/no-recursion-limit"]
 [dependencies]
 arrow = { workspace = true }
 base64 = "*"
-chrono = "*"
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
 datafusion-ext-exprs = { workspace = true }
3 changes: 1 addition & 2 deletions native-engine/blaze-serde/src/from_proto.rs
@@ -24,7 +24,6 @@ use arrow::{
     datatypes::{Field, FieldRef, SchemaRef},
 };
 use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
-use chrono::DateTime;
 use datafusion::{
     common::stats::Precision,
     datasource::{
@@ -1095,7 +1094,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
             object_meta: ObjectMeta {
                 location: Path::from(format!("/{}", BASE64_URL_SAFE_NO_PAD.encode(&val.path))),
                 size: val.size as usize,
-                last_modified: DateTime::default(),
+                last_modified: Default::default(),
                 e_tag: None,
                 version: None,
             },
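The `last_modified` change above swaps an explicitly named `DateTime::default()` for a type-inferred `Default::default()`, which produces the same value while letting the chrono import disappear. A minimal sketch of that inference pattern, using a hypothetical `Meta` struct rather than the real `ObjectMeta`:

    #[derive(Debug, Default, PartialEq)]
    struct Meta {
        size: usize,
        e_tag: Option<String>,
    }

    fn main() {
        // `Default::default()` is resolved from the field's declared type at the
        // use site, so the concrete type never has to be named or imported.
        let m = Meta {
            size: 42,
            e_tag: Default::default(),
        };
        assert_eq!(m, Meta { size: 42, e_tag: None });
    }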
1 change: 0 additions & 1 deletion native-engine/blaze/Cargo.toml
@@ -14,7 +14,6 @@ default = ["tokio/rt-multi-thread"]
 arrow = { workspace = true }
 blaze-jni-bridge = { workspace = true }
 blaze-serde = { workspace = true }
-chrono = "0.4"
 datafusion = { workspace = true }
 datafusion-ext-commons = { workspace = true }
 datafusion-ext-plans = { workspace = true }
21 changes: 15 additions & 6 deletions native-engine/blaze/src/logging.rs
@@ -12,18 +12,27 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use chrono::Local;
+use std::time::Instant;
+
 use log::{Level, LevelFilter, Log, Metadata, Record};
+use once_cell::sync::OnceCell;
 
 const MAX_LEVEL: Level = Level::Info;
 
 pub fn init_logging() {
-    log::set_logger(&SimpleLogger).expect("error setting logger");
+    static LOGGER: OnceCell<SimpleLogger> = OnceCell::new();
+    let logger = LOGGER.get_or_init(|| SimpleLogger {
+        start_instant: Instant::now(),
+    });
+
+    log::set_logger(logger).expect("error setting logger");
     log::set_max_level(LevelFilter::Info);
 }
 
 #[derive(Clone, Copy)]
-struct SimpleLogger;
+struct SimpleLogger {
+    start_instant: Instant,
+}
 
 impl Log for SimpleLogger {
     fn enabled(&self, metadata: &Metadata) -> bool {
@@ -32,10 +41,10 @@ impl Log for SimpleLogger {
 
     fn log(&self, record: &Record) {
         if self.enabled(record.metadata()) {
-            let local_time = Local::now().format("%d/%m/%Y %H:%M:%S");
+            let elapsed = Instant::now() - self.start_instant;
+            let elapsed_sec = elapsed.as_secs_f64();
             eprintln!(
-                "{} [{}] Blaze - {}",
-                local_time,
+                "(+{elapsed_sec:.3}s) [{}] Blaze - {}",
                 record.level(),
                 record.args()
            );
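Below is a standalone sketch (not part of the patch) of the chrono-free timestamping the new logger uses: an `Instant` captured once at initialization, with elapsed seconds formatted by the standard library only.

    use std::time::{Duration, Instant};

    fn main() {
        let start_instant = Instant::now(); // captured once, as in init_logging()
        std::thread::sleep(Duration::from_millis(250));

        // Same format as the patched logger: "(+{elapsed_sec:.3}s) [LEVEL] Blaze - message"
        let elapsed_sec = (Instant::now() - start_instant).as_secs_f64();
        eprintln!("(+{elapsed_sec:.3}s) [INFO] Blaze - example message");
        // prints roughly: (+0.250s) [INFO] Blaze - example message
    }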
9 changes: 7 additions & 2 deletions native-engine/datafusion-ext-plans/src/memmgr/spill.rs
@@ -14,7 +14,7 @@
 
 use std::{
     any::Any,
-    fs::File,
+    fs::{File, OpenOptions},
     io::{BufReader, BufWriter, Cursor, Read, Seek, Write},
     sync::Arc,
     time::Duration,
@@ -88,7 +88,12 @@ impl FileSpill {
         jni_call!(BlazeOnHeapSpillManager(hsm.as_obj()).getDirectWriteSpillToDiskFile()-> JObject)?
             .as_obj()
             .into())?;
-        let file = File::create(file_name)?;
+        let file = OpenOptions::new() // create file and open in read-write mode
+            .create(true)
+            .truncate(true)
+            .write(true)
+            .read(true)
+            .open(&file_name)?;
         Ok(Self(file, spill_metrics.clone()))
     } else {
         let file = tempfile::tempfile()?;
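A note on why this fixes the "Bad file descriptor" error: `File::create` returns a write-only handle, and on Linux a later `read()` on such a descriptor fails with EBADF. Opening with both `.write(true)` and `.read(true)`, as above, lets a single handle cover the spill's write-then-read-back lifecycle. A minimal reproduction sketch follows; the path is hypothetical.

    use std::fs::OpenOptions;
    use std::io::{Read, Seek, SeekFrom, Write};

    fn main() -> std::io::Result<()> {
        // Equivalent of the patched FileSpill open: create/truncate, read + write.
        let mut file = OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open("/tmp/spill-demo.bin")?;

        file.write_all(b"spilled bytes")?; // spill phase: write
        file.seek(SeekFrom::Start(0))?;    // rewind

        let mut buf = String::new();
        file.read_to_string(&mut buf)?;    // read-back phase: this is the call that
                                           // fails with a write-only File::create handle
        assert_eq!(buf, "spilled bytes");
        Ok(())
    }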
44 changes: 6 additions & 38 deletions native-engine/datafusion-ext-plans/src/project_exec.rs
@@ -17,10 +17,7 @@
 
 use std::{any::Any, fmt::Formatter, sync::Arc};
 
-use arrow::{
-    datatypes::{Field, Fields, Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
+use arrow::datatypes::{Field, Fields, Schema, SchemaRef};
 use datafusion::{
     common::{Result, Statistics},
     execution::TaskContext,
@@ -31,9 +28,7 @@ use datafusion::{
         DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     },
 };
-use datafusion_ext_commons::{
-    array_size::ArraySize, streams::coalesce_stream::CoalesceInput, suggested_output_batch_mem_size,
-};
+use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
 use futures::{stream::once, FutureExt, StreamExt, TryStreamExt};
 use itertools::Itertools;
 
@@ -221,43 +216,16 @@ async fn execute_project_with_filtering(
         InputBatchStatistics::from_metrics_set_and_blaze_conf(&metrics, partition)?,
         input.execute_projected(partition, context.clone(), &projection)?,
     )?;
-    let num_output_cols = output_schema.fields().len();
 
     context.output_with_sender("Project", output_schema, move |sender| async move {
         while let Some(batch) = input.next().await.transpose()? {
             let mut timer = baseline_metrics.elapsed_compute().timer();
+            let output_batch = cached_expr_evaluator.filter_project(&batch)?;
+            drop(batch);
 
-            if batch.num_rows() == 0 {
-                continue;
-            }
-            for batch in split_batch_by_estimated_size(batch, num_output_cols) {
-                let output_batch = cached_expr_evaluator.filter_project(&batch)?;
-                baseline_metrics.record_output(output_batch.num_rows());
-                sender.send(Ok(output_batch), Some(&mut timer)).await;
-            }
+            baseline_metrics.record_output(output_batch.num_rows());
+            sender.send(Ok(output_batch), Some(&mut timer)).await;
         }
         Ok(())
     })
 }
-
-fn split_batch_by_estimated_size(batch: RecordBatch, num_output_cols: usize) -> Vec<RecordBatch> {
-    let target_mem_size = suggested_output_batch_mem_size();
-    let target_num_batches = batch.get_array_mem_size() * 2 * num_output_cols
-        / batch.num_columns().max(1)
-        / target_mem_size;
-
-    if target_num_batches <= 1 {
-        return vec![batch];
-    }
-    let target_num_rows = (batch.num_rows() / target_num_batches.max(1)).max(1);
-
-    let mut batches = vec![];
-    let mut offset = 0;
-
-    while offset < batch.num_rows() {
-        let num_rows = target_num_rows.min(batch.num_rows() - offset);
-        batches.push(batch.slice(offset, num_rows));
-        offset += num_rows;
-    }
-    batches
-}
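For context on the reverted helper: `split_batch_by_estimated_size` cut a batch into row ranges using arrow's zero-copy `RecordBatch::slice(offset, length)`. The following is a minimal sketch of that slicing loop in isolation, with a hypothetical fixed chunk size in place of the removed memory-size heuristic.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    /// Split a batch into zero-copy slices of at most `chunk_rows` rows each.
    fn split_by_rows(batch: &RecordBatch, chunk_rows: usize) -> Vec<RecordBatch> {
        let mut chunks = vec![];
        let mut offset = 0;
        while offset < batch.num_rows() {
            let len = chunk_rows.min(batch.num_rows() - offset);
            chunks.push(batch.slice(offset, len)); // shares the underlying buffers
            offset += len;
        }
        chunks
    }

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>())) as ArrayRef],
        )
        .unwrap();
        let sizes: Vec<usize> = split_by_rows(&batch, 4).iter().map(|b| b.num_rows()).collect();
        assert_eq!(sizes, vec![4, 4, 2]);
    }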
@@ -42,12 +42,13 @@ case class SparkUDFWrapperContext(serialized: ByteBuffer) extends Logging {
     val bytes = new Array[Byte](serialized.remaining())
     serialized.get(bytes)
     bytes
-  }) match {
-    case (nondeterministic: Nondeterministic, paramsSchema) =>
+  })
+
+  // initialize all nondeterministic children exprs
+  expr.foreach {
+    case nondeterministic: Nondeterministic =>
       nondeterministic.initialize(TaskContext.get.partitionId())
-      (nondeterministic, paramsSchema)
-    case (expr, paramsSchema) =>
-      (expr, paramsSchema)
+    case _ =>
   }
 
   private val dictionaryProvider: DictionaryProvider = new MapDictionaryProvider()
