Aggregating collection into a single value #276

Open
wilfredwee opened this issue Aug 8, 2020 · 0 comments

Hi there, I've been playing around with this promising project and I'm really excited about what it can achieve.

In the examples of reduce, all aggregations seem to be per-key. I'm trying to find a way to aggregate all values in a collection into a single value, for example by summing over the whole collection.

I've attempted this with explode, using a constant as the key:

use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::agent::TraceAgent;
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::Count;
use differential_dataflow::trace::cursor::CursorDebug;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::trace::{Cursor, TraceReader};
use timely::dataflow::operators::probe::Handle;
use timely::{Configuration, PartialOrder};

pub fn sum_with_workers(initial_items: Vec<(String, isize)>) {
    timely::execute(Configuration::Process(3), move |worker| {
        let worker_index = worker.index();
        let initial_items = initial_items.clone();
        let mut probe = Handle::new();

        let (mut input_session, mut trace) = worker.dataflow(|scope| {
            let (input_session, collection) = scope.new_collection_from(initial_items);
            let trace = collection.probe_with(&mut probe).arrange_by_self().trace;

            (input_session, trace.clone())
        });

        let mut sum_trace = worker.dataflow(|scope| {
            let collection = trace
                .import(scope)
                .as_collection(|k, _v| k.clone())
                .probe_with(&mut probe);

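            // Move each value into the record's multiplicity under one shared key,
            // so that count() yields the sum of all values.
            let sum = collection
                .explode(|(_k, v)| Some(("CONSTANT_KEY".to_string(), v)))
                .count()
                .probe_with(&mut probe);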

            sum.arrange_by_key().trace.clone()
        });

        let time = 2;
        input_session.advance_to(time);
        input_session.flush();

        worker.step_while(|| probe.less_than(&time));

        sum_trace.advance_by(&[time]);
        sum_trace.distinguish_since(&[time]);

        let sum_result = read_collection_at_time(&mut sum_trace, time);

        println!(
            "Worker: {}, Time: {}. Obtained sum of all items: {:?}",
            worker_index, time, sum_result
        )
    });
}

fn read_collection_at_time(
    trace_reader: &mut TraceAgent<OrdValSpine<String, isize, usize, isize>>,
    time: usize,
) -> Option<isize> {
    let (mut cursor, storage) = trace_reader.cursor();

    let mut result = None;
    while cursor.key_valid(&storage) {
        while cursor.val_valid(&storage) {
            let item = cursor.val(&storage);

            let mut total = 0;
            cursor.map_times(&storage, |timestamp, update| {
                if timestamp.less_equal(&time) {
                    total = total + update;
                }
            });

            if total > 0 {
                result = Some(item);
            }

            cursor.step_val(&storage);
        }

        cursor.step_key(&storage);
    }

    result.copied()
}

With the following input:

vec![
    ("id1".to_string(), 1),
    ("id2".to_string(), 2),
    ("id3".to_string(), 3),
];

The problem with this approach is that the result is actual_sum * num_of_workers (18 instead of 6 for this input with 3 workers). Should I approach this differently?
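
One possible explanation for the factor, offered as an assumption rather than a confirmed diagnosis: the closure passed to timely::execute runs on every worker, and each worker hands its own clone of initial_items to new_collection_from, so every record would enter the dataflow once per worker. The sketch below is a plain-Rust model of that arithmetic only. It makes no differential-dataflow calls, and `summed_weight` and `only_worker_zero_loads` are hypothetical names introduced for illustration.

```rust
/// Hypothetical model, not differential-dataflow API: every worker runs the
/// same closure, so records loaded unconditionally enter once per worker.
fn summed_weight(
    items: &[(String, isize)],
    workers: usize,
    only_worker_zero_loads: bool,
) -> isize {
    let mut total = 0;
    for index in 0..workers {
        if only_worker_zero_loads && index != 0 {
            continue; // this worker contributes no records
        }
        // Models explode + count: each value becomes a weight, weights add up.
        total += items.iter().map(|(_, v)| *v).sum::<isize>();
    }
    total
}
```

For the input above, `summed_weight(&items, 3, false)` gives 18 and `summed_weight(&items, 3, true)` gives 6. If this reading is right, loading the data only where worker.index() == 0 (and passing an empty Vec on the other workers) would be one way to get the plain sum.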

Is my approach applicable to other kinds of aggregation that do not use explode, e.g. getting the min of all items using reduce?
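
On the min question, a plain-Rust sketch of why duplicated records might matter less there: a sum scales with the number of copies, while a minimum over the distinct values present does not. The model treats a constant-key group as sorted (value, multiplicity) pairs, which is an assumption about what reduce would be handed for that key. `consolidated` and `min_present` are hypothetical helpers, not library functions.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one constant-key group: distinct values in sorted
/// order, each with its accumulated multiplicity (`copies` per occurrence).
fn consolidated(values: &[isize], copies: isize) -> BTreeMap<isize, isize> {
    let mut counts = BTreeMap::new();
    for v in values {
        *counts.entry(*v).or_insert(0) += copies;
    }
    counts
}

/// Smallest value whose multiplicity is positive, if any.
fn min_present(counts: &BTreeMap<isize, isize>) -> Option<isize> {
    counts.iter().find(|(_, m)| **m > 0).map(|(v, _)| *v)
}
```

With the values 1, 2, 3, `min_present` returns Some(1) whether `copies` is 1 or 3, so under this model the constant-key approach would give the same minimum regardless of how many workers loaded the data.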
