From e891f48670836099aecac1225e2a94111a06ab93 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Thu, 14 Aug 2025 15:57:17 +0800 Subject: [PATCH 01/11] Optimizing Sorting Performance via Radix Sort Algorithm --- .../org/apache/geaflow/common/type/Types.java | 4 +- .../type/primitive/BinaryStringType.java | 14 ++ .../apache/geaflow/dsl/util/SqlTypeUtil.java | 9 +- .../function/table/OrderByHeapSort.java | 72 +++++++ .../function/table/OrderByRadixSort.java | 59 +++++ .../function/table/OrderByTimSort.java | 62 ++++++ .../table/order/MultiFieldRadixSort.java | 204 ++++++++++++++++++ .../function/table/order/SortInfo.java | 19 ++ .../dsl/runtime/plan/PhysicSortRelNode.java | 15 +- 9 files changed, 450 insertions(+), 8 deletions(-) create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java diff --git a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java index ea365a0a5..64481553e 100644 --- a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java +++ b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java @@ -94,7 +94,7 @@ public static IType getType(Class type) { return TYPE_IMMUTABLE_MAP.get(type); } - public static IType of(String typeName) { + public static IType of(String typeName, int precision) { if (typeName == null) { throw new IllegalArgumentException("typeName is null"); } @@ -116,7 +116,7 @@ public static IType of(String typeName) { case TYPE_NAME_DECIMAL: return DECIMAL; case TYPE_NAME_BINARY_STRING: - return BINARY_STRING; + return new BinaryStringType(precision); case TYPE_NAME_TIMESTAMP: return TIMESTAMP; case TYPE_NAME_DATE: diff --git a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java index 1d4bf2bee..609018104 100644 --- a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java +++ b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java @@ -27,6 +27,20 @@ public class BinaryStringType implements IType { public static final BinaryStringType INSTANCE = new BinaryStringType(); + private int precision; + + public BinaryStringType() { + + } + + public BinaryStringType(int precision) { + this.precision = precision; + } + + public int getPrecision() { + return precision; + } + @Override public String getName() { return Types.TYPE_NAME_BINARY_STRING; diff --git a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java index a79d3c242..6a90b4219 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java @@ -29,6 +29,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.geaflow.common.type.IType; import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.common.type.primitive.BinaryStringType; import org.apache.geaflow.dsl.calcite.EdgeRecordType; import org.apache.geaflow.dsl.calcite.GraphRecordType; import org.apache.geaflow.dsl.calcite.PathRecordType; @@ -47,7 +48,7 @@ public final class SqlTypeUtil { public static IType convertType(SqlDataTypeSpec typeSpec) { String typeName = typeSpec.getTypeName().getSimple().toUpperCase(); typeName = convertTypeName(typeName); - return Types.of(typeName); + return Types.of(typeName, typeSpec.getPrecision()); } public static IType convertType(RelDataType type) { @@ -90,7 +91,7 @@ private static List toTableFields(List fields) { public static IType ofTypeName(SqlTypeName sqlTypeName) { String typeName = convertTypeName(sqlTypeName.getName()); - return Types.of(typeName); + return Types.of(typeName, sqlTypeName.getPrecision()); } public static RelDataType convertToRelType(IType type, boolean isNullable, @@ -129,7 +130,9 @@ public static RelDataType convertToRelType(IType type, boolean isNullable, default: if (type.isPrimitive()) { String sqlTypeName = convertToSqlTypeName(type); - SqlTypeName typeName = SqlTypeName.valueOf(sqlTypeName); + SqlTypeName typeName = Types.getType(type.getTypeClass()) == Types.BINARY_STRING ? + SqlTypeName.get(sqlTypeName, ((BinaryStringType) type).getPrecision()): + SqlTypeName.get(sqlTypeName); return typeFactory.createTypeWithNullability(typeFactory.createSqlType(typeName), isNullable); } else { throw new GeaFlowDSLException("Not support type: " + type); diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java new file mode 100644 index 000000000..c86800dbe --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.function.table; + +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.function.FunctionContext; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.PriorityQueue; + +public class OrderByHeapSort implements OrderByFunction { + + private final SortInfo sortInfo; + + private PriorityQueue topNQueue; + + private TopNRowComparator topNRowComparator; + + public OrderByHeapSort(SortInfo sortInfo) { + this.sortInfo = sortInfo; + } + + @Override + public void open(FunctionContext context) { + this.topNRowComparator = new TopNRowComparator<>(sortInfo); + this.topNQueue = new PriorityQueue<>( + sortInfo.fetch, topNRowComparator.getNegativeComparator()); + } + + @Override + public void process(Row row) { + if (topNQueue.size() == sortInfo.fetch) { + if (sortInfo.orderByFields.isEmpty()) { + return; + } + Row top = topNQueue.peek(); + if (topNQueue.comparator().compare(top, row) < 0) { + topNQueue.remove(); + topNQueue.add(row); + } + } else { + topNQueue.add(row); + } + } + + @Override + public Iterable finish() { + List results = Lists.newArrayList(topNQueue.iterator()); + results.sort(topNRowComparator); + topNQueue.clear(); + return results; + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java new file mode 100644 index 000000000..7669b06c3 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.function.table; + +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.function.FunctionContext; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort; +import java.util.ArrayList; +import java.util.List; + +public class OrderByRadixSort implements OrderByFunction { + + private final SortInfo sortInfo; + + private List allRows; + + public OrderByRadixSort(SortInfo sortInfo) { + this.sortInfo = sortInfo; + } + + @Override + public void open(FunctionContext context) { + this.allRows = new ArrayList<>(); + } + + @Override + public void process(Row row) { + if (sortInfo.fetch == 0) { + return; + } + allRows.add(row); + } + + @Override + public Iterable finish() { + List sortedRows = new ArrayList<>(allRows); + MultiFieldRadixSort.multiFieldRadixSort(sortedRows, sortInfo); + allRows.clear(); + return sortedRows; + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java new file mode 100644 index 000000000..7da590570 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.function.table; + +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.function.FunctionContext; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator; +import java.util.ArrayList; +import java.util.List; + +public class OrderByTimSort implements OrderByFunction { + + private final SortInfo sortInfo; + + private List allRows; + + private TopNRowComparator topNRowComparator; + + public OrderByTimSort(SortInfo sortInfo) { + this.sortInfo = sortInfo; + } + + @Override + public void open(FunctionContext context) { + this.topNRowComparator = new TopNRowComparator<>(sortInfo); + this.allRows = new ArrayList<>(); + } + + @Override + public void process(Row row) { + if (sortInfo.fetch == 0) { + return; + } + allRows.add(row); + } + + @Override + public Iterable finish() { + List sortedRows = new ArrayList<>(allRows); + sortedRows.sort(topNRowComparator); + allRows.clear(); + return sortedRows; + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java new file mode 100644 index 000000000..007f82321 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.function.table.order; + +import java.util.Collections; + +import org.apache.geaflow.common.type.IType; +import org.apache.geaflow.dsl.common.data.Row; +import java.util.List; +import org.apache.geaflow.common.type.Types; +import java.util.IntSummaryStatistics; +import java.util.Objects; +import java.util.ArrayList; +import org.apache.geaflow.common.binary.BinaryString; + +public class MultiFieldRadixSort { + + /** + * Multi-field radix sort. + */ + public static void multiFieldRadixSort(List data, + SortInfo sortInfo) { + if (data == null || data.size() <= 1) return; + + // Sort by field with the lowest priority. + List fields = sortInfo.orderByFields; + + for (int i = fields.size() - 1; i >= 0; i--) { + OrderByField field = fields.get(i); + IType orderType = field.expression.getOutputType(); + if (Types.getType(orderType.getTypeClass()) == Types.INTEGER) { + radixSortByIntField(data, field); + } else if (Types.getType(orderType.getTypeClass()) == Types.BINARY_STRING) { + radixSortByStringField(data, field); + } + } + } + + /** + * Radix sort by integer field. + */ + private static void radixSortByIntField(List data, + OrderByField field) { + if (data.isEmpty()) return; + + // Determine the number of digits. + IntSummaryStatistics stats = data.stream() + .map(item -> field.expression.evaluate(item)) + .filter(Objects::nonNull) + .filter(obj -> obj instanceof Number) // Make sure it is a numeric type. + .mapToInt(obj -> ((Number) obj).intValue()) + .summaryStatistics(); + System.out.println("═══════════field.expression.evaluate(item)->number════════════"); + int max = 0, min = 0; + if (stats.getCount() > 0) { + max = stats.getMax(); + min = stats.getMin(); + } + + // Handling negative numbers: Add the offset to all numbers to make them positive. + int offset = min < 0 ? -min : 0; + max += offset; + + // Bitwise sorting. + for (int exp = 1; max / exp > 0; exp *= 10) { + countingSortByDigit(data, field, exp, offset); + } + } + + /** + * Radix sorting by string field. + */ + private static void radixSortByStringField(List data, + OrderByField field) { + if (data.isEmpty()) return; + + int maxLength = data.stream() + .map(item -> ((BinaryString)field.expression.evaluate(item)).getLength()) + .filter(Objects::nonNull) + .mapToInt(Integer::intValue) + .max() + .orElse(0); + + // Sort from the last digit of the string. + for (int pos = maxLength - 1; pos >= 0; pos--) { + countingSortByChar(data, field, pos); + } + } + + /** + * Sort by the specified number of digits (integer). + */ + private static void countingSortByDigit(List data, + OrderByField field, + int exp, int offset) { + int n = data.size(); + List output = new ArrayList<>(Collections.nCopies(n, null)); + int[] count = new int[10]; + + // Pre-calculate all values to avoid repeated evaluation. + int[] values = new int[n]; + for (int i = 0; i < n; i++) { + values[i] = (Integer) field.expression.evaluate(data.get(i)); + } + System.out.println("═══════════field.expression.evaluate(item)->Integer════════════"); + // Count the number of times each number appears. + for (int i = 0; i < n; i++) { + int digit = (values[i] + offset) / exp % 10; + count[digit]++; + } + + // Calculate cumulative count. + if (field.order.value>0){ + for (int i = 1; i < 10; i++) { + count[i] += count[i - 1]; + } + }else{ + for (int i = 8; i >= 0; i--) { + count[i] += count[i + 1]; + } + } + // Build the output array from back to front (to ensure stability). + for (int i = n - 1; i >= 0; i--) { + int digit = (values[i] + offset) / exp % 10; + output.set(count[digit] - 1, data.get(i)); + count[digit]--; + } + + // Copy back to the original array. + for (int i = 0; i < n; i++) { + data.set(i, output.get(i)); + } + } + + /** + * Sort by the specified number of digits (string). + */ + private static void countingSortByChar(List data, + OrderByField field, + int pos) { + int n = data.size(); + List output = new ArrayList<>(Collections.nCopies(n, null)); + + // Precompute all strings and character codes to avoid repeated evaluate and toString. + String[] strings = new String[n]; + int[] charCodes = new int[n]; + + int minChar = Integer.MAX_VALUE; + int maxChar = Integer.MIN_VALUE; + + for (int i = 0; i < n; i++) { + strings[i] = ((BinaryString)field.expression.evaluate(data.get(i))).toString(); + if (pos < strings[i].length()) { + int charCode = strings[i].codePointAt(pos); + minChar = Math.min(minChar, charCode); + maxChar = Math.max(maxChar, charCode); + } + } + int range = maxChar - minChar + 2; + int[] count = new int[range]; + + for (int i = 0; i < n; i++) { + charCodes[i] = pos < strings[i].length() ? strings[i].charAt(pos) : 0; + charCodes[i] = charCodes[i] == 0 ? 0 : charCodes[i] - minChar + 1; + count[charCodes[i]]++; + } + + if (field.order.value>0){ + for (int i = 1; i < range; i++) { + count[i] += count[i - 1]; + } + }else{ + for (int i = range-2; i >= 0; i--) { + count[i] += count[i + 1]; + } + } + + for (int i = n - 1; i >= 0; i--) { + output.set(count[charCodes[i]] - 1, data.get(i)); + count[charCodes[i]]--; + } + + for (int i = 0; i < n; i++) { + data.set(i, output.get(i)); + } + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java index b7bbf238a..d878c1faf 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java @@ -23,6 +23,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.geaflow.common.type.IType; +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.common.type.primitive.BinaryStringType; public class SortInfo implements Serializable { @@ -36,4 +39,20 @@ public SortInfo copy(List orderByFields) { sortInfo.fetch = this.fetch; return sortInfo; } + + public boolean isRadixSortable() { + for (int i = 0; i < this.orderByFields.size(); i++) { + OrderByField field = this.orderByFields.get(i); + IType orderType = field.expression.getOutputType(); + if (Types.getType(orderType.getTypeClass()) != Types.INTEGER && Types.getType(orderType.getTypeClass()) != Types.BINARY_STRING) { + return false; + } else if (Types.getType(orderType.getTypeClass()) == Types.BINARY_STRING) { + BinaryStringType bsType = (BinaryStringType)orderType; + if (bsType.getPrecision() > 11 || bsType.getPrecision() < 0) { + return false; + } + } + } + return true; + } } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java index 08c2179a3..7823eceef 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java @@ -38,7 +38,9 @@ import org.apache.geaflow.dsl.runtime.expression.ExpressionTranslator; import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction; -import org.apache.geaflow.dsl.runtime.function.table.OrderByFunctionImpl; +import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort; import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; @@ -69,8 +71,15 @@ public Sort copy(RelTraitSet traitSet, public RuntimeTable translate(QueryContext context) { SortInfo sortInfo = buildSortInfo(); RDataView dataView = ((PhysicRelNode) getInput()).translate(context); - - OrderByFunction orderByFunction = new OrderByFunctionImpl(sortInfo); + + OrderByFunction orderByFunction; + if (sortInfo.fetch > 0) { + orderByFunction = new OrderByHeapSort(sortInfo); + } else if (sortInfo.isRadixSortable()) { + orderByFunction = new OrderByRadixSort(sortInfo); + } else { + orderByFunction = new OrderByTimSort(sortInfo); + } if (dataView.getType() == ViewType.TABLE) { return ((RuntimeTable) dataView).orderBy(orderByFunction); } else { From 6499b640fe391365c996f1f5c13c62a09b54415c Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Mon, 25 Aug 2025 08:51:14 +0800 Subject: [PATCH 02/11] Fix style --- .../org/apache/geaflow/dsl/util/SqlTypeUtil.java | 8 ++++---- .../runtime/function/table/OrderByHeapSort.java | 6 +++--- .../runtime/function/table/OrderByRadixSort.java | 6 +++--- .../dsl/runtime/function/table/OrderByTimSort.java | 4 ++-- .../function/table/order/MultiFieldRadixSort.java | 14 ++++++-------- .../dsl/runtime/function/table/order/SortInfo.java | 2 +- 6 files changed, 19 insertions(+), 21 deletions(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java index 6a90b4219..ddc8578eb 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java @@ -28,8 +28,8 @@ import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.common.type.Types; import org.apache.geaflow.common.type.primitive.BinaryStringType; +import org.apache.geaflow.common.type.Types; import org.apache.geaflow.dsl.calcite.EdgeRecordType; import org.apache.geaflow.dsl.calcite.GraphRecordType; import org.apache.geaflow.dsl.calcite.PathRecordType; @@ -130,9 +130,9 @@ public static RelDataType convertToRelType(IType type, boolean isNullable, default: if (type.isPrimitive()) { String sqlTypeName = convertToSqlTypeName(type); - SqlTypeName typeName = Types.getType(type.getTypeClass()) == Types.BINARY_STRING ? - SqlTypeName.get(sqlTypeName, ((BinaryStringType) type).getPrecision()): - SqlTypeName.get(sqlTypeName); + SqlTypeName typeName = Types.getType(type.getTypeClass()) == Types.BINARY_STRING + ? SqlTypeName.get(sqlTypeName, ((BinaryStringType) type).getPrecision()) + : SqlTypeName.get(sqlTypeName); return typeFactory.createTypeWithNullability(typeFactory.createSqlType(typeName), isNullable); } else { throw new GeaFlowDSLException("Not support type: " + type); diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java index c86800dbe..53e0a4388 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java @@ -19,13 +19,13 @@ package org.apache.geaflow.dsl.runtime.function.table; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.PriorityQueue; import org.apache.geaflow.dsl.common.data.Row; import org.apache.geaflow.dsl.common.function.FunctionContext; import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator; -import com.google.common.collect.Lists; -import java.util.List; -import java.util.PriorityQueue; public class OrderByHeapSort implements OrderByFunction { diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java index 7669b06c3..515d2c48a 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java @@ -19,12 +19,12 @@ package org.apache.geaflow.dsl.runtime.function.table; +import java.util.ArrayList; +import java.util.List; import org.apache.geaflow.dsl.common.data.Row; import org.apache.geaflow.dsl.common.function.FunctionContext; -import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort; -import java.util.ArrayList; -import java.util.List; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; public class OrderByRadixSort implements OrderByFunction { diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java index 7da590570..6b4e216d2 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java @@ -19,12 +19,12 @@ package org.apache.geaflow.dsl.runtime.function.table; +import java.util.ArrayList; +import java.util.List; import org.apache.geaflow.dsl.common.data.Row; import org.apache.geaflow.dsl.common.function.FunctionContext; import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator; -import java.util.ArrayList; -import java.util.List; public class OrderByTimSort implements OrderByFunction { diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java index 007f82321..e041e43f7 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java @@ -19,16 +19,15 @@ package org.apache.geaflow.dsl.runtime.function.table.order; +import java.util.ArrayList; import java.util.Collections; - -import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.dsl.common.data.Row; -import java.util.List; -import org.apache.geaflow.common.type.Types; import java.util.IntSummaryStatistics; +import java.util.List; import java.util.Objects; -import java.util.ArrayList; import org.apache.geaflow.common.binary.BinaryString; +import org.apache.geaflow.common.type.IType; +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.dsl.common.data.Row; public class MultiFieldRadixSort { @@ -67,7 +66,6 @@ private static void radixSortByIntField(List data, .filter(obj -> obj instanceof Number) // Make sure it is a numeric type. .mapToInt(obj -> ((Number) obj).intValue()) .summaryStatistics(); - System.out.println("═══════════field.expression.evaluate(item)->number════════════"); int max = 0, min = 0; if (stats.getCount() > 0) { max = stats.getMax(); @@ -119,7 +117,7 @@ private static void countingSortByDigit(List data, for (int i = 0; i < n; i++) { values[i] = (Integer) field.expression.evaluate(data.get(i)); } - System.out.println("═══════════field.expression.evaluate(item)->Integer════════════"); + // Count the number of times each number appears. for (int i = 0; i < n; i++) { int digit = (values[i] + offset) / exp % 10; diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java index d878c1faf..c61793490 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.common.type.Types; import org.apache.geaflow.common.type.primitive.BinaryStringType; +import org.apache.geaflow.common.type.Types; public class SortInfo implements Serializable { From 3547a1ad6e786bd54e56461c36579bc5598c419e Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 27 Aug 2025 15:02:16 +0800 Subject: [PATCH 03/11] Optimizing the efficiency of the radix sort algorithm --- .../table/order/MultiFieldRadixSort.java | 217 ++++++++++-------- .../function/table/order/SortInfo.java | 10 +- 2 files changed, 125 insertions(+), 102 deletions(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java index e041e43f7..574db45a1 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java @@ -19,154 +19,169 @@ package org.apache.geaflow.dsl.runtime.function.table.order; -import java.util.ArrayList; -import java.util.Collections; -import java.util.IntSummaryStatistics; import java.util.List; -import java.util.Objects; import org.apache.geaflow.common.binary.BinaryString; -import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.common.type.Types; import org.apache.geaflow.dsl.common.data.Row; public class MultiFieldRadixSort { + + private static int dataSize; + + private static int[] intValues, sortedIntValues, charCodes; + + private static byte[] digits; + + private static String[] stringValues, sortedStringValues; + + private static Row[] srcData, dstData; /** * Multi-field radix sort. */ - public static void multiFieldRadixSort(List data, - SortInfo sortInfo) { - if (data == null || data.size() <= 1) return; - + public static void multiFieldRadixSort(List data, SortInfo sortInfo) { + dataSize = data.size(); + if (data == null || dataSize <= 1) return; + + // Init arrays. + intValues = new int[dataSize]; + sortedIntValues = new int[dataSize]; + charCodes = new int[dataSize]; + digits = new byte[dataSize]; + stringValues = new String[dataSize]; + sortedStringValues = new String[dataSize]; + srcData = data.toArray(new Row[0]); + dstData = new Row[dataSize]; + // Sort by field with the lowest priority. List fields = sortInfo.orderByFields; for (int i = fields.size() - 1; i >= 0; i--) { OrderByField field = fields.get(i); - IType orderType = field.expression.getOutputType(); - if (Types.getType(orderType.getTypeClass()) == Types.INTEGER) { + if (field.expression.getOutputType().getTypeClass() == Integer.class) { radixSortByIntField(data, field); - } else if (Types.getType(orderType.getTypeClass()) == Types.BINARY_STRING) { + } else { radixSortByStringField(data, field); } + for (int j = 0; j < dataSize; j++) { + data.set(j, srcData[j]); + } } } - + /** * Radix sort by integer field. */ - private static void radixSortByIntField(List data, - OrderByField field) { - if (data.isEmpty()) return; - + private static void radixSortByIntField(List data, OrderByField field) { // Determine the number of digits. - IntSummaryStatistics stats = data.stream() - .map(item -> field.expression.evaluate(item)) - .filter(Objects::nonNull) - .filter(obj -> obj instanceof Number) // Make sure it is a numeric type. - .mapToInt(obj -> ((Number) obj).intValue()) - .summaryStatistics(); - int max = 0, min = 0; - if (stats.getCount() > 0) { - max = stats.getMax(); - min = stats.getMin(); + int max = Integer.MIN_VALUE; + int min = Integer.MAX_VALUE; + boolean hasNull = false; + + for (int i = 0; i < dataSize; i++) { + Integer value = (Integer) field.expression.evaluate(data.get(i)); + if (value != null) { + intValues[i] = value; + max = value > max ? value : max; + min = value < min ? value : min; + } else { + intValues[i] = Integer.MIN_VALUE; + hasNull = true; + } + } + if (hasNull) { + min--; } // Handling negative numbers: Add the offset to all numbers to make them positive. - int offset = min < 0 ? -min : 0; + final int offset = min < 0 ? -min : 0; max += offset; - + + for (int i = 0; i < dataSize; i++) { + if (intValues[i] == Integer.MIN_VALUE) { + intValues[i] = min; + } + intValues[i] += offset; + } + // Bitwise sorting. for (int exp = 1; max / exp > 0; exp *= 10) { - countingSortByDigit(data, field, exp, offset); + for (int j = 0; j < dataSize; j++) { + digits[j] = (byte) (intValues[j] / exp % 10); + } + countingSortByDigit(field.order.value > 0); } } - + /** * Radix sorting by string field. */ - private static void radixSortByStringField(List data, - OrderByField field) { - if (data.isEmpty()) return; + private static void radixSortByStringField(List data, OrderByField field) { + // Precompute all strings to avoid repeated evaluation and toString. + int maxLength = 0; - int maxLength = data.stream() - .map(item -> ((BinaryString)field.expression.evaluate(item)).getLength()) - .filter(Objects::nonNull) - .mapToInt(Integer::intValue) - .max() - .orElse(0); + for (int i = 0; i < dataSize; i++) { + BinaryString binaryString = (BinaryString) field.expression.evaluate(data.get(i)); + stringValues[i] = binaryString != null ? binaryString.toString() : ""; + maxLength = Math.max(maxLength, stringValues[i].length()); + } // Sort from the last digit of the string. for (int pos = maxLength - 1; pos >= 0; pos--) { - countingSortByChar(data, field, pos); + countingSortByChar(field.order.value > 0, pos); } } - + /** * Sort by the specified number of digits (integer). */ - private static void countingSortByDigit(List data, - OrderByField field, - int exp, int offset) { - int n = data.size(); - List output = new ArrayList<>(Collections.nCopies(n, null)); + private static void countingSortByDigit(boolean ascending) { int[] count = new int[10]; - - // Pre-calculate all values to avoid repeated evaluation. - int[] values = new int[n]; - for (int i = 0; i < n; i++) { - values[i] = (Integer) field.expression.evaluate(data.get(i)); - } - + // Count the number of times each number appears. - for (int i = 0; i < n; i++) { - int digit = (values[i] + offset) / exp % 10; - count[digit]++; + for (int i = 0; i < dataSize; i++) { + count[digits[i]]++; } - + // Calculate cumulative count. - if (field.order.value>0){ + if (ascending) { for (int i = 1; i < 10; i++) { count[i] += count[i - 1]; } - }else{ + } else { for (int i = 8; i >= 0; i--) { count[i] += count[i + 1]; } } + // Build the output array from back to front (to ensure stability). - for (int i = n - 1; i >= 0; i--) { - int digit = (values[i] + offset) / exp % 10; - output.set(count[digit] - 1, data.get(i)); - count[digit]--; + for (int i = dataSize - 1; i >= 0; i--) { + int index = --count[digits[i]]; + dstData[index] = srcData[i]; + sortedIntValues[index] = intValues[i]; } + + int[] intTmp = intValues; + intValues = sortedIntValues; + sortedIntValues = intTmp; - // Copy back to the original array. - for (int i = 0; i < n; i++) { - data.set(i, output.get(i)); - } + Row[] rowTmp = srcData; + srcData = dstData; + dstData = rowTmp; } - + /** * Sort by the specified number of digits (string). */ - private static void countingSortByChar(List data, - OrderByField field, - int pos) { - int n = data.size(); - List output = new ArrayList<>(Collections.nCopies(n, null)); - + private static void countingSortByChar(boolean ascending, int pos) { // Precompute all strings and character codes to avoid repeated evaluate and toString. - String[] strings = new String[n]; - int[] charCodes = new int[n]; - int minChar = Integer.MAX_VALUE; int maxChar = Integer.MIN_VALUE; - for (int i = 0; i < n; i++) { - strings[i] = ((BinaryString)field.expression.evaluate(data.get(i))).toString(); - if (pos < strings[i].length()) { - int charCode = strings[i].codePointAt(pos); + for (int i = 0; i < dataSize; i++) { + String value = stringValues[i]; + if (pos < value.length()) { + int charCode = value.codePointAt(pos); + charCodes[i] = charCode; minChar = Math.min(minChar, charCode); maxChar = Math.max(maxChar, charCode); } @@ -174,29 +189,37 @@ private static void countingSortByChar(List data, int range = maxChar - minChar + 2; int[] count = new int[range]; - for (int i = 0; i < n; i++) { - charCodes[i] = pos < strings[i].length() ? strings[i].charAt(pos) : 0; - charCodes[i] = charCodes[i] == 0 ? 0 : charCodes[i] - minChar + 1; + for (int i = 0; i < dataSize; i++) { + if (pos < stringValues[i].length()) { + charCodes[i] -= (minChar - 1); + } else { + charCodes[i] = 0; // null character + } count[charCodes[i]]++; } - - if (field.order.value>0){ + + if (ascending) { for (int i = 1; i < range; i++) { count[i] += count[i - 1]; } - }else{ - for (int i = range-2; i >= 0; i--) { + } else { + for (int i = range - 2; i >= 0; i--) { count[i] += count[i + 1]; } } - - for (int i = n - 1; i >= 0; i--) { - output.set(count[charCodes[i]] - 1, data.get(i)); - count[charCodes[i]]--; + + for (int i = dataSize - 1; i >= 0; i--) { + int index = --count[charCodes[i]]; + dstData[index] = srcData[i]; + sortedStringValues[index] = stringValues[i]; } + + String[] stringTmp = stringValues; + stringValues = sortedStringValues; + sortedStringValues = stringTmp; - for (int i = 0; i < n; i++) { - data.set(i, output.get(i)); - } + Row[] rowTmp = srcData; + srcData = dstData; + dstData = rowTmp; } } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java index c61793490..42c2b3dc2 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java @@ -23,9 +23,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.geaflow.common.binary.BinaryString; import org.apache.geaflow.common.type.IType; import org.apache.geaflow.common.type.primitive.BinaryStringType; -import org.apache.geaflow.common.type.Types; public class SortInfo implements Serializable { @@ -44,11 +44,11 @@ public boolean isRadixSortable() { for (int i = 0; i < this.orderByFields.size(); i++) { OrderByField field = this.orderByFields.get(i); IType orderType = field.expression.getOutputType(); - if (Types.getType(orderType.getTypeClass()) != Types.INTEGER && Types.getType(orderType.getTypeClass()) != Types.BINARY_STRING) { + if (orderType.getTypeClass() != Integer.class && orderType.getTypeClass() != BinaryString.class) { return false; - } else if (Types.getType(orderType.getTypeClass()) == Types.BINARY_STRING) { - BinaryStringType bsType = (BinaryStringType)orderType; - if (bsType.getPrecision() > 11 || bsType.getPrecision() < 0) { + } else if (orderType.getTypeClass() == BinaryString.class) { + int precision = ((BinaryStringType) orderType).getPrecision(); + if (precision > 11 || precision < 0) { return false; } } From 2fa1ca07cec3c2a3baf8e4ffb4b82b3ff2a2fda6 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 27 Aug 2025 15:52:21 +0800 Subject: [PATCH 04/11] Max String Precision --- .../geaflow/dsl/runtime/function/table/order/SortInfo.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java index 42c2b3dc2..c90c9e21d 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java @@ -48,7 +48,8 @@ public boolean isRadixSortable() { return false; } else if (orderType.getTypeClass() == BinaryString.class) { int precision = ((BinaryStringType) orderType).getPrecision(); - if (precision > 11 || precision < 0) { + // MongoDB ObjectId: 24-character hexadecimal + if (precision > 24 || precision < 0) { return false; } } From 5fb83d68142972922731c60e33b5d71f976a6651 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Tue, 16 Sep 2025 12:30:12 +0800 Subject: [PATCH 05/11] add tests --- .../dsl/catalog/console/CatalogUtil.java | 6 +- .../function/table/OrderByHeapSort.java | 10 +- .../benchmark/OrderMemoryBenchmark.java | 179 ++++++++++ .../runtime/benchmark/OrderTimeBenchmark.java | 312 ++++++++++++++++++ .../dsl/runtime/plan/StepPlanTest.java | 2 +- .../query/MultiFieldRadixSortTest.java | 297 +++++++++++++++++ .../geaflow/dsl/runtime/query/SortTest.java | 9 + .../src/test/resources/expect/sort_004.txt | 22 ++ .../src/test/resources/query/sort_004.sql | 45 +++ 9 files changed, 875 insertions(+), 7 deletions(-) create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql diff --git a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java index e3aa4db61..6f200a179 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java @@ -98,7 +98,7 @@ public static VertexTable convertToVertexTable(String instanceName, VertexModel idFieldName = fieldModel.getName(); } String typeName = convertTypeName(fieldModel.getType().name()); - IType fieldType = Types.of(typeName); + IType fieldType = Types.of(typeName, -1); TableField field = new TableField(fieldModel.getName(), fieldType, false); fields.add(field); } @@ -154,7 +154,7 @@ public static EdgeTable convertToEdgeTable(String instanceName, EdgeModel model) default: } String typeName = convertTypeName(fieldModel.getType().name()); - IType fieldType = Types.of(typeName); + IType fieldType = Types.of(typeName, -1); TableField field = new TableField(fieldModel.getName(), fieldType, false); fields.add(field); } @@ -237,7 +237,7 @@ private static List convertToTableField(List fieldModels List fields = new ArrayList<>(fieldModels.size()); for (FieldModel fieldModel : fieldModels) { String typeName = convertTypeName(fieldModel.getType().name()); - IType fieldType = Types.of(typeName); + IType fieldType = Types.of(typeName, -1); TableField field = new TableField(fieldModel.getName(), fieldType, false); fields.add(field); } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java index 53e0a4388..dc57095ce 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java @@ -19,7 +19,8 @@ package org.apache.geaflow.dsl.runtime.function.table; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.PriorityQueue; import org.apache.geaflow.dsl.common.data.Row; @@ -64,8 +65,11 @@ public void process(Row row) { @Override public Iterable finish() { - List results = Lists.newArrayList(topNQueue.iterator()); - results.sort(topNRowComparator); + List results = new ArrayList<>(); + while (!topNQueue.isEmpty()) { + results.add(topNQueue.remove()); + } + Collections.reverse(results); topNQueue.clear(); return results; } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java new file mode 100644 index 000000000..572c8bedd --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.benchmark; + +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.data.impl.ObjectRow; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction; +import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort; +import org.apache.geaflow.dsl.runtime.expression.Expression; +import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.profile.GCProfiler; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SingleShotTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Measurement(iterations = 10) +@Fork(1) +public class OrderMemoryBenchmark { + + @Param({"10000", "100000", "1000000"}) + private int dataSize; + + @Param({"100", "1000", "10000"}) + private int topN; + + private OrderByFunction orderByFunction; + private List testData; + private SortInfo sortInfo = new SortInfo(); + + @Setup(Level.Trial) + public void setup() { + // Create sort expression + Expression expression = new FieldExpression(0, Types.INTEGER); + + OrderByField orderByField = new OrderByField(); + orderByField.expression = expression; + orderByField.order = ORDER.ASC; + + List orderByFields = new ArrayList<>(1); + orderByFields.add(orderByField); + + sortInfo.orderByFields = orderByFields; + sortInfo.fetch = topN; + + // Generate test data + testData = generateTestData(); + } + + private List generateTestData() { + List data = new ArrayList<>(dataSize); + Random random = new Random(42); + + for (int i = 0; i < dataSize; i++) { + Object[] values = {random.nextInt(dataSize * 10)}; + data.add(ObjectRow.create(values)); + } + + return data; + } + + @Benchmark + public Iterable benchmarkHeapSortMemory() { + // Create a copy of the input data to avoid state pollution + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByHeapSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + // Perform Top-N sorting + return orderByFunction.finish(); + } + + @Benchmark + public Iterable benchmarkRadixSortMemory() { + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByRadixSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + return orderByFunction.finish(); + } + + @Benchmark + public Iterable benchmarkTimSortMemory() { + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByTimSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + return orderByFunction.finish(); + } + + public static void main(String[] args) throws RunnerException { + // Run a verification first + OrderMemoryBenchmark benchmark = new OrderMemoryBenchmark(); + benchmark.dataSize = 10; + benchmark.topN = 10; + benchmark.setup(); + Iterable heapResults = benchmark.benchmarkHeapSortMemory(); + System.out.println("===HEAP_SORT==="); + for (Row result: heapResults) { + System.out.print(result); + } + System.out.println(); + System.out.println("===RADIX_SORT==="); + Iterable radixResults = benchmark.benchmarkRadixSortMemory(); + for (Row result: radixResults) { + System.out.print(result); + } + System.out.println(); + System.out.println("===TIM_SORT==="); + Iterable timResults = benchmark.benchmarkTimSortMemory(); + for (Row result: timResults) { + System.out.print(result); + } + System.out.println(); + + String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); + String resultFile = "target/benchmark-results/memory-" + timestamp + ".json"; + + Options opt = new OptionsBuilder() + .include(OrderMemoryBenchmark.class.getSimpleName()) + .addProfiler(GCProfiler.class) + .jvmArgs("-Xms2g", "-Xmx4g") + .result(resultFile) + .resultFormat(ResultFormatType.JSON) + .build(); + + new Runner(opt).run(); + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java new file mode 100644 index 000000000..331e8e55d --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.benchmark; + +import org.apache.geaflow.common.binary.BinaryString; +import org.apache.geaflow.common.type.IType; +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.data.impl.ObjectRow; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction; +import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort; +import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort; +import org.apache.geaflow.dsl.runtime.expression.Expression; +import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(2) +public class OrderTimeBenchmark { + private static final String CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + @Param({"1000"}) + private int dataSize; + + @Param({"1000"}) + private int topN; + + @Param({"RANDOM", "SORTED", "REVERSE_SORTED", "PARTIAL_SORTED", "DUPLICATED"}) + private String dataPattern; + + @Param({"STRING"}) + private String dataType; + + private OrderByFunction orderByFunction; + private List testData; + private SortInfo sortInfo = new SortInfo(); + + @Setup(Level.Trial) + public void setupBenchmark() { + // Create sort expression + setupOrderByExpressions(); + + // Generate test data + testData = generateTestData(); + } + + private void setupOrderByExpressions() { + IType fieldType; + switch (dataType) { + case "INTEGER": + fieldType = Types.INTEGER; + break; + case "DOUBLE": + fieldType = Types.DOUBLE; + break; + case "STRING": + fieldType = Types.BINARY_STRING; + break; + default: + fieldType = Types.INTEGER; + } + + List orderByFields = new ArrayList<>(2); + + // Primary sort field + Expression expression1 = new FieldExpression(0, fieldType); + OrderByField orderByField1 = new OrderByField(); + orderByField1.expression = expression1; + orderByField1.order = ORDER.ASC; + orderByFields.add(orderByField1); + + // Add a secondary sort field (for testing multi-field sorting performance) + Expression expression2 = new FieldExpression(1, Types.INTEGER); + OrderByField orderByField2 = new OrderByField(); + orderByField2.expression = expression2; + orderByField2.order = ORDER.ASC; + orderByFields.add(orderByField2); + + sortInfo.orderByFields = orderByFields; + sortInfo.fetch = topN; + } + + private List generateTestData() { + List data = new ArrayList<>(dataSize); + Random random = new Random(42); // Fixed seeds ensure reproducibility + + for (int i = 0; i < dataSize; i++) { + Object[] values = new Object[2]; + + // Generate the value of the primary sort field + switch (dataType) { + case "INTEGER": + values[0] = generateIntegerValue(i, random); + break; + case "DOUBLE": + values[0] = generateDoubleValue(i, random); + break; + case "STRING": + values[0] = BinaryString.fromString(generateStringValue(i, random)); + break; + } + + // Generate the value of the secondary sort field + values[1] = random.nextInt(100); + + data.add(ObjectRow.create(values)); + } + + return data; + } + + private Integer generateIntegerValue(int index, Random random) { + switch (dataPattern) { + case "RANDOM": + return random.nextInt(dataSize * 10); + case "SORTED": + return index; + case "REVERSE_SORTED": + return dataSize - index; + case "PARTIAL_SORTED": + // 70% ordered, 30% random + return index < dataSize * 0.7 ? index : random.nextInt(dataSize); + case "DUPLICATED": + // Generate a large number of repeated values + return random.nextInt(dataSize / 10); + default: + return random.nextInt(dataSize); + } + } + + private Double generateDoubleValue(int index, Random random) { + switch (dataPattern) { + case "RANDOM": + return random.nextDouble() * dataSize * 10; + case "SORTED": + return (double) index + random.nextDouble(); + case "REVERSE_SORTED": + return (double) (dataSize - index) + random.nextDouble(); + case "PARTIAL_SORTED": + return index < dataSize * 0.7 ? + (double) index + random.nextDouble() : + random.nextDouble() * dataSize; + case "DUPLICATED": + return (double) (random.nextInt(dataSize / 10)) + random.nextDouble(); + default: + return random.nextDouble() * dataSize; + } + } + + private String generateStringValue(int index, Random random) { + String[] prefixes = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}; + + switch (dataPattern) { + case "RANDOM": + return generateRandomString(1, 101, random); + case "SORTED": + return String.format("R%0100d", index); + case "REVERSE_SORTED": + return String.format("R%0100d", dataSize - index); + case "PARTIAL_SORTED": + return index < dataSize * 0.7 ? + String.format("R%0100d", index) : + generateRandomString(1, 101, random); + case "DUPLICATED": + return prefixes[random.nextInt(3)] + + String.format("%0100d", random.nextInt(dataSize / 10)); + default: + return String.format("R%0100d", random.nextInt(dataSize)); + } + } + + /** + * Generate a random string of fixed length + */ + private String generateRandomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(CHARS.charAt(random.nextInt(CHARS.length()))); + } + return sb.toString(); + } + + /** + * Generate a random string of variable length + */ + private String generateRandomString(int minLength, int maxLength, Random random) { + int length = minLength + random.nextInt(maxLength - minLength + 1); + return generateRandomString(length, random); + } + + + @Benchmark + public Iterable benchmarkHeapSort() { + // Create a copy of the input data to avoid state pollution + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByHeapSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + // Perform Top-N sorting + return orderByFunction.finish(); + } + + @Benchmark + public Iterable benchmarkRadixSort() { + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByRadixSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + return orderByFunction.finish(); + } + + @Benchmark + public Iterable benchmarkTimSort() { + List inputData = new ArrayList<>(testData); + + orderByFunction = new OrderByTimSort(sortInfo); + orderByFunction.open(null); + + for (int i = 0; i < dataSize; i++) { + orderByFunction.process(inputData.get(i)); + } + + return orderByFunction.finish(); + } + + public static void main(String[] args) throws RunnerException { + // Run a verification first + OrderTimeBenchmark benchmark = new OrderTimeBenchmark(); + benchmark.dataSize = 10; + benchmark.topN = 10; + benchmark.dataPattern = "RANDOM"; + benchmark.dataType = "INTEGER"; + benchmark.setupBenchmark(); + Iterable heapResults = benchmark.benchmarkHeapSort(); + System.out.println("===HEAP_SORT==="); + for (Row result: heapResults) { + System.out.print(result); + } + System.out.println(); + System.out.println("===RADIX_SORT==="); + Iterable radixResults = benchmark.benchmarkRadixSort(); + for (Row result: radixResults) { + System.out.print(result); + } + System.out.println(); + System.out.println("===TIM_SORT==="); + Iterable timResults = benchmark.benchmarkTimSort(); + for (Row result: timResults) { + System.out.print(result); + } + System.out.println(); + + String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); + String resultFile = "target/benchmark-results/time-" + timestamp + ".json"; + + Options opt = new OptionsBuilder() + .include(OrderTimeBenchmark.class.getSimpleName()) + .jvmArgs("-Xms4g", "-Xmx8g", "-XX:+UseG1GC") + .result(resultFile) + .resultFormat(ResultFormatType.JSON) + .build(); + + new Runner(opt).run(); + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java index eb08dd523..82d72af5c 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java @@ -175,7 +175,7 @@ public void testMultiOutputLogicalPlan() { } private GraphSchema createGraph() { - TableField idField = new TableField("id", Types.of("Long"), false); + TableField idField = new TableField("id", Types.of("Long", -1), false); VertexTable vTable = new VertexTable("default", "testV", Collections.singletonList(idField), "id"); GeaFlowGraph graph = new GeaFlowGraph("default", "test", Lists.newArrayList(vTable), new ArrayList<>(), new HashMap<>(), new HashMap<>(), false, false); diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java new file mode 100644 index 000000000..1b2b4502f --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.runtime.query; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import org.apache.geaflow.common.binary.BinaryString; +import org.apache.geaflow.common.type.primitive.BinaryStringType; +import org.apache.geaflow.common.type.primitive.IntegerType; +import org.apache.geaflow.dsl.common.data.impl.ObjectRow; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; +import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.testng.annotations.Test; + +public class MultiFieldRadixSortTest { + + @Test + public void testSortEmptyList() { + List data = new ArrayList<>(); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 0); + } + + @Test + public void testSortSingleElement() { + List data = new ArrayList<>(1); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("test")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 1); + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + } + + @Test + public void testSortByIntegerFieldAscending() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("c")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("b")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), Integer.valueOf(2)); + assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), Integer.valueOf(3)); + } + + @Test + public void testSortByIntegerFieldDescending() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("c")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("b")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.DESC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), Integer.valueOf(3)); + assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), Integer.valueOf(2)); + assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + } + + @Test + public void testSortByStringFieldAscending() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("zebra")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("apple")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("banana")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField stringField = new OrderByField(); + stringField.expression = new FieldExpression(1, BinaryStringType.INSTANCE); + stringField.order = ORDER.ASC; + sortInfo.orderByFields.add(stringField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + assertEquals(((BinaryString)data.get(0).getField(1, BinaryStringType.INSTANCE)).toString(), "apple"); + assertEquals(((BinaryString)data.get(1).getField(1, BinaryStringType.INSTANCE)).toString(), "banana"); + assertEquals(((BinaryString)data.get(2).getField(1, BinaryStringType.INSTANCE)).toString(), "zebra"); + } + + @Test + public void testSortByStringFieldDescending() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("apple")})); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("zebra")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("banana")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField stringField = new OrderByField(); + stringField.expression = new FieldExpression(1, BinaryStringType.INSTANCE); + stringField.order = ORDER.DESC; + sortInfo.orderByFields.add(stringField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + assertEquals(((BinaryString)data.get(0).getField(1, BinaryStringType.INSTANCE)).toString(), "zebra"); + assertEquals(((BinaryString)data.get(1).getField(1, BinaryStringType.INSTANCE)).toString(), "banana"); + assertEquals(((BinaryString)data.get(2).getField(1, BinaryStringType.INSTANCE)).toString(), "apple"); + } + + @Test + public void testMultiFieldSort() { + List data = new ArrayList<>(4); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("b")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("b")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(2); + + // First sort by integer field (ascending) + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + // Then sort by string field (ascending) + OrderByField stringField = new OrderByField(); + stringField.expression = new FieldExpression(1, BinaryStringType.INSTANCE); + stringField.order = ORDER.ASC; + sortInfo.orderByFields.add(stringField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 4); + // Expected order: (1, "a"), (1, "b"), (2, "a"), (2, "b") + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + assertEquals(((BinaryString)data.get(0).getField(1, BinaryStringType.INSTANCE)).toString(), "a"); + + assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + assertEquals(((BinaryString)data.get(1).getField(1, BinaryStringType.INSTANCE)).toString(), "b"); + + assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), Integer.valueOf(2)); + assertEquals(((BinaryString)data.get(2).getField(1, BinaryStringType.INSTANCE)).toString(), "a"); + + assertEquals(data.get(3).getField(0, IntegerType.INSTANCE), Integer.valueOf(2)); + assertEquals(((BinaryString)data.get(3).getField(1, BinaryStringType.INSTANCE)).toString(), "b"); + } + + @Test + public void testSortWithNullValues() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{null, BinaryString.fromString("b")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("c")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + // Null values should appear first in ascending order + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), null); + assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), Integer.valueOf(2)); + } + + @Test + public void testSortWithNegativeNumbers() { + List data = new ArrayList<>(4); + data.add(ObjectRow.create(new Object[]{-1, BinaryString.fromString("a")})); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("b")})); + data.add(ObjectRow.create(new Object[]{-5, BinaryString.fromString("c")})); + data.add(ObjectRow.create(new Object[]{0, BinaryString.fromString("d")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 4); + assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), Integer.valueOf(-5)); + assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), Integer.valueOf(-1)); + assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), Integer.valueOf(0)); + assertEquals(data.get(3).getField(0, IntegerType.INSTANCE), Integer.valueOf(3)); + } + + @Test + public void testSortWithEmptyStrings() { + List data = new ArrayList<>(3); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("")})); + data.add(ObjectRow.create(new Object[]{2, BinaryString.fromString("hello")})); + data.add(ObjectRow.create(new Object[]{3, BinaryString.fromString("a")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField stringField = new OrderByField(); + stringField.expression = new FieldExpression(1, BinaryStringType.INSTANCE); + stringField.order = ORDER.ASC; + sortInfo.orderByFields.add(stringField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + // Empty string should come first + assertEquals(((BinaryString)data.get(0).getField(1, BinaryStringType.INSTANCE)).toString(), ""); + assertEquals(((BinaryString)data.get(1).getField(1, BinaryStringType.INSTANCE)).toString(), "a"); + assertEquals(((BinaryString)data.get(2).getField(1, BinaryStringType.INSTANCE)).toString(), "hello"); + } + + @Test + public void testSortStability() { + List data = new ArrayList<>(3); + // Create rows with same sort key but different secondary values + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("first")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("second")})); + data.add(ObjectRow.create(new Object[]{1, BinaryString.fromString("third")})); + + SortInfo sortInfo = new SortInfo(); + sortInfo.orderByFields = new ArrayList<>(1); + OrderByField intField = new OrderByField(); + intField.expression = new FieldExpression(0, IntegerType.INSTANCE); + intField.order = ORDER.ASC; + sortInfo.orderByFields.add(intField); + + MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo); + + assertEquals(data.size(), 3); + // All should have the same integer value + for (Row row : data) { + assertEquals(row.getField(0, IntegerType.INSTANCE), Integer.valueOf(1)); + } + } +} \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java index a0823f199..f5e957862 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java @@ -49,4 +49,13 @@ public void testSort_003() throws Exception { .execute() .checkSinkResult(); } + + @Test + public void testSort_004() throws Exception { + QueryTester + .build() + .withQueryPath("/query/sort_004.sql") + .execute() + .checkSinkResult(); + } } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt new file mode 100644 index 000000000..0767504c8 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt @@ -0,0 +1,22 @@ +1489576693883,2,2,1,mills +1489576693894,8,10,1,penys +1489576693864,7,7,2,jacks +1489576693844,5,5,3,jacks +1489576693834,4,4,4,mills +1489576693884,1,1,10,hello +1489576693884,1,19,10,hello +1489576693874,8,8,12,jacks +1489576693883,1,20,14,mills +1489576693883,1,21,14,中国 +1489576693854,6,6,15,calls +1489576693883,2,22,15,中国 +1489576693184,7,11,22,penys +1489576693822,3,3,30,mills +1489576693884,9,9,31,penys +1489576693284,6,12,33,penys +1489576693864,5,13,44,jacks +1489576693874,4,14,51,jacks +1489576693884,3,15,61,penys +1489576693894,2,16,71,penys +1489576693184,1,17,81,penys +1489576693284,5,18,91,penys \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql new file mode 100644 index 000000000..eae350ab2 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +CREATE TABLE orders( + createTime bigint, + productId bigint, + orderId bigint, + units int, + user_name VARCHAR(8) +) WITH ( + type='file', + geaflow.dsl.file.path = 'resource:///data/orders.txt' +); + +CREATE TABLE tbl_result ( + createTime bigint, + productId bigint, + orderId bigint, + units int, + user_name VARCHAR(8) +) WITH ( + type='file', + geaflow.dsl.file.path='${target}' +); + +INSERT INTO tbl_result +SELECT * +FROM orders o +ORDER BY units, user_name From 9e8d9fe4f39afd2e7ccdb11d72cf9eabeba5ad32 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Mon, 22 Sep 2025 19:07:39 +0800 Subject: [PATCH 06/11] modify pom --- geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml | 11 +++++++++++ geaflow/geaflow-dsl/pom.xml | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml index de11b9c2a..e82194a7f 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml @@ -116,6 +116,17 @@ testng + + org.openjdk.jmh + jmh-core + test + + + org.openjdk.jmh + jmh-generator-annprocess + test + + org.apache.kafka diff --git a/geaflow/geaflow-dsl/pom.xml b/geaflow/geaflow-dsl/pom.xml index b8d58db2f..f7bb8be61 100644 --- a/geaflow/geaflow-dsl/pom.xml +++ b/geaflow/geaflow-dsl/pom.xml @@ -44,7 +44,7 @@ - 1.18.0-geaflow_1.0 + 1.18.0-geaflow_1.1 2.4.1 2.11 2.1.214 From 0888bf8e55102714396bb86f6e3d25bdfa4a1a49 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 24 Sep 2025 17:29:23 +0800 Subject: [PATCH 07/11] fix style --- .../src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java index ddc8578eb..dc66526a1 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java @@ -28,8 +28,8 @@ import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.common.type.primitive.BinaryStringType; import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.common.type.primitive.BinaryStringType; import org.apache.geaflow.dsl.calcite.EdgeRecordType; import org.apache.geaflow.dsl.calcite.GraphRecordType; import org.apache.geaflow.dsl.calcite.PathRecordType; From d3eb972093279aaeec6326274b8f1f6fc9e61b6b Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 24 Sep 2025 18:32:50 +0800 Subject: [PATCH 08/11] fix style2 --- .../table/order/MultiFieldRadixSort.java | 20 ++++++-- .../benchmark/OrderMemoryBenchmark.java | 23 +++++---- .../runtime/benchmark/OrderTimeBenchmark.java | 47 +++++++++---------- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java index 574db45a1..9a7a9b8c0 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java @@ -27,20 +27,30 @@ public class MultiFieldRadixSort { private static int dataSize; - private static int[] intValues, sortedIntValues, charCodes; + private static int[] intValues; + + private static int[] sortedIntValues; + + private static int[] charCodes; private static byte[] digits; - private static String[] stringValues, sortedStringValues; + private static String[] stringValues; + + private static String[] sortedStringValues; + + private static Row[] srcData; + + private static Row[] dstData; - private static Row[] srcData, dstData; - /** * Multi-field radix sort. */ public static void multiFieldRadixSort(List data, SortInfo sortInfo) { dataSize = data.size(); - if (data == null || dataSize <= 1) return; + if (data == null || dataSize <= 1) { + return; + } // Init arrays. intValues = new int[dataSize]; diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java index 572c8bedd..94a8a6fd6 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java @@ -19,18 +19,24 @@ package org.apache.geaflow.dsl.runtime.benchmark; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.geaflow.common.type.Types; import org.apache.geaflow.dsl.common.data.Row; import org.apache.geaflow.dsl.common.data.impl.ObjectRow; -import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; -import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; -import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.expression.Expression; +import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction; import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort; import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort; import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort; -import org.apache.geaflow.dsl.runtime.expression.Expression; -import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.profile.GCProfiler; import org.openjdk.jmh.results.format.ResultFormatType; @@ -39,13 +45,6 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Random; -import java.util.concurrent.TimeUnit; - @BenchmarkMode(Mode.SingleShotTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java index 331e8e55d..467c18f75 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java @@ -19,20 +19,26 @@ package org.apache.geaflow.dsl.runtime.benchmark; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.geaflow.common.binary.BinaryString; import org.apache.geaflow.common.type.IType; import org.apache.geaflow.common.type.Types; import org.apache.geaflow.dsl.common.data.Row; import org.apache.geaflow.dsl.common.data.impl.ObjectRow; -import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; -import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; -import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; +import org.apache.geaflow.dsl.runtime.expression.Expression; +import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction; import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort; import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort; import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort; -import org.apache.geaflow.dsl.runtime.expression.Expression; -import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField; +import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER; +import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.results.format.ResultFormatType; import org.openjdk.jmh.runner.Runner; @@ -40,13 +46,6 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Random; -import java.util.concurrent.TimeUnit; - @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) @@ -135,6 +134,8 @@ private List generateTestData() { case "STRING": values[0] = BinaryString.fromString(generateStringValue(i, random)); break; + default: + return data; } // Generate the value of the secondary sort field @@ -174,9 +175,9 @@ private Double generateDoubleValue(int index, Random random) { case "REVERSE_SORTED": return (double) (dataSize - index) + random.nextDouble(); case "PARTIAL_SORTED": - return index < dataSize * 0.7 ? - (double) index + random.nextDouble() : - random.nextDouble() * dataSize; + return index < dataSize * 0.7 + ? (double) index + random.nextDouble() + : random.nextDouble() * dataSize; case "DUPLICATED": return (double) (random.nextInt(dataSize / 10)) + random.nextDouble(); default: @@ -195,20 +196,17 @@ private String generateStringValue(int index, Random random) { case "REVERSE_SORTED": return String.format("R%0100d", dataSize - index); case "PARTIAL_SORTED": - return index < dataSize * 0.7 ? - String.format("R%0100d", index) : - generateRandomString(1, 101, random); + return index < dataSize * 0.7 + ? String.format("R%0100d", index) + : generateRandomString(1, 101, random); case "DUPLICATED": - return prefixes[random.nextInt(3)] + - String.format("%0100d", random.nextInt(dataSize / 10)); + return prefixes[random.nextInt(3)] + + String.format("%0100d", random.nextInt(dataSize / 10)); default: return String.format("R%0100d", random.nextInt(dataSize)); } } - /** - * Generate a random string of fixed length - */ private String generateRandomString(int length, Random random) { StringBuilder sb = new StringBuilder(length); for (int i = 0; i < length; i++) { @@ -217,9 +215,6 @@ private String generateRandomString(int length, Random random) { return sb.toString(); } - /** - * Generate a random string of variable length - */ private String generateRandomString(int minLength, int maxLength, Random random) { int length = minLength + random.nextInt(maxLength - minLength + 1); return generateRandomString(length, random); From 4a8e18dc681798a465ca266ae9ef8aecc2bd5b77 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 24 Sep 2025 21:20:13 +0800 Subject: [PATCH 09/11] fix style2 --- .../geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java index 467c18f75..96bdf77e8 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java @@ -176,8 +176,8 @@ private Double generateDoubleValue(int index, Random random) { return (double) (dataSize - index) + random.nextDouble(); case "PARTIAL_SORTED": return index < dataSize * 0.7 - ? (double) index + random.nextDouble() - : random.nextDouble() * dataSize; + ? (double) index + random.nextDouble() + : random.nextDouble() * dataSize; case "DUPLICATED": return (double) (random.nextInt(dataSize / 10)) + random.nextDouble(); default: From 74054737aa1fa4cd6b9f428f734b365b5cee24f9 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Fri, 26 Sep 2025 18:18:43 +0800 Subject: [PATCH 10/11] refractor equals for BinaryStringType --- .../common/type/primitive/BinaryStringType.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java index 609018104..d9817f073 100644 --- a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java +++ b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java @@ -41,6 +41,23 @@ public int getPrecision() { return precision; } + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + return true; + } + @Override public String getName() { return Types.TYPE_NAME_BINARY_STRING; From ad182ee0a9107f3a7781214b8f516df63ac6bae1 Mon Sep 17 00:00:00 2001 From: ftljk <2510317228@qq.com> Date: Wed, 1 Oct 2025 16:37:03 +0800 Subject: [PATCH 11/11] concurrency-safe sorting --- .../table/order/MultiFieldRadixSort.java | 205 ++++++++++-------- 1 file changed, 117 insertions(+), 88 deletions(-) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java index 9a7a9b8c0..223ebecb3 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java @@ -25,76 +25,87 @@ public class MultiFieldRadixSort { - private static int dataSize; - - private static int[] intValues; - - private static int[] sortedIntValues; - - private static int[] charCodes; - - private static byte[] digits; - - private static String[] stringValues; - - private static String[] sortedStringValues; - - private static Row[] srcData; - - private static Row[] dstData; + private static final ThreadLocal dataSize = new ThreadLocal<>(); + private static final ThreadLocal intValues = new ThreadLocal<>(); + private static final ThreadLocal sortedIntValues = new ThreadLocal<>(); + private static final ThreadLocal charCodes = new ThreadLocal<>(); + private static final ThreadLocal digits = new ThreadLocal<>(); + private static final ThreadLocal stringValues = new ThreadLocal<>(); + private static final ThreadLocal sortedStringValues = new ThreadLocal<>(); + private static final ThreadLocal srcData = new ThreadLocal<>(); + private static final ThreadLocal dstData = new ThreadLocal<>(); /** * Multi-field radix sort. */ public static void multiFieldRadixSort(List data, SortInfo sortInfo) { - dataSize = data.size(); - if (data == null || dataSize <= 1) { + if (data == null || data.size() <= 1) { return; } - - // Init arrays. - intValues = new int[dataSize]; - sortedIntValues = new int[dataSize]; - charCodes = new int[dataSize]; - digits = new byte[dataSize]; - stringValues = new String[dataSize]; - sortedStringValues = new String[dataSize]; - srcData = data.toArray(new Row[0]); - dstData = new Row[dataSize]; - - // Sort by field with the lowest priority. - List fields = sortInfo.orderByFields; - - for (int i = fields.size() - 1; i >= 0; i--) { - OrderByField field = fields.get(i); - if (field.expression.getOutputType().getTypeClass() == Integer.class) { - radixSortByIntField(data, field); - } else { - radixSortByStringField(data, field); + int size = data.size(); + + try { + dataSize.set(size); + intValues.set(new int[size]); + sortedIntValues.set(new int[size]); + charCodes.set(new int[size]); + digits.set(new byte[size]); + stringValues.set(new String[size]); + sortedStringValues.set(new String[size]); + srcData.set(data.toArray(new Row[0])); + dstData.set(new Row[size]); + + // Sort by field with the lowest priority. + List fields = sortInfo.orderByFields; + + for (int i = fields.size() - 1; i >= 0; i--) { + OrderByField field = fields.get(i); + if (field.expression.getOutputType().getTypeClass() == Integer.class) { + radixSortByIntField(field); + } else { + radixSortByStringField(field); + } } - for (int j = 0; j < dataSize; j++) { - data.set(j, srcData[j]); + + Row[] finalData = srcData.get(); + for (int j = 0; j < size; j++) { + data.set(j, finalData[j]); } + } finally { + dataSize.remove(); + intValues.remove(); + sortedIntValues.remove(); + charCodes.remove(); + digits.remove(); + stringValues.remove(); + sortedStringValues.remove(); + srcData.remove(); + dstData.remove(); } } /** * Radix sort by integer field. */ - private static void radixSortByIntField(List data, OrderByField field) { + private static void radixSortByIntField(OrderByField field) { + int size = dataSize.get(); + int[] intVals = intValues.get(); + byte[] digs = digits.get(); + Row[] src = srcData.get(); + // Determine the number of digits. int max = Integer.MIN_VALUE; int min = Integer.MAX_VALUE; boolean hasNull = false; - for (int i = 0; i < dataSize; i++) { - Integer value = (Integer) field.expression.evaluate(data.get(i)); + for (int i = 0; i < size; i++) { + Integer value = (Integer) field.expression.evaluate(src[i]); if (value != null) { - intValues[i] = value; + intVals[i] = value; max = value > max ? value : max; min = value < min ? value : min; } else { - intValues[i] = Integer.MIN_VALUE; + intVals[i] = Integer.MIN_VALUE; hasNull = true; } } @@ -106,17 +117,17 @@ private static void radixSortByIntField(List data, OrderByField field) { final int offset = min < 0 ? -min : 0; max += offset; - for (int i = 0; i < dataSize; i++) { - if (intValues[i] == Integer.MIN_VALUE) { - intValues[i] = min; + for (int i = 0; i < size; i++) { + if (intVals[i] == Integer.MIN_VALUE) { + intVals[i] = min; } - intValues[i] += offset; + intVals[i] += offset; } // Bitwise sorting. for (int exp = 1; max / exp > 0; exp *= 10) { - for (int j = 0; j < dataSize; j++) { - digits[j] = (byte) (intValues[j] / exp % 10); + for (int j = 0; j < size; j++) { + digs[j] = (byte) (intVals[j] / exp % 10); } countingSortByDigit(field.order.value > 0); } @@ -125,14 +136,18 @@ private static void radixSortByIntField(List data, OrderByField field) { /** * Radix sorting by string field. */ - private static void radixSortByStringField(List data, OrderByField field) { + private static void radixSortByStringField(OrderByField field) { + int size = dataSize.get(); + String[] strVals = stringValues.get(); + Row[] src = srcData.get(); + // Precompute all strings to avoid repeated evaluation and toString. int maxLength = 0; - for (int i = 0; i < dataSize; i++) { - BinaryString binaryString = (BinaryString) field.expression.evaluate(data.get(i)); - stringValues[i] = binaryString != null ? binaryString.toString() : ""; - maxLength = Math.max(maxLength, stringValues[i].length()); + for (int i = 0; i < size; i++) { + BinaryString binaryString = (BinaryString) field.expression.evaluate(src[i]); + strVals[i] = binaryString != null ? binaryString.toString() : ""; + maxLength = Math.max(maxLength, strVals[i].length()); } // Sort from the last digit of the string. @@ -145,11 +160,18 @@ private static void radixSortByStringField(List data, OrderByField field) { * Sort by the specified number of digits (integer). */ private static void countingSortByDigit(boolean ascending) { + int size = dataSize.get(); + byte[] digs = digits.get(); + int[] intVals = intValues.get(); + int[] sortedIntVals = sortedIntValues.get(); + Row[] src = srcData.get(); + Row[] dst = dstData.get(); + int[] count = new int[10]; // Count the number of times each number appears. - for (int i = 0; i < dataSize; i++) { - count[digits[i]]++; + for (int i = 0; i < size; i++) { + count[digs[i]]++; } // Calculate cumulative count. @@ -164,34 +186,41 @@ private static void countingSortByDigit(boolean ascending) { } // Build the output array from back to front (to ensure stability). - for (int i = dataSize - 1; i >= 0; i--) { - int index = --count[digits[i]]; - dstData[index] = srcData[i]; - sortedIntValues[index] = intValues[i]; + for (int i = size - 1; i >= 0; i--) { + int index = --count[digs[i]]; + dst[index] = src[i]; + sortedIntVals[index] = intVals[i]; } - int[] intTmp = intValues; - intValues = sortedIntValues; - sortedIntValues = intTmp; + int[] intTmp = intVals; + intValues.set(sortedIntVals); + sortedIntValues.set(intTmp); - Row[] rowTmp = srcData; - srcData = dstData; - dstData = rowTmp; + Row[] rowTmp = src; + srcData.set(dst); + dstData.set(rowTmp); } /** * Sort by the specified number of digits (string). */ private static void countingSortByChar(boolean ascending, int pos) { + int size = dataSize.get(); + String[] strVals = stringValues.get(); + String[] sortedStrVals = sortedStringValues.get(); + int[] charCds = charCodes.get(); + Row[] src = srcData.get(); + Row[] dst = dstData.get(); + // Precompute all strings and character codes to avoid repeated evaluate and toString. int minChar = Integer.MAX_VALUE; int maxChar = Integer.MIN_VALUE; - for (int i = 0; i < dataSize; i++) { - String value = stringValues[i]; + for (int i = 0; i < size; i++) { + String value = strVals[i]; if (pos < value.length()) { int charCode = value.codePointAt(pos); - charCodes[i] = charCode; + charCds[i] = charCode; minChar = Math.min(minChar, charCode); maxChar = Math.max(maxChar, charCode); } @@ -199,13 +228,13 @@ private static void countingSortByChar(boolean ascending, int pos) { int range = maxChar - minChar + 2; int[] count = new int[range]; - for (int i = 0; i < dataSize; i++) { - if (pos < stringValues[i].length()) { - charCodes[i] -= (minChar - 1); + for (int i = 0; i < size; i++) { + if (pos < strVals[i].length()) { + charCds[i] -= (minChar - 1); } else { - charCodes[i] = 0; // null character + charCds[i] = 0; // null character } - count[charCodes[i]]++; + count[charCds[i]]++; } if (ascending) { @@ -218,18 +247,18 @@ private static void countingSortByChar(boolean ascending, int pos) { } } - for (int i = dataSize - 1; i >= 0; i--) { - int index = --count[charCodes[i]]; - dstData[index] = srcData[i]; - sortedStringValues[index] = stringValues[i]; + for (int i = size - 1; i >= 0; i--) { + int index = --count[charCds[i]]; + dst[index] = src[i]; + sortedStrVals[index] = strVals[i]; } - String[] stringTmp = stringValues; - stringValues = sortedStringValues; - sortedStringValues = stringTmp; + String[] stringTmp = strVals; + stringValues.set(sortedStrVals); + sortedStringValues.set(stringTmp); - Row[] rowTmp = srcData; - srcData = dstData; - dstData = rowTmp; + Row[] rowTmp = src; + srcData.set(dst); + dstData.set(rowTmp); } -} +} \ No newline at end of file