
feat(rust): Add NontemporalArithmeticChunked #17708

Closed

Conversation

ruihe774
Contributor

For arrays larger than the cache, arithmetic is usually memory bound. The temporal stores (std::ptr::write) used in ArithmeticChunked quickly fill up the cache, evicting previously cached data. Worse, because we usually process an array from head to tail, once the temporal stores have filled the cache, subsequent operations must re-read the array from memory into cache, which is a significant performance penalty.

This PR adds NontemporalArithmeticChunked, which uses nontemporal stores (std::intrinsics::nontemporal_store). A nontemporal store 1) does not allocate new lines in the cache, instead writing directly to memory with higher bandwidth, and 2) is not ordered with respect to other loads/stores, giving the CPU more flexibility to hide memory latency. There are public benchmarks comparing temporal and nontemporal stores that demonstrate the performance improvement of nontemporal stores on large amounts of data.

NontemporalArithmeticChunked is only enabled with the feature nontemporal, which is in turn enabled by the feature nightly. The Rust doc says std::intrinsics::nontemporal_store will probably never become stable, so NontemporalArithmeticChunked will probably never be stable either. The doc also says std::intrinsics::nontemporal_store violates the memory model of Rust; we do not care about that; after all, no one will put atomic integers in a PrimitiveArray.

This PR only adds a trait in Rust. It is not used in lazy and is not yet exposed to Python. In the future, optimizations such as automatically choosing nontemporal arithmetic when the operands are large arrays can be added.
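To illustrate the idea, here is a hedged sketch of what a nontemporal kernel looks like, written with the stable x86-64 intrinsics `_mm_stream_si32` and `_mm_sfence` rather than the nightly `std::intrinsics::nontemporal_store` the PR uses; the function name and the scalar loop are invented for illustration (real kernels would stream whole SIMD vectors):

```rust
// Hedged sketch (not the PR's actual code): a nontemporal wrapping-add
// kernel for i32 slices using stable x86-64 intrinsics.
#[cfg(target_arch = "x86_64")]
fn wrapping_add_nontemporal(a: &[i32], b: &[i32], out: &mut [i32]) {
    use core::arch::x86_64::{_mm_sfence, _mm_stream_si32};
    assert!(a.len() == b.len() && a.len() == out.len());
    for i in 0..a.len() {
        // Nontemporal store: write straight to memory without allocating
        // a cache line for `out`.
        unsafe { _mm_stream_si32(out.as_mut_ptr().add(i), a[i].wrapping_add(b[i])) };
    }
    // Make the streamed writes visible before any later access to `out`.
    unsafe { _mm_sfence() };
}

// Plain fallback so the sketch also compiles on other architectures.
#[cfg(not(target_arch = "x86_64"))]
fn wrapping_add_nontemporal(a: &[i32], b: &[i32], out: &mut [i32]) {
    for i in 0..a.len() {
        out[i] = a[i].wrapping_add(b[i]);
    }
}

fn main() {
    let (a, b) = ([1, 2, i32::MAX], [10, 20, 1]);
    let mut out = [0i32; 3];
    wrapping_add_nontemporal(&a, &b, &mut out);
    assert_eq!(out, [11, 22, i32::MIN]);
}
```

The closing `_mm_sfence` matters: as discussed further down in this thread, streamed stores are not ordered with ordinary stores, so they must be fenced before the buffer is handed to other code.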

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature rust Related to Rust Polars labels Jul 18, 2024

codecov bot commented Jul 18, 2024

Codecov Report

Attention: Patch coverage is 41.81818% with 448 lines in your changes missing coverage. Please review.

Project coverage is 80.27%. Comparing base (235cad3) to head (aa73afc).
Report is 2 commits behind head on main.

Files Patch % Lines
...olars-core/src/chunked_array/arithmetic/numeric.rs 0.00% 296 Missing ⚠️
crates/polars-compute/src/arithmetic/unsigned.rs 51.69% 57 Missing ⚠️
crates/polars-compute/src/arithmetic/signed.rs 74.41% 44 Missing ⚠️
crates/polars-compute/src/arithmetic/float.rs 79.16% 25 Missing ⚠️
crates/polars-compute/src/arithmetic/mod.rs 45.23% 23 Missing ⚠️
crates/polars-compute/src/arity.rs 86.36% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #17708      +/-   ##
==========================================
- Coverage   80.37%   80.27%   -0.10%     
==========================================
  Files        1500     1500              
  Lines      196605   197179     +574     
  Branches     2793     2793              
==========================================
+ Hits       158016   158291     +275     
- Misses      38076    38375     +299     
  Partials      513      513              


@orlp
Collaborator

orlp commented Jul 19, 2024

We are working on a new streaming engine which has much smaller chunks of data (1-5MiB, roughly), which will cause the kernels to operate mostly on in-cache data. We've already seen benchmarks where it's 3x faster than the current streaming engine due to this.

Since in the future we want to run most queries this way, having the nontemporal operations as well wouldn't help much (in fact, using them in the new streaming engine would be significantly worse), but they do add quite a bit of code to maintain, which also makes it harder to add new arithmetic operations/kernels.

Finally, the Rust docs specifically state that you should NOT use std::intrinsics::nontemporal_store, which also makes it a bit questionable to include here.


Can you run a benchmark showing how much faster it is for the current engine? If it speeds up our current operations by a lot I could consider it, but if it's only a small benefit that vanishes as soon as we turn on the new streaming engine by default, I'm not that excited about it.

@ruihe774
Contributor Author

We are working on a new streaming engine which has much smaller chunks of data (1-5MiB, roughly), which will cause the kernels to operate mostly on in-cache data.

1-5MiB is still larger than L1 and L2 cache in most CPUs.

And this PR aims to provide an option. The new streaming engine is not used everywhere. When using Rust Polars directly, for example in pyo3 extensions and plugins, we usually operate on arrays directly. (And the lazy feature is not enabled.)

Finally, the Rust docs specifically state that you should NOT use std::intrinsics::nontemporal_store, which also makes it a bit questionable to include here.

You can have a look at the corresponding issue (rust-lang/rust#114582). They mainly focus on the memory model (i.e. load/store ordering), about which we do not care in numeric computation.

Can you run a benchmark how much faster it is for the current engine?

Yes, I'm going to.

@ruihe774
Contributor Author

ruihe774 commented Jul 20, 2024

I've written a simple benchmark:

use std::hint::black_box;
use std::time::Instant;

use mimalloc::MiMalloc;
use polars::chunked_array::arithmetic::NontemporalArithmeticChunked;
use polars::prelude::*;
use rand::RngCore;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

const N: usize = 100000000;
const ITERATIONS: usize = 100;

fn temporal_bench(mut a: UInt32Chunked, mut b: UInt32Chunked) -> UInt32Chunked {
    for i in 1..=ITERATIONS {
        eprint!("{i:03}\r");
        let c = (&a).wrapping_add(&b);
        std::mem::swap(&mut a, &mut b);
        let _ = std::mem::replace(&mut b, c);
    }
    b
}

fn nontemporal_bench(mut a: UInt32Chunked, mut b: UInt32Chunked) -> UInt32Chunked {
    for i in 1..=ITERATIONS {
        eprint!("{i:03}\r");
        let c = (&a).wrapping_add_nontemporal(&b);
        std::mem::swap(&mut a, &mut b);
        let _ = std::mem::replace(&mut b, c);
    }
    b
}

fn main() {
    unsafe {
        use libmimalloc_sys::*;
        // allow large os pages
        mi_option_set_enabled(mi_option_large_os_pages, true);
        // disable mi_option_purge_delay
        // we deallocate frequently, and purging is slow
        mi_option_set(15, -1);
    }

    let mut rng = rand::thread_rng();
    let mut a = Vec::with_capacity(N);
    let mut b = Vec::with_capacity(N);
    unsafe { a.set_len(N) };
    unsafe { b.set_len(N) };
    // fill the full 4*N bytes; transmuting &mut [u32] to &mut [u8] would keep
    // the element count and fill only N bytes
    rng.fill_bytes(unsafe { std::slice::from_raw_parts_mut(a.as_mut_ptr() as *mut u8, N * 4) });
    rng.fill_bytes(unsafe { std::slice::from_raw_parts_mut(b.as_mut_ptr() as *mut u8, N * 4) });
    let a = UInt32Chunked::from_vec("a", a);
    let b = UInt32Chunked::from_vec("b", b);

    // warm up rounds
    let _ = black_box(temporal_bench(black_box(a.clone()), black_box(b.clone())));

    let start = Instant::now();
    let r1 = black_box(temporal_bench(black_box(a.clone()), black_box(b.clone())));
    let end = Instant::now();
    let duration_temporal = (end - start).as_millis();

    let start = Instant::now();
    let r2 = black_box(nontemporal_bench(black_box(a.clone()), black_box(b.clone())));
    let end = Instant::now();
    let duration_nontemporal = (end - start).as_millis();

    for v1 in r1.into_no_null_iter().take(10) {
        print!("{v1} ");
    }
    println!();
    for v2 in r2.into_no_null_iter().take(10) {
        print!("{v2} ");
    }
    println!();

    println!("temporal:\t{duration_temporal} ms");
    println!("nontemporal:\t{duration_nontemporal} ms");
}

On my machine, the result is:

temporal:       4075 ms
nontemporal:    3767 ms

The nontemporal version is ~8% faster. You can have a try as well.

@ruihe774
Contributor Author

ruihe774 commented Jul 20, 2024

I've written a benchmark that matches the new streaming engine's case as well. In this benchmark N is 1000000, so each array is ~4MB:

use std::hint::black_box;
use std::time::Instant;

use mimalloc::MiMalloc;
use polars::chunked_array::arithmetic::NontemporalArithmeticChunked;
use polars::prelude::*;
use rand::RngCore;
use rayon::prelude::*;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

const N: usize = 1000000;
const ITERATIONS: usize = 1000;

fn temporal_bench(a: UInt32Chunked, b: UInt32Chunked) {
    for _ in 1..=ITERATIONS {
        let _ = black_box(black_box(&a).wrapping_add(black_box(&b)));
    }
}

fn nontemporal_bench(a: UInt32Chunked, b: UInt32Chunked) {
    for _ in 1..=ITERATIONS {
        let _ = black_box(black_box(&a).wrapping_add_nontemporal(black_box(&b)));
    }
}

fn main() {
    unsafe {
        use libmimalloc_sys::*;
        // allow large os pages
        mi_option_set_enabled(mi_option_large_os_pages, true);
        // disable mi_option_purge_delay
        // we deallocate frequently, and purging is slow
        mi_option_set(15, -1);
    }

    let mut rng = rand::thread_rng();
    let mut a = Vec::with_capacity(N);
    let mut b = Vec::with_capacity(N);
    unsafe { a.set_len(N) };
    unsafe { b.set_len(N) };
    // fill the full 4*N bytes; transmuting &mut [u32] to &mut [u8] would keep
    // the element count and fill only N bytes
    rng.fill_bytes(unsafe { std::slice::from_raw_parts_mut(a.as_mut_ptr() as *mut u8, N * 4) });
    rng.fill_bytes(unsafe { std::slice::from_raw_parts_mut(b.as_mut_ptr() as *mut u8, N * 4) });
    let a = UInt32Chunked::from_vec("a", a);
    let b = UInt32Chunked::from_vec("b", b);

    // warm up rounds
    (0..16).into_par_iter().for_each(|_| {
        temporal_bench(a.clone(), b.clone());
    });

    let start = Instant::now();
    (0..32).into_par_iter().for_each(|_| {
        temporal_bench(a.clone(), b.clone());
    });
    let end = Instant::now();
    let duration_temporal = (end - start).as_millis();

    let start = Instant::now();
    (0..32).into_par_iter().for_each(|_| {
        nontemporal_bench(a.clone(), b.clone());
    });
    let end = Instant::now();
    let duration_nontemporal = (end - start).as_millis();

    println!("temporal:\t{duration_temporal} ms");
    println!("nontemporal:\t{duration_nontemporal} ms");
}
On my machine, the result is:

temporal:       2000 ms
nontemporal:    1846 ms

I can conclude that, for simple operations, in the following situations:

  • the arrays are far larger than the cache size, or
  • the input arrays are accessed multiple times while the result array is not used immediately (e.g. df.select((pl.col("a") + 1).alias("b"), (pl.col("a") * 2).alias("c")))

the nontemporal version has better performance.

@orlp
Collaborator

orlp commented Jul 20, 2024

@ruihe774 Thanks for your benchmarks, I will take a more detailed look when I'm back at work on Monday.

However, from a quick peek I don't think that your latter benchmark matches the typical usage in the new streaming engine. You're not reading back the results at all, you're just discarding them:

let _ = black_box(black_box(&a).wrapping_add_nontemporal(black_box(&b)));

In the new streaming engine the result from the wrapping_add would immediately flow into the next operator where the outcome gets used without it ever leaving cache. So a much more realistic benchmark is this:

fn temporal_bench(mut a: UInt32Chunked, b: UInt32Chunked) {
    for _ in 1..=ITERATIONS {
        a = a.wrapping_add(black_box(&b));
    }
    black_box(a);
}

fn nontemporal_bench(mut a: UInt32Chunked, b: UInt32Chunked) {
    for _ in 1..=ITERATIONS {
        a = a.wrapping_add_nontemporal(black_box(&b));
    }
    black_box(a);
}

@ruihe774
Contributor Author

In the new streaming engine the result from the wrapping_add would immediately flow into the next operator where the outcome gets used without it ever leaving cache.

Is it also true for single expressions like df.select((pl.col("a") + 1).alias("b"), (pl.col("a") * 2).alias("c"))?

And what about the situation that one operation has multiple (e.g., two) inputs? If input A is ready and B is not yet calculated, doesn't it have to wait for B before A gets used?

I'm not familiar with the new streaming engine. Please correct me if I'm wrong.

@orlp
Collaborator

orlp commented Jul 20, 2024

@ruihe774 Yes, both cases will work in a streaming fashion. It will support arbitrary directed acyclic graphs.

If input A is ready and B is not yet calculated, doesn't it have to wait for B before A gets used?

Yes, the new engine uses async to let the consumer of A + B wait (and possibly do other useful work) while one small bit (called a morsel) of A and B gets computed. Then the sum A + B is computed for that morsel, and the operator waits for another morsel from A and from B. There is a lot of machinery to ensure we only have a minimal amount of stuff in memory at a time so things stay in cache.
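That morsel flow can be sketched as a toy model with bounded channels (all names, morsel sizes, and the Vec<i32> morsel type are invented for illustration and are not the engine's real API):

```rust
// Toy sketch of morsel-driven streaming: bounded channels give backpressure,
// so only a little data is in flight at any time and it stays in cache.
use std::sync::mpsc;
use std::thread;

fn main() {
    // Capacity 1: a producer blocks until the consumer takes its morsel.
    let (tx_a, rx_a) = mpsc::sync_channel::<Vec<i32>>(1);
    let (tx_b, rx_b) = mpsc::sync_channel::<Vec<i32>>(1);

    let prod_a = thread::spawn(move || {
        for m in 0..4 {
            tx_a.send(vec![m; 4]).unwrap(); // one small morsel at a time
        }
    });
    let prod_b = thread::spawn(move || {
        for m in 0..4 {
            tx_b.send(vec![10 * m; 4]).unwrap();
        }
    });

    // The A + B operator: wait for one morsel from each input, combine it
    // while it is still hot in cache, then wait for the next pair.
    let mut sums = Vec::new();
    while let (Ok(a), Ok(b)) = (rx_a.recv(), rx_b.recv()) {
        sums.push(a.iter().zip(&b).map(|(x, y)| x + y).sum::<i32>());
    }
    prod_a.join().unwrap();
    prod_b.join().unwrap();

    // Each morsel contributes 4 elements of (m + 10m).
    assert_eq!(sums, [0, 44, 88, 132]);
}
```

The real engine uses async tasks rather than threads and channels, but the shape is the same: a consumer of two inputs blocks on whichever morsel is not ready yet.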

@workingjubilee

The doc also says std::intrinsics::nontemporal_store violates the memory model of Rust; we do not care about that; after all, no one will put atomic integers in a PrimitiveArray.

You can have a look at the corresponding issue (rust-lang/rust#114582). They mainly focus on the memory model (i.e. load/store ordering), about which we do not care in numeric computation.

Pray tell, what numerical algorithm are you executing that causes you to write data and then never access it again from any core on the machine? Because I think you have slightly misunderstood the issue if you think this is confined to usage of atomics (and rayon seems to use quite a bit of multithreaded access to slices).

You see, a nontemporal store is not ordered with respect to any other store on the machine. That includes ptr::write.

@workingjubilee

workingjubilee commented Aug 2, 2024

@orlp On x86, this code incurs actual unquestionable hardware UB if anyone ever loads or stores to that region of memory again. The classic issue with nontemporal stores is precisely in libraries like polars where people update large slices and then pass a reference to that slice off to somewhere else, which does something like, oh, I don't know...

parr.slice().par_chunks().for_each(|chunk| {
     // It literally does not matter what goes into this block,
     // because it is already undefined behavior.
});

That's UB on x86. You cannot assume that the writes retire before any of the subsequent operations, even the implicit ones.

You cannot even assume e.g. that setting an AtomicBool after all those writes will create a happens-before relationship that will sync later accesses. This is because LLVM does not promise, when you use those operations, to use LOCK or MFENCE or anything like that. It assumes the hardware upholds x86-TSO, and thus will only use normal mov instructions, but those are not ordered with respect to non-temporal stores.

The entire problem with nontemporal_store on x86 is that it leaks weakly-ordered memory semantics into what should be, at that point, normal sequential code.
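The required pattern, as described above, can be sketched as follows (the function names are invented for illustration): after a run of nontemporal stores, an explicit `_mm_sfence` must retire them before the buffer is published, e.g. via a Release store to an AtomicBool, because the Release store alone does not order x86 nontemporal stores.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hedged sketch of the fix: stream the writes, fence, then publish.
#[cfg(target_arch = "x86_64")]
fn fill_and_publish(out: &mut [i32], ready: &AtomicBool) {
    use core::arch::x86_64::{_mm_sfence, _mm_stream_si32};
    for i in 0..out.len() {
        unsafe { _mm_stream_si32(out.as_mut_ptr().add(i), i as i32) };
    }
    // WITHOUT this fence, the streamed writes may still sit in write-combining
    // buffers when the Release store below retires, so another thread could
    // observe `ready == true` while `out` is stale: exactly the bug above.
    unsafe { _mm_sfence() };
    ready.store(true, Ordering::Release);
}

// Plain fallback so the sketch also compiles on other architectures.
#[cfg(not(target_arch = "x86_64"))]
fn fill_and_publish(out: &mut [i32], ready: &AtomicBool) {
    for i in 0..out.len() {
        out[i] = i as i32;
    }
    ready.store(true, Ordering::Release);
}

fn main() {
    let mut buf = vec![0i32; 8];
    let ready = AtomicBool::new(false);
    fill_and_publish(&mut buf, &ready);
    assert!(ready.load(Ordering::Acquire));
    assert_eq!(buf[7], 7);
}
```

Note this only restores ordering on the writer's side; it does not make std::intrinsics::nontemporal_store itself well-defined under Rust's memory model.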

@coastalwhite
Collaborator

I am not as knowledgeable as @orlp or @workingjubilee on this point, but I am in general very much against these kinds of changes that are too compiler-specific or very sensitive to UB, at least until we can show that they significantly speed something up or make something easier.

This would require a lot of care from all developers into the future to keep it from turning into UB, or from completely negating any performance improvement it might bring now, especially once it has to interact with both the streaming and in-memory engines.

@workingjubilee

The appropriate way to implement this PR is using things like _mm_stream_ps or other architecture-specific intrinsics.

Once this conga-line of PRs lands, this intrinsic will do nothing on architectures where LLVM does not implement coherent non-temporal store semantics.

@ruihe774
Contributor Author

ruihe774 commented Aug 3, 2024

@workingjubilee Thank you for your great explanation. I'm now convinced that std::intrinsics::nontemporal_store should not be used.

@ruihe774 ruihe774 closed this Aug 3, 2024
@RalfJung

RalfJung commented Aug 5, 2024

The appropriate way to implement this PR is using things like _mm_stream_ps or other architecture-specific intrinsics.

And before using those, please read the documentation here. The PR was indeed actively unsound due to not using sfence.
