Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Function;

/**
* Used on parameters on methods annotated with {@link Evaluator} to indicate
Expand All @@ -27,12 +26,23 @@
boolean includeInToString() default true;

/**
* Should the Evaluator's factory build this per evaluator with a
* {@code Function<DriverContext, T>} or just take fixed implementation?
* This is typically set to {@code true} to use the {@link Function}
* to make "scratch" objects which have to be isolated in a single thread.
* This is typically set to {@code false} when the parameter is simply
* immutable and can be shared.
* Defines the scope of the parameter.
* - SINGLETON (default) will build a single instance and share it across all evaluators
* - THREAD_LOCAL will build a new instance for each evaluator thread
*/
boolean build() default false;
Scope scope() default Scope.SINGLETON;

/**
* Defines the parameter scope
*/
enum Scope {
/**
* Should be used for immutable parameters that can be shared across different threads
*/
SINGLETON,
/**
* Should be used for mutable or not thread safe parameters
*/
THREAD_LOCAL,
}
Comment on lines -37 to +47
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change is here.
Please let me know if you have a better naming in mind.

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.squareup.javapoet.TypeSpec;

import org.elasticsearch.compute.ann.Fixed;
import org.elasticsearch.compute.ann.Fixed.Scope;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -725,7 +726,7 @@ public String closeInvocation() {
}
}

