Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions js/src/compute/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type NextFunc = (idx: number, batch: RecordBatch) => void;

Table.prototype.countBy = function(this: Table, name: Col | string) { return new DataFrame(this.chunks).countBy(name); };
Table.prototype.scan = function(this: Table, next: NextFunc, bind?: BindFunc) { return new DataFrame(this.chunks).scan(next, bind); };
Table.prototype.scanReverse = function(this: Table, next: NextFunc, bind?: BindFunc) { return new DataFrame(this.chunks).scanReverse(next, bind); };
Table.prototype.filter = function(this: Table, predicate: Predicate): FilteredDataFrame { return new DataFrame(this.chunks).filter(predicate); };

export class DataFrame<T extends { [key: string]: DataType } = any> extends Table<T> {
Expand All @@ -49,6 +50,18 @@ export class DataFrame<T extends { [key: string]: DataType } = any> extends Tabl
}
}
}
public scanReverse(next: NextFunc, bind?: BindFunc) {
const batches = this.chunks, numBatches = batches.length;
for (let batchIndex = numBatches; --batchIndex >= 0;) {
// load batches
const batch = batches[batchIndex];
if (bind) { bind(batch); }
// yield all indices
for (let index = batch.length; --index >= 0;) {
next(index, batch);
}
}
}
public countBy(name: Col | string) {
const batches = this.chunks, numBatches = batches.length;
const count_by = typeof name === 'string' ? new Col(name) : name as Col;
Expand Down Expand Up @@ -130,6 +143,23 @@ export class FilteredDataFrame<T extends { [key: string]: DataType } = any> exte
}
}
}
public scanReverse(next: NextFunc, bind?: BindFunc) {
const batches = this._chunks;
const numBatches = batches.length;
for (let batchIndex = numBatches; --batchIndex >= 0;) {
// load batches
const batch = batches[batchIndex];
// TODO: bind batches lazily
// If predicate doesn't match anything in the batch we don't need
// to bind the callback
if (bind) { bind(batch); }
const predicate = this._predicate.bind(batch);
// yield all indices
for (let index = batch.length; --index >= 0;) {
if (predicate(index, batch)) { next(index, batch); }
}
}
}
public count(): number {
// inlined version of this:
// let sum = 0;
Expand Down
1 change: 1 addition & 0 deletions js/src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export interface Table<T extends { [key: string]: DataType } = any> {
clone(chunks?: RecordBatch<T>[], offsets?: Uint32Array): Table<T>;

scan(next: import('./compute/dataframe').NextFunc, bind?: import('./compute/dataframe').BindFunc): void;
scanReverse(next: import('./compute/dataframe').NextFunc, bind?: import('./compute/dataframe').BindFunc): void;
countBy(name: import('./compute/predicate').Col | string): import('./compute/dataframe').CountByResult;
filter(predicate: import('./compute/predicate').Predicate): import('./compute/dataframe').FilteredDataFrame<T>;
}
Expand Down
38 changes: 38 additions & 0 deletions js/test/unit/table-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,24 @@ describe(`Table`, () => {
}
});
});
describe(`scanReverse()`, () => {
test(`yields all values`, () => {
const table = datum.table();
let expected_idx = values.length;
table.scanReverse((idx, batch) => {
const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!);
expect(columns.map((c) => c.get(idx))).toEqual(values[--expected_idx]);
});
});
test(`calls bind function with every batch`, () => {
const table = datum.table();
let bind = jest.fn();
table.scanReverse(() => { }, bind);
for (let batch of table.chunks) {
expect(bind).toHaveBeenCalledWith(batch);
}
});
});
test(`count() returns the correct length`, () => {
const table = datum.table();
const values = datum.values();
Expand Down Expand Up @@ -434,6 +452,26 @@ describe(`Table`, () => {
}
});
});
describe(`scanReverse()`, () => {
test(`iterates over expected values in reverse`, () => {
let expected_idx = expected.length;
filtered.scanReverse((idx, batch) => {
const columns = batch.schema.fields.map((_, i) => batch.getChildAt(i)!);
expect(columns.map((c) => c.get(idx))).toEqual(expected[--expected_idx]);
});
});
test(`calls bind function on every batch`, () => {
// Techincally, we only need to call bind on
// batches with data that match the predicate, so
// this test may fail in the future if we change
// that - and that's ok!
let bind = jest.fn();
filtered.scanReverse(() => { }, bind);
for (let batch of table.chunks) {
expect(bind).toHaveBeenCalledWith(batch);
}
});
});
});
}
test(`countBy on dictionary returns the correct counts`, () => {
Expand Down