Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
* Abstract base class for block types that are implemented by delegating to several concrete sub blocks.
*/
public abstract class AbstractDelegatingCompoundBlock<T extends Block> extends AbstractNonThreadSafeRefCounted implements Block {

/**
* @return a list of the sub-blocks composing this compound block. The order of the list should match the order
* expected by {@link AbstractDelegatingCompoundBlock#buildFromSubBlocks(List)}
*/
protected abstract List<Block> getSubBlocks();

/**
* Construct a new instance of the block, based on the given list of sub-blocks.
* @param subBlocks List of sub-blocks, in the same order as {@link AbstractDelegatingCompoundBlock#getSubBlocks()}
* @return a new instance based on the given blocks.
*/
protected abstract T buildFromSubBlocks(List<Block> subBlocks);

@Override
public void allowPassingToDifferentDriver() {
getSubBlocks().forEach(Block::allowPassingToDifferentDriver);
}

@Override
public BlockFactory blockFactory() {
return getSubBlocks().get(0).blockFactory();
}

@Override
protected void closeInternal() {
Releasables.close(getSubBlocks());
}

@Override
public T deepCopy(BlockFactory blockFactory) {
return applyOperationToSubBlocks(block -> block.deepCopy(blockFactory));
}

@Override
public T filter(int... positions) {
return applyOperationToSubBlocks(block -> block.filter(positions));
}

@Override
public int getPositionCount() {
return getSubBlocks().get(0).getPositionCount();
}

@Override
public T keepMask(BooleanVector mask) {
return applyOperationToSubBlocks(block -> block.keepMask(mask));
}

@Override
public long ramBytesUsed() {
long bytes = 0;
for (Block b : getSubBlocks()) {
bytes += b.ramBytesUsed();
}
return bytes;
}

private T applyOperationToSubBlocks(Function<Block, Block> operation) {
List<Block> modifiedBlocks = new ArrayList<>(getSubBlocks().size());
boolean success = false;
try {
for (Block block : getSubBlocks()) {
modifiedBlocks.add(operation.apply(block));
}
success = true;
} finally {
if (success == false) {
closeInternal();
}
}
return buildFromSubBlocks(modifiedBlocks);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock {
public final class AggregateMetricDoubleArrayBlock extends AbstractDelegatingCompoundBlock<AggregateMetricDoubleBlock>
implements
AggregateMetricDoubleBlock {
public static final TransportVersion WRITE_TYPED_BLOCK = TransportVersion.fromName("aggregate_metric_double_typed_block");

private final DoubleBlock minBlock;
private final DoubleBlock maxBlock;
private final DoubleBlock sumBlock;
private final IntBlock countBlock;
private final int positionCount;

public AggregateMetricDoubleArrayBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
this.minBlock = minBlock;
this.maxBlock = maxBlock;
this.sumBlock = sumBlock;
this.countBlock = countBlock;
this.positionCount = minBlock.getPositionCount();
int positionCount = minBlock.getPositionCount();
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
if (b.getPositionCount() != positionCount) {
assert false : "expected positionCount=" + positionCount + " but was " + b;
Expand Down Expand Up @@ -69,13 +70,13 @@ public CompositeBlock asCompositeBlock() {
}

@Override
protected void closeInternal() {
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
public Vector asVector() {
return null;
}

@Override
public Vector asVector() {
return null;
public int getFirstValueIndex(int position) {
return minBlock.getFirstValueIndex(position);
}

@Override
Expand All @@ -87,16 +88,6 @@ public int getTotalValueCount() {
return totalValueCount;
}

@Override
public int getPositionCount() {
return positionCount;
}

@Override
public int getFirstValueIndex(int position) {
return minBlock.getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
int max = 0;
Expand All @@ -112,15 +103,18 @@ public ElementType elementType() {
}

@Override
public BlockFactory blockFactory() {
return minBlock.blockFactory();
protected List<Block> getSubBlocks() {
return List.of(minBlock, maxBlock, sumBlock, countBlock);
}

@Override
public void allowPassingToDifferentDriver() {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.allowPassingToDifferentDriver();
}
protected AggregateMetricDoubleArrayBlock buildFromSubBlocks(List<Block> subBlocks) {
return new AggregateMetricDoubleArrayBlock(
(DoubleBlock) subBlocks.get(0),
(DoubleBlock) subBlocks.get(1),
(DoubleBlock) subBlocks.get(2),
(IntBlock) subBlocks.get(3)
);
}

@Override
Expand Down Expand Up @@ -156,69 +150,6 @@ public boolean doesHaveMultivaluedFields() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
}

@Override
public AggregateMetricDoubleBlock filter(int... positions) {
AggregateMetricDoubleArrayBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.filter(positions);
newMaxBlock = maxBlock.filter(positions);
newSumBlock = sumBlock.filter(positions);
newCountBlock = countBlock.filter(positions);
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public AggregateMetricDoubleBlock keepMask(BooleanVector mask) {
AggregateMetricDoubleArrayBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.keepMask(mask);
newMaxBlock = maxBlock.keepMask(mask);
newSumBlock = sumBlock.keepMask(mask);
newCountBlock = countBlock.keepMask(mask);
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public Block deepCopy(BlockFactory blockFactory) {
AggregateMetricDoubleArrayBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.deepCopy(blockFactory);
newMaxBlock = maxBlock.deepCopy(blockFactory);
newSumBlock = sumBlock.deepCopy(blockFactory);
newCountBlock = countBlock.deepCopy(blockFactory);
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public ReleasableIterator<? extends AggregateMetricDoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO: support
Expand Down Expand Up @@ -300,11 +231,6 @@ public static Block readFrom(StreamInput in) throws IOException {
}
}

@Override
public long ramBytesUsed() {
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof AggregateMetricDoubleBlock that) {
Expand Down
Loading