private record FixedProcessFunctionArg(TypeName type, String name, boolean includeInToString, boolean build, boolean releasable)
private record FixedProcessFunctionArg(TypeName type, String name, boolean includeInToString, Scope scope, boolean releasable)
implements
ProcessFunctionArg {
@Override
Expand Down Expand Up @@ -762,12 +763,18 @@ public void implementFactoryCtor(MethodSpec.Builder builder) {
}

private TypeName factoryFieldType() {
return build ? ParameterizedTypeName.get(ClassName.get(Function.class), DRIVER_CONTEXT, type.box()) : type;
return switch (scope) {
case SINGLETON -> type;
case THREAD_LOCAL -> ParameterizedTypeName.get(ClassName.get(Function.class), DRIVER_CONTEXT, type.box());
};
}

@Override
public String factoryInvocation(MethodSpec.Builder factoryMethodBuilder) {
return build ? name + ".apply(context)" : name;
return switch (scope) {
case SINGLETON -> name;
case THREAD_LOCAL -> name + ".apply(context)";
};
}

@Override
Expand Down Expand Up @@ -1020,7 +1027,7 @@ private ProcessFunction(
type,
name,
fixed.includeInToString(),
fixed.build(),
fixed.scope(),
Types.extendsSuper(types, v.asType(), "org.elasticsearch.core.Releasable")
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Base64;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;

Expand Down Expand Up @@ -85,7 +86,7 @@ protected NodeInfo<? extends Expression> info() {
}

@Evaluator()
static BytesRef process(BytesRef field, @Fixed(includeInToString = false, build = true) BytesRefBuilder oScratch) {
static BytesRef process(BytesRef field, @Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRefBuilder oScratch) {
byte[] bytes = new byte[field.length];
System.arraycopy(field.bytes, field.offset, bytes, 0, field.length);
oScratch.grow(field.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Base64;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;

Expand Down Expand Up @@ -78,7 +79,7 @@ protected NodeInfo<? extends Expression> info() {
}

@Evaluator(warnExceptions = { ArithmeticException.class })
static BytesRef process(BytesRef field, @Fixed(includeInToString = false, build = true) BytesRefBuilder oScratch) {
static BytesRef process(BytesRef field, @Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRefBuilder oScratch) {
int outLength = Math.multiplyExact(4, (Math.addExact(field.length, 2) / 3));
byte[] bytes = new byte[field.length];
System.arraycopy(field.bytes, field.offset, bytes, 0, field.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.THIRD;
Expand Down Expand Up @@ -138,7 +139,7 @@ static BytesRef process(
BytesRef ip,
int prefixLengthV4,
int prefixLengthV6,
@Fixed(includeInToString = false, build = true) BytesRef scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef scratch
) {
if (prefixLengthV4 < 0 || prefixLengthV4 > 32) {
throw new IllegalArgumentException("Prefix length v4 must be in range [0, 32], found " + prefixLengthV4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isFoldable;
Expand Down Expand Up @@ -144,7 +145,7 @@ static void process(
DoubleBlock.Builder builder,
int position,
DoubleBlock block,
@Fixed(includeInToString = false, build = true) CompensatedSum sum,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) CompensatedSum sum,
@Fixed double p
) {
sum.reset(0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
Expand Down Expand Up @@ -167,7 +168,7 @@ static void process(
int position,
DoubleBlock values,
double percentile,
@Fixed(includeInToString = false, build = true) DoubleSortingScratch scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) DoubleSortingScratch scratch
) {
int valueCount = values.getValueCount(position);
int firstValueIndex = values.getFirstValueIndex(position);
Expand All @@ -190,7 +191,7 @@ static void process(
int position,
IntBlock values,
double percentile,
@Fixed(includeInToString = false, build = true) IntSortingScratch scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) IntSortingScratch scratch
) {
int valueCount = values.getValueCount(position);
int firstValueIndex = values.getFirstValueIndex(position);
Expand All @@ -213,7 +214,7 @@ static void process(
int position,
LongBlock values,
double percentile,
@Fixed(includeInToString = false, build = true) LongSortingScratch scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) LongSortingScratch scratch
) {
int valueCount = values.getValueCount(position);
int firstValueIndex = values.getFirstValueIndex(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.common.unit.ByteSizeUnit.MB;
import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;

Expand Down Expand Up @@ -111,7 +112,7 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
}

@Evaluator
static BytesRef process(@Fixed(includeInToString = false, build = true) BreakingBytesRefBuilder scratch, BytesRef[] values) {
static BytesRef process(@Fixed(includeInToString = false, scope = THREAD_LOCAL) BreakingBytesRefBuilder scratch, BytesRef[] values) {
scratch.grow(checkedTotalLength(values));
scratch.clear();
for (int i = 0; i < values.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
Expand Down Expand Up @@ -77,8 +78,8 @@ public String getWriteableName() {

@Evaluator
static BytesRef process(
@Fixed(includeInToString = false, build = true) BytesRef out,
@Fixed(includeInToString = false, build = true) UnicodeUtil.UTF8CodePoint cp,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef out,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) UnicodeUtil.UTF8CodePoint cp,
BytesRef str,
int length
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

import static org.elasticsearch.common.unit.ByteSizeUnit.MB;
import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
Expand Down Expand Up @@ -101,15 +102,19 @@ public boolean foldable() {

@Evaluator(extraName = "Constant", warnExceptions = { IllegalArgumentException.class })
static BytesRef processConstantNumber(
@Fixed(includeInToString = false, build = true) BreakingBytesRefBuilder scratch,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BreakingBytesRefBuilder scratch,
BytesRef str,
@Fixed int number
) {
return processInner(scratch, str, number);
}

@Evaluator(warnExceptions = { IllegalArgumentException.class })
static BytesRef process(@Fixed(includeInToString = false, build = true) BreakingBytesRefBuilder scratch, BytesRef str, int number) {
static BytesRef process(
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BreakingBytesRefBuilder scratch,
BytesRef str,
int number
) {
if (number < 0) {
throw new IllegalArgumentException("Number parameter cannot be negative, found [" + number + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
Expand Down Expand Up @@ -77,8 +78,8 @@ public String getWriteableName() {

@Evaluator
static BytesRef process(
@Fixed(includeInToString = false, build = true) BytesRef out,
@Fixed(includeInToString = false, build = true) UnicodeUtil.UTF8CodePoint cp,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef out,
@Fixed(includeInToString = false, scope = THREAD_LOCAL) UnicodeUtil.UTF8CodePoint cp,
BytesRef str,
int length
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

import static org.elasticsearch.common.unit.ByteSizeUnit.MB;
import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
Expand Down Expand Up @@ -82,7 +83,7 @@ protected TypeResolution resolveType() {
}

@Evaluator(warnExceptions = { IllegalArgumentException.class })
static BytesRef process(@Fixed(includeInToString = false, build = true) BreakingBytesRefBuilder scratch, int number) {
static BytesRef process(@Fixed(includeInToString = false, scope = THREAD_LOCAL) BreakingBytesRefBuilder scratch, int number) {
checkNumber(number);
scratch.grow(number);
scratch.setLength(number);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;

import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
import static org.elasticsearch.xpack.esql.expression.EsqlTypeResolutions.isStringAndExact;
Expand Down Expand Up @@ -110,7 +111,7 @@ static void process(
BytesRefBlock.Builder builder,
BytesRef str,
@Fixed byte delim,
@Fixed(includeInToString = false, build = true) BytesRef scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef scratch
) {
scratch.bytes = str.bytes;
scratch.offset = str.offset;
Expand Down Expand Up @@ -140,7 +141,7 @@ static void process(
BytesRefBlock.Builder builder,
BytesRef str,
BytesRef delim,
@Fixed(includeInToString = false, build = true) BytesRef scratch
@Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef scratch
) {
checkDelimiter(delim);
process(builder, str, delim.bytes[delim.offset], scratch);
Expand Down