Lazily compute null count (seems to help high-cardinality aggregation) #6155

Draft · wants to merge 3 commits into base: master
2 changes: 1 addition & 1 deletion arrow-array/src/record_batch.rs
@@ -684,7 +684,7 @@ mod tests {

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
-        assert_eq!(record_batch.get_array_memory_size(), 364);
+        assert_eq!(record_batch.get_array_memory_size(), 380);
}

fn check_batch(record_batch: RecordBatch, num_rows: usize) {
84 changes: 71 additions & 13 deletions arrow-buffer/src/buffer/null.rs
@@ -15,35 +15,66 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::{AtomicI64, Ordering};

use crate::bit_iterator::{BitIndexIterator, BitIterator, BitSliceIterator};
use crate::buffer::BooleanBuffer;
use crate::{Buffer, MutableBuffer};

const UNINITIALIZED_NULL_COUNT: i64 = -1;

#[derive(Debug)]
pub enum NullCount {
    Eager(usize),
    Lazy(AtomicI64),
@XiangpengHao (Contributor), Aug 9, 2024:

I feel the state here is a bit complicated: we have three states (Eager, Lazy initialized, Lazy uninitialized), and we use both the enum and a sentinel value to differentiate them.

I wonder if we can simplify this with just two states, uninitialized and initialized; when we try to read an uninitialized value, we count the nulls and set it to the initialized state.

struct NullCount {
    val: AtomicI64,
}

impl NullCount {
    fn get(&self /* , ... */) -> i64 {
        let val = self.val.load(Ordering::Relaxed);
        if val == UNINIT {
            let computed = cal_null_count();
            self.val.store(computed, Ordering::Relaxed);
            computed
        } else {
            val
        }
    }
}

This way we only have two states to manage, and we also keep NullCount at 8 bytes instead of the current 24 bytes, which might help with performance.
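A minimal, self-contained sketch of the two-state design suggested here (not the PR's code): `compute` stands in for the real bitmap count, and all names are illustrative.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

const UNINITIALIZED: i64 = -1;

/// Hypothetical two-state cache: -1 means "not yet computed",
/// any other value is the cached null count. Only 8 bytes total.
struct NullCount {
    val: AtomicI64,
}

impl NullCount {
    fn new_lazy() -> Self {
        Self { val: AtomicI64::new(UNINITIALIZED) }
    }

    fn new_eager(count: usize) -> Self {
        Self { val: AtomicI64::new(count as i64) }
    }

    /// Returns the cached count, running `compute` only on first access.
    fn get(&self, compute: impl FnOnce() -> usize) -> usize {
        let cached = self.val.load(Ordering::Relaxed);
        if cached != UNINITIALIZED {
            return cached as usize;
        }
        let computed = compute();
        // Two threads may race here; both store the same value, which is benign.
        self.val.store(computed as i64, Ordering::Relaxed);
        computed
    }
}

fn main() {
    let nc = NullCount::new_lazy();
    assert_eq!(nc.get(|| 5), 5); // computed on first access
    assert_eq!(nc.get(|| 99), 5); // cached thereafter, closure ignored
    let eager = NullCount::new_eager(0);
    assert_eq!(eager.get(|| 7), 0); // eager value wins
    println!("two-state cache ok");
}
```

Note that eager construction and the "already computed" lazy state collapse into the same representation, which is what removes the third state from the enum-based design.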

Contributor Author:

> I wonder if we can simplify this with just two states: uninitialized and initialized [...] This way we only have two states to manage, and we also keep the NullCount to be 8 byte, instead of the current 24 bytes, which might help with performance

I don't want to make it so complicated either... and I implemented it with two states at the beginning.

But it seemed to make some queries slower (maybe noise?) in my first version of this PR, and I encountered a strange performance problem, possibly related to atomics, in another PR of mine. So, to try my best to avoid the possible cost of atomics, I refactored it to this.

Contributor:

> so for trying best to avoid the possible cost of atomic

An atomic with Ordering::Relaxed has no extra cost compared to directly loading/storing the value, as shown here: https://godbolt.org/z/4oreernqh; they compile to the same instructions. The regressions are probably not from atomics.

Relaxed ordering is sufficient here; a stricter ordering won't help with correctness and is slower. If we really wanted to prevent multiple threads from counting the null count, we would have to use a Mutex or RwLock, which is a bit overkill for our use case, imo.

Contributor Author:

> Atomic with Ordering::Relaxed has no extra cost comparing to directly loading/storing the value [...] Relaxed ordering is sufficient here, more strict ordering won't help with correctness and is slower.

😄 Thanks, I indeed didn't know enough about atomics.
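As a hedged aside on the "Mutex or RwLock" alternative mentioned above: the standard library's `std::sync::OnceLock` is the lighter mutual-exclusion option, guaranteeing the count closure runs at most once, at the cost of heavier synchronization than a Relaxed atomic. This is only an illustrative sketch, not what the PR does.

```rust
use std::sync::OnceLock;

// Illustrative alternative design: OnceLock ensures the expensive count
// is computed exactly once, blocking concurrent callers until it is ready.
struct OnceNullCount {
    val: OnceLock<usize>,
}

impl OnceNullCount {
    fn new() -> Self {
        Self { val: OnceLock::new() }
    }

    fn get(&self, compute: impl FnOnce() -> usize) -> usize {
        // The first caller runs `compute`; later callers get the cached value.
        *self.val.get_or_init(compute)
    }
}

fn main() {
    let nc = OnceNullCount::new();
    assert_eq!(nc.get(|| 3), 3);
    assert_eq!(nc.get(|| 42), 3); // closure is not run again
    println!("once-only computation ok");
}
```

The trade-off is exactly the one discussed in this thread: OnceLock prevents duplicate computation, while a Relaxed atomic tolerates a benign duplicate in exchange for plain load/store instructions on the fast path.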

}

impl Clone for NullCount {
    fn clone(&self) -> Self {
        match self {
            Self::Eager(v) => Self::Eager(*v),
            Self::Lazy(v) => {
                let v = v.load(Ordering::Relaxed);
                Self::Lazy(AtomicI64::new(v))
            }
        }
    }
}

/// A [`BooleanBuffer`] used to encode validity for arrow arrays
///
/// As per the [Arrow specification], array validity is encoded in a packed bitmask with a
/// `true` value indicating the corresponding slot is not null, and `false` indicating
/// that it is null.
///
/// [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone)]
pub struct NullBuffer {
    buffer: BooleanBuffer,
-    null_count: usize,
+    null_count: NullCount,
}

impl PartialEq for NullBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.buffer == other.buffer
    }
}

impl Eq for NullBuffer {}

impl NullBuffer {
    /// Create a new [`NullBuffer`] computing the null count
    pub fn new(buffer: BooleanBuffer) -> Self {
-        let null_count = buffer.len() - buffer.count_set_bits();
+        // Expensive to calc the null count, so we lazily compute it when it is really needed
+        let null_count = NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT));
        Self { buffer, null_count }
    }

    /// Create a new [`NullBuffer`] of length `len` where all values are null
    pub fn new_null(len: usize) -> Self {
        Self {
            buffer: BooleanBuffer::new_unset(len),
-            null_count: len,
+            null_count: NullCount::Eager(len),
        }
    }

@@ -53,7 +84,7 @@ impl NullBuffer {
    pub fn new_valid(len: usize) -> Self {
        Self {
            buffer: BooleanBuffer::new_set(len),
-            null_count: 0,
+            null_count: NullCount::Eager(0),
        }
    }

@@ -63,7 +94,10 @@
    ///
    /// `buffer` must contain `null_count` `0` bits
    pub unsafe fn new_unchecked(buffer: BooleanBuffer, null_count: usize) -> Self {
-        Self { buffer, null_count }
+        Self {
+            buffer,
+            null_count: NullCount::Eager(null_count),
+        }
    }

/// Computes the union of the nulls in two optional [`NullBuffer`]
@@ -81,9 +115,12 @@

    /// Returns true if all nulls in `other` also exist in self
    pub fn contains(&self, other: &NullBuffer) -> bool {
-        if other.null_count == 0 {
-            return true;
+        if let NullCount::Eager(v) = &other.null_count {
+            if *v == 0 {
+                return true;
+            }
        }

        let lhs = self.inner().bit_chunks().iter_padded();
        let rhs = other.inner().bit_chunks().iter_padded();
        lhs.zip(rhs).all(|(l, r)| (l & !r) == 0)
@@ -106,9 +143,17 @@ impl NullBuffer {
                crate::bit_util::set_bit(buffer.as_mut(), i * count + j)
            }
        }

+        let null_count = if let NullCount::Eager(v) = &self.null_count {
+            NullCount::Eager(v * count)
+        } else {
+            // TODO: not sure whether it is better to load the atomic and try to reuse the computed result
+            NullCount::Lazy(AtomicI64::new(UNINITIALIZED_NULL_COUNT))
+        };
+
        Self {
            buffer: BooleanBuffer::new(buffer.into(), 0, capacity),
-            null_count: self.null_count * count,
+            null_count,
        }
    }

@@ -131,9 +176,20 @@
}

    /// Returns the null count for this [`NullBuffer`]
-    #[inline]
    pub fn null_count(&self) -> usize {
-        self.null_count
+        match &self.null_count {
+            NullCount::Eager(v) => *v,
+            NullCount::Lazy(v) => {
+                let cached_null_count = v.load(Ordering::Acquire);
+                if cached_null_count != UNINITIALIZED_NULL_COUNT {
+                    return cached_null_count as usize;
+                }
+
+                let computed_null_count = self.buffer.len() - self.buffer.count_set_bits();
+                v.store(computed_null_count as i64, Ordering::Release);
Contributor:

I wonder if using Ordering::Relaxed (https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html#variant.Relaxed) would make this PR potentially faster.

Specifically, it might make the generated code simpler.

Also, was there a reason to remove #[inline]? Maybe that could account for the slowdown 🤔

Member:

(They would be similar on x86, just affecting reordering)?

Contributor:

I agree -- I was thinking about ways to reduce the overhead of atomic instructions, inlining, etc. -- it may not be a good idea.

@mapleFU (Member), Aug 9, 2024:

An atomic is just a load and a store instruction? But count_set_bits() may not be suitable for inlining (or is it just a builtin zero-count)?

Maybe letting the compiler decide is ok.

@Rachelint (Contributor Author), Aug 9, 2024:

> I wonder if using Ordering::Relaxed would make this PR potentially faster [...] Also, was there a reason to remove #[inline] - maybe that could account for the slow down 🤔

I am not sure whether we should use Relaxed or Acquire + Release. I switched to Acquire + Release because I was worried about the following situation:

An ArrayRef is held by multiple threads, and they call the null_count function concurrently; if we use Relaxed, the count_set_bits computation could possibly be performed many times...

Another reason is that I found the strongest ordering, SeqCst, used in arrow-cpp:
https://github.com/apache/arrow/blob/187197c369058f7d1377c1b161c469a9e4542caf/cpp/src/arrow/array/data.cc#L206-L218

@tustvold (Contributor), Aug 9, 2024:

> ArrayRef is held by multiple threads, and they call the null_count function concurrently [...]

No memory ordering will prevent this; you'd need some mutual exclusion primitive if that were the goal. However, it doesn't really matter if two threads race to compute the null count, as they'll just set it to the same value. Since we aren't using the write to synchronise other memory operations, Relaxed would be perfectly valid here.

I do think the proposal to simplify the state is worth considering, as it will potentially remove a conditional load.

@Rachelint (Contributor Author), Aug 13, 2024:

> No memory ordering will prevent this, you'd need some mutual exclusion primitive if this was the goal. [...] Relaxed would be perfectly valid here.

Sorry, maybe I didn't state it clearly. Is it possible that, due to compiler reordering or a CPU cache-coherence problem, the actual execution order becomes:

  if cached_null_count != UNINITIALIZED_NULL_COUNT {
      return cached_null_count as usize;
  }
  let cached_null_count = v.load(Ordering::Relaxed);

  let computed_null_count = self.buffer.len() - self.buffer.count_set_bits();

And in the end, the computation would be performed one more time? Although it doesn't actually affect correctness, and it may be rare.

cc @XiangpengHao

@tustvold (Contributor), Aug 13, 2024:

No, all processors have coherent caches, and compilers respect single-threaded data dependencies. You can't observe two different values for the same memory location simultaneously. The ordering concerns whether operations performed on other memory locations are guaranteed to be visible following this operation; we aren't using this as a synchronisation primitive, so this doesn't matter.

Contributor Author:

> No, all processors have coherent caches and compilers respect single threaded data dependencies. [...]

Ok, thanks, I misunderstood it.

Member:

It may hurt the CPU cache too? But I know little about the details.

Compared to Relaxed, Acq-Rel would just prevent reordering on x86, but on weakly ordered machines it might affect the cache and would require a memory barrier or other instructions.

+                computed_null_count
+            }
+        }
    }

    /// Returns `true` if the value at `idx` is not null

@@ -189,8 +245,10 @@ impl NullBuffer {
        &self,
        f: F,
    ) -> Result<(), E> {
-        if self.null_count == self.len() {
-            return Ok(());
+        if let NullCount::Eager(v) = &self.null_count {
+            if *v == self.len() {
+                return Ok(());
+            }
        }
        self.valid_indices().try_for_each(f)
    }