Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim committed Feb 5, 2025
1 parent d76f12a commit 7e2494e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.lang.reflect.Type;
import java.util.Arrays;

import org.apache.ignite.internal.util.typedef.F;

Expand All @@ -42,20 +41,6 @@ private ArrayRowHandler() {}
row[field] = val;
}

/** */
@Override public void resetRow(Object[] row) {
Arrays.fill(row, null);
}

/** */
@Override public Object[] copyRow(Object[] row) {
Object[] copy = new Object[row.length];

System.arraycopy(row, 0, copy, 0, row.length);

return copy;
}

/** {@inheritDoc} */
@Override public Object[] concat(Object[] left, Object[] right) {
return F.concat(left, right);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ public interface RowHandler<Row> {
/** */
void set(int field, Row row, Object val);

/** Reset values all fields of the row to {@code null}. */
void resetRow(Row row);

/** Creates a copy of the row. */
Row copyRow(Row row);

/** */
Row concat(Row left, Row right);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
import java.util.List;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;

/**
*
*/
public interface Accumulator<Row> extends Serializable {
/**
* @param row Reusable object of Row. To collect rows one has to use {@link RowHandler#copyRow(Object)} call.
*/
/** */
void add(Row row);

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ else if ("ARRAY_AGG".equals(aggName))
if (call.getCollation() != null && !call.getCollation().getFieldCollations().isEmpty()) {
Comparator<Row> cmp = ctx.expressionFactory().comparator(call.getCollation());

return () -> new SortingAccumulator<>(accSup, cmp, hnd);
return () -> new SortingAccumulator<>(accSup, cmp);
}

return accSup;
Expand Down Expand Up @@ -304,11 +304,6 @@ protected List<Integer> arguments() {
int columnCount(Row row) {
return hnd.columnCount(row);
}

/** */
Row copyRow(Row row) {
return hnd.copyRow(row);
}
}

/** */
Expand Down Expand Up @@ -1200,24 +1195,20 @@ private static class SortingAccumulator<Row> implements IterableAccumulator<Row>
/** */
private final Accumulator<Row> acc;

/** */
private final RowHandler<Row> hnd;

/**
* @param accSup Accumulator supplier.
* @param cmp Comparator.
*/
private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cmp, RowHandler<Row> hnd) {
private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cmp) {
this.cmp = cmp;
this.hnd = hnd;

list = new ArrayList<>();
acc = accSup.get();
}

/** {@inheritDoc} */
@Override public void add(Row row) {
list.add(hnd.copyRow(row));
list.add(row);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1270,7 +1261,7 @@ protected AggAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
if (row == null)
return;

buf.add(copyRow(row));
buf.add(row);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1434,7 +1425,7 @@ public ArrayConcatAggregateAccumulator(AggregateCall aggCall, RowHandler<Row> hn
}

/** */
private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> {
private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> implements StoringAccumulator {
/** */
private final Accumulator<Row> acc;

Expand Down Expand Up @@ -1465,7 +1456,7 @@ private DistinctAccumulator(AggregateCall aggCall, RowHandler<Row> hnd, Supplier
if (row == null || columnCount(row) == 0 || (key = get(0, row)) == null)
return;

rows.put(key, copyRow(row));
rows.put(key, row);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ private final class WrapperPrototype implements Supplier<AccumulatorWrapper<Row>
/** */
private final AggregateCall call;

/** */
private final RowHandler.RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), inputRowType);

/** */
private Function<Row, Row> inAdapter;

Expand All @@ -191,7 +194,7 @@ private WrapperPrototype(AggregateCall call) {
@Override public AccumulatorWrapper<Row> get() {
Accumulator<Row> accumulator = accumulator();

return new AccumulatorWrapperImpl(accumulator, call, inAdapter, outAdapter);
return new AccumulatorWrapperImpl(accumulator, call, inAdapter, outAdapter, rowFactory);
}

/** */
Expand Down Expand Up @@ -236,13 +239,15 @@ private WrapperPrototype(AggregateCall call) {
for (int i = 0; i < call.getArgList().size(); ++i)
argMapping[call.getArgList().get(i)] = i;

return new Function<Row, Row>() {
boolean createRow = accumulator.getClass().isAssignableFrom(StoringAccumulator.class);

return new Function<>() {
final RowHandler<Row> hnd = ctx.rowHandler();

final Row out = hnd.factory(ctx.getTypeFactory(), inputRowType).create();
final Row reuseRow = createRow ? null : rowFactory.create();

@Override public Row apply(Row in) {
hnd.resetRow(out);
Row out = createRow ? rowFactory.create() : reuseRow;

for (int i = 0; i < hnd.columnCount(in); ++i) {
Object val = hnd.get(i, in);
Expand Down Expand Up @@ -296,12 +301,16 @@ private final class AccumulatorWrapperImpl implements AccumulatorWrapper<Row> {
/** */
private final RowHandler<Row> handler;

/** */
private final Row reuseRow;

/** */
AccumulatorWrapperImpl(
Accumulator<Row> accumulator,
AggregateCall call,
Function<Row, Row> inAdapter,
Function<Object, Object> outAdapter
Function<Object, Object> outAdapter,
RowHandler.RowFactory<Row> rowFactory
) {
this.accumulator = accumulator;
this.inAdapter = inAdapter;
Expand All @@ -310,6 +319,8 @@ private final class AccumulatorWrapperImpl implements AccumulatorWrapper<Row> {
filterArg = call.hasFilter() ? call.filterArg : -1;

handler = ctx.rowHandler();

reuseRow = accumulator.getClass().isAssignableFrom(StoringAccumulator.class) ? rowFactory.create() : null;
}

/** {@inheritDoc} */
Expand All @@ -319,9 +330,14 @@ private final class AccumulatorWrapperImpl implements AccumulatorWrapper<Row> {
if (filterArg >= 0 && Boolean.TRUE != handler.get(filterArg, row))
return;

Row newRow = inAdapter.apply(row);
if (newRow == null)
return;
Row newRow = reuseRow;

if (newRow == null) {
newRow = inAdapter.apply(row);

if (newRow == null)
return;
}

accumulator.add(newRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;

/**
* Interface for row storing accumulator.
*/
public interface IterableAccumulator<Row> extends Accumulator<Row>, Iterable<Row> {
/** */
public interface IterableAccumulator<Row> extends Accumulator<Row>, Iterable<Row>, StoringAccumulator {
// No-op.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;

/**
* Interface for row storing accumulator.
*/
public interface StoringAccumulator {
// No-op.
}

0 comments on commit 7e2494e

Please sign in to comment.