Skip to content

Commit

Permalink
Container GATs, improve traits (#541)
Browse files Browse the repository at this point in the history
* Bump dependency versions

Signed-off-by: Moritz Hoffmann <[email protected]>

* Introduce GATs for containers

Containers are generic over their contents, and over reading and draining.

Signed-off-by: Moritz Hoffmann <[email protected]>

* Session::give for containers

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Mar 19, 2024
1 parent ba4336b commit ded1a39
Show file tree
Hide file tree
Showing 13 changed files with 288 additions and 192 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- run: cargo install mdbook --version 0.4.27
- run: cargo install mdbook --version 0.4.36
- run: cd mdbook && mdbook build
- uses: JamesIves/github-pages-deploy-action@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- run: rustup update 1.64 --no-self-update && rustup default 1.64
- run: rustup update 1.65 --no-self-update && rustup default 1.65
- run: cargo build
- name: test mdBook
# rustdoc doesn't build dependencies, so it needs to run after `cargo build`,
Expand Down
74 changes: 45 additions & 29 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,24 @@ impl<T: Columnation> TimelyStack<T> {
});
(length, capacity)
}

/// The length in items.
#[inline]
pub fn len(&self) -> usize {
self.local.len()
}

/// The capacity of the local vector.
#[inline]
pub fn capacity(&self) -> usize {
self.local.capacity()
}

/// Reserve space for `additional` elements.
#[inline]
pub fn reserve(&mut self, additional: usize) {
self.local.reserve(additional)
}
}

impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
Expand Down Expand Up @@ -199,7 +217,7 @@ impl<T: Columnation + Eq> Eq for TimelyStack<T> {}

impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
(&self[..]).fmt(f)
self[..].fmt(f)
}
}

Expand Down Expand Up @@ -291,12 +309,14 @@ mod serde {
}

mod container {
use crate::{Container, PushPartitioned};
use std::ops::Deref;
use crate::{Container, PushContainer};

use crate::columnation::{Columnation, TimelyStack};

impl<T: Columnation + 'static> Container for TimelyStack<T> {
type Item = T;
type ItemRef<'a> = &'a T where Self: 'a;
type Item<'a> = &'a T where Self: 'a;

fn len(&self) -> usize {
self.local.len()
Expand All @@ -306,38 +326,34 @@ mod container {
self.local.is_empty()
}

fn capacity(&self) -> usize {
self.local.capacity()
}

fn clear(&mut self) {
TimelyStack::clear(self)
}

type Iter<'a> = std::slice::Iter<'a, T>;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

type DrainIter<'a> = std::slice::Iter<'a, T>;

fn drain(&mut self) -> Self::DrainIter<'_> {
(*self).iter()
}
}

impl<T: Columnation + 'static> PushPartitioned for TimelyStack<T> {
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
F: FnMut(usize, &mut Self),
{
fn ensure_capacity<E: Columnation>(this: &mut TimelyStack<E>) {
let capacity = this.local.capacity();
let desired_capacity = crate::buffer::default_capacity::<E>();
if capacity < desired_capacity {
this.local.reserve(desired_capacity - capacity);
}
}
impl<T: Columnation + 'static> PushContainer for TimelyStack<T> {
fn capacity(&self) -> usize {
self.capacity()
}

for datum in &self[..] {
let index = index(&datum);
ensure_capacity(&mut buffers[index]);
buffers[index].copy(datum);
if buffers[index].len() == buffers[index].local.capacity() {
flush(index, &mut buffers[index]);
}
}
self.clear();
fn preferred_capacity() -> usize {
crate::buffer::default_capacity::<T>()
}

fn reserve(&mut self, additional: usize) {
self.reserve(additional)
}
}
}
156 changes: 128 additions & 28 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,74 @@ pub mod columnation;
/// is efficient (which is not necessarily the case when deriving `Clone`.)
/// TODO: Don't require `Container: Clone`
pub trait Container: Default + Clone + 'static {
/// The type of elements this container holds.
type Item;
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the continer.
type Item<'a> where Self: 'a;

/// The number of elements in this container
///
/// The length of a container must be consistent between sending and receiving it.
/// When exchanging a container and partitioning it into pieces, the sum of the length
/// of all pieces must be equal to the length of the original container.
/// of all pieces must be equal to the length of the original container. When combining
/// containers, the length of the result must be the sum of the individual parts.
fn len(&self) -> usize;

/// Determine if the container contains any elements, corresponding to `len() == 0`.
fn is_empty(&self) -> bool {
self.len() == 0
}

/// The capacity of the underlying container
fn capacity(&self) -> usize;

/// Remove all contents from `self` while retaining allocated memory.
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
fn clear(&mut self);

/// Iterator type when reading from the container.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>>;

/// Returns an iterator that reads the contents of this container.
fn iter(&self) -> Self::Iter<'_>;

/// Iterator type when draining the container.
type DrainIter<'a>: Iterator<Item=Self::Item<'a>>;

/// Returns an iterator that drains the contents of this container.
/// Drain leaves the container in an undefined state.
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A type that can push itself into a container.
pub trait PushInto<C> {
/// Push self into the target container.
fn push_into(self, target: &mut C);
}

/// A type that has the necessary infrastructure to push elements, without specifying how pushing
/// itself works. For this, pushable types should implement [`PushInto`].
// TODO: Reconsider this interface because it assumes
// * Containers have a capacity
// * Push presents single elements.
// * Instead of testing `len == cap`, we could have a `is_full` to test that we might
// not be able to absorb more data.
// * Example: A FlatStack with optimized offsets and deduplication can absorb many elements without reallocation. What does capacity mean in this context?
pub trait PushContainer: Container {
/// Push `item` into self
#[inline]
fn push<T: PushInto<Self>>(&mut self, item: T) {
item.push_into(self)
}
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
}

impl<T: Clone + 'static> Container for Vec<T> {
type Item = T;
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;

fn len(&self) -> usize {
Vec::len(self)
Expand All @@ -51,20 +94,58 @@ impl<T: Clone + 'static> Container for Vec<T> {
Vec::is_empty(self)
}

fn clear(&mut self) { Vec::clear(self) }

type Iter<'a> = std::slice::Iter<'a, T>;

fn iter(&self) -> Self::Iter<'_> {
self.as_slice().iter()
}

type DrainIter<'a> = std::vec::Drain<'a, T>;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.drain(..)
}
}

impl<T: Clone + 'static> PushContainer for Vec<T> {
fn capacity(&self) -> usize {
Vec::capacity(self)
self.capacity()
}

fn clear(&mut self) { Vec::clear(self) }
fn preferred_capacity() -> usize {
buffer::default_capacity::<T>()
}

fn reserve(&mut self, additional: usize) {
self.reserve(additional);
}
}

impl<T> PushInto<Vec<T>> for T {
#[inline]
fn push_into(self, target: &mut Vec<T>) {
target.push(self)
}
}

impl<'a, T: Clone> PushInto<Vec<T>> for &'a T {
#[inline]
fn push_into(self, target: &mut Vec<T>) {
target.push(self.clone())
}
}

mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::Container;

impl<T: Container> Container for Rc<T> {
type Item = T::Item;
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
Expand All @@ -74,10 +155,6 @@ mod rc {
std::ops::Deref::deref(self).is_empty()
}

fn capacity(&self) -> usize {
std::ops::Deref::deref(self).capacity()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Rc::get_mut(self) {
Expand All @@ -86,16 +163,30 @@ mod rc {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a>;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

type DrainIter<'a> = T::Iter<'a>;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
}
}

mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::Container;

impl<T: Container> Container for Arc<T> {
type Item = T::Item;
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
Expand All @@ -105,10 +196,6 @@ mod arc {
std::ops::Deref::deref(self).is_empty()
}

fn capacity(&self) -> usize {
std::ops::Deref::deref(self).capacity()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Arc::get_mut(self) {
Expand All @@ -117,43 +204,56 @@ mod arc {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a>;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

type DrainIter<'a> = T::Iter<'a>;

fn drain(&mut self) -> Self::DrainIter<'_> {
self.iter()
}
}
}

/// A container that can partition itself into pieces.
pub trait PushPartitioned: Container {
pub trait PushPartitioned: PushContainer {
/// Partition and push this container.
///
/// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
/// append an element to. Call `flush` with an index and a buffer to send the data downstream.
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], index: I, flush: F)
where
I: FnMut(&Self::Item) -> usize,
for<'a> I: FnMut(&Self::Item<'a>) -> usize,
F: FnMut(usize, &mut Self);
}

impl<T: Clone + 'static> PushPartitioned for Vec<T> {
impl<T: PushContainer + 'static> PushPartitioned for T where for<'a> T::Item<'a>: PushInto<T> {
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
for<'a> I: FnMut(&Self::Item<'a>) -> usize,
F: FnMut(usize, &mut Self),
{
fn ensure_capacity<E>(this: &mut Vec<E>) {
let ensure_capacity = |this: &mut Self| {
let capacity = this.capacity();
let desired_capacity = buffer::default_capacity::<E>();
let desired_capacity = Self::preferred_capacity();
if capacity < desired_capacity {
this.reserve(desired_capacity - capacity);
}
}
};

for datum in self.drain(..) {
for datum in self.drain() {
let index = index(&datum);
ensure_capacity(&mut buffers[index]);
buffers[index].push(datum);
if buffers[index].len() == buffers[index].capacity() {
if buffers[index].len() >= buffers[index].capacity() {
flush(index, &mut buffers[index]);
}
}
self.clear();
}
}

Expand Down
Loading

0 comments on commit ded1a39

Please sign in to comment.