Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.table.connector.source.abilities;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.partitioning.Partitioning;

/**
* Enables {@link ScanTableSource} to discover source partitions and inform the optimizer
* accordingly.
*
* <p>Partitions split the data stored in an external system into smaller portions that are
* identified by one or more string-based partition keys.
*
* <p>For example, data can be partitioned by region and, within a region, by month. The order of
* the partition keys (in the example: first by region, then by month) is defined by the catalog
* table. A list of partitions could be:
*
* <pre>
* List(
* ['region'='europe', 'month'='2020-01'],
* ['region'='europe', 'month'='2020-02'],
* ['region'='asia', 'month'='2020-01'],
* ['region'='asia', 'month'='2020-02']
* )
* </pre>
*
* <p>In the above case (data is partitioned by region and month) the optimizer might use this
* pre-partitioned data source to eliminate a possible shuffle operation. For example, for the
* query {@code SELECT region, month, AVG(age) FROM MyTable GROUP BY region, month} the optimizer
* can take advantage of the pre-partitioned source and avoid re-partitioning the data by
* {@code [region, month]}.
*/
// TODO: remove this copy once the interface can be imported from the Flink libraries
@PublicEvolving
public interface SupportsPartitioning {

/**
* Returns the output data partitioning that this reader guarantees.
*
* @return the partitioning of the data produced by this source
*/
Partitioning outputPartitioning();

/**
* Applies partitioned reading to the source operator.
*
* <p>NOTE(review): the expected call order relative to {@link #outputPartitioning()} is not
* specified here; confirm against the planner integration.
*/
void applyPartitionedRead();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.table.connector.source.partitioning;

import org.apache.flink.table.expressions.TransformExpression;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/**
 * A {@link Partitioning} described by an ordered list of partition transform expressions (the
 * keys) together with the concrete partition values.
 *
 * <p>Preconditions (TODO: consider relaxing these constraints in a future version):
 *
 * <ol>
 *   <li>The keys are ordered by the partition columns defined in the table schema.
 *   <li>The partition values are ordered by the values in each {@link Row}, comparing the
 *       fields from the first to the last.
 * </ol>
 *
 * <p>For example, if a table is partitioned by {@code (dt, bucket(128, user_id))}, then the
 * partition keys are {@code [dt, bucket(128, user_id)]}; they cannot be {@code [bucket(128,
 * user_id), dt]}. The partition values can be {@code ("2023-10-01", 0), ("2023-10-01", 1),
 * ("2023-10-02", 0), ...}; they cannot be {@code ("2023-10-01", 1), ("2023-10-01", 0),
 * ("2023-10-02", 0), ...}.
 */
// TODO: remove this copy once the class can be imported from the Flink libraries
public class KeyGroupedPartitioning implements Partitioning {
    private final TransformExpression[] keys;
    private final int numPartitions;
    private final Row[] partitionValues;

    // Example: for a table partitioned by (dt, bucket(128, user_id)), the partition
    // dt=2023-10-01/user_id_bucket=0/ is represented by the values ("2023-10-01", 0).

    /**
     * Creates a key-grouped partitioning.
     *
     * <p>The arrays are stored as given; no copy is made and no validation is performed.
     *
     * @param keys the partition transform expressions, in table-schema order
     * @param partitionValues one row of partition values per partition
     * @param numPartitions the number of partitions
     */
    public KeyGroupedPartitioning(
            TransformExpression[] keys, Row[] partitionValues, int numPartitions) {
        this.partitionValues = partitionValues;
        this.numPartitions = numPartitions;
        this.keys = keys;
    }

    /**
     * Returns the partition transform expressions for this partitioning.
     *
     * @return the array passed to the constructor (not a copy)
     */
    public TransformExpression[] keys() {
        return this.keys;
    }

    /**
     * Returns the partition values of this partitioning.
     *
     * @return the array passed to the constructor (not a copy)
     */
    public Row[] getPartitionValues() {
        return this.partitionValues;
    }

    @Override
    public int numPartitions() {
        return this.numPartitions;
    }

    /**
     * Tells whether any of the partition keys has exactly the given key name.
     *
     * @param keysString the key name to look for
     * @return true if at least one key's {@link TransformExpression#getKey()} equals the argument
     */
    boolean isPartitionedByKeys(String keysString) {
        int idx = 0;
        while (idx < keys.length) {
            if (keys[idx].getKey().equals(keysString)) {
                return true;
            }
            idx++;
        }
        return false;
    }

    /**
     * Checks whether this partitioning is compatible with another one.
     *
     * <p>Two partitionings are compatible when all of the following hold, checked in this order:
     *
     * <ol>
     *   <li>both have the same number of partitions;
     *   <li>both have the same number of keys and the keys are pairwise compatible;
     *   <li>both have the same number of partition value rows, and the rows are pairwise equal
     *       in arity and in every field.
     * </ol>
     *
     * @param other the other KeyGroupedPartitioning to check compatibility with
     * @return true if compatible, false otherwise (including when {@code other} is null)
     * @throws IllegalArgumentException if a compared field of this partitioning's rows is null
     */
    public boolean isCompatible(KeyGroupedPartitioning other) {
        if (other == null || numPartitions != other.numPartitions) {
            return false;
        }
        return keysMatch(other) && partitionValuesMatch(other);
    }

    /** Returns true when both key arrays have equal length and pairwise compatible entries. */
    private boolean keysMatch(KeyGroupedPartitioning other) {
        if (keys.length != other.keys.length) {
            return false;
        }
        for (int k = 0; k < keys.length; k++) {
            boolean compatible = keys[k].isCompatible(other.keys[k]);
            if (!compatible) {
                return false;
            }
        }
        return true;
    }

    /** Returns true when both partition value arrays have equal length and pairwise equal rows. */
    private boolean partitionValuesMatch(KeyGroupedPartitioning other) {
        if (partitionValues.length != other.partitionValues.length) {
            return false;
        }
        for (int r = 0; r < partitionValues.length; r++) {
            if (!rowsMatch(partitionValues[r], other.partitionValues[r])) {
                return false;
            }
        }
        return true;
    }

    /** Returns true when both rows have the same arity and equal fields at every position. */
    private static boolean rowsMatch(Row mine, Row theirs) {
        final int arity = mine.getArity();
        if (arity != theirs.getArity()) {
            return false;
        }
        for (int pos = 0; pos < arity; pos++) {
            Object value = mine.getField(pos);
            // A field in a partition value row must not be null.
            Preconditions.checkArgument(value != null);
            if (!value.equals(theirs.getField(pos))) {
                return false;
            }
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.table.connector.source.partitioning;

import org.apache.flink.annotation.PublicEvolving;

/**
* Describes how data is split into partitions.
*
* <p>This base interface only exposes the partition count; implementations such as {@link
* KeyGroupedPartitioning} carry additional detail.
*/
// TODO: remove this copy once the interface can be imported from the Flink libraries
@PublicEvolving
public interface Partitioning {
/**
* Returns the number of partitions that the data is split across.
*
* @return the partition count
*/
int numPartitions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.table.expressions;

import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

/**
 * Represents a transform expression that can be used for partitioning or other transformations.
 *
 * <p>It consists of a mandatory key, an optional function name, and an optional number of
 * buckets. All fields are final and assigned once in the constructor.
 */
// TODO: remove this copy once the class can be imported from the Flink libraries
@PublicEvolving
public class TransformExpression {
    private final String key;
    private final Optional<String> functionName;
    private final Optional<Integer> numBucketsOpt;

    /**
     * Creates a new TransformExpression with the given key, function name, and number of buckets.
     *
     * @param key the key to be transformed, must not be null
     * @param functionName the name of the transform function, can be null
     * @param numBuckets the number of buckets for bucket transforms, can be null
     * @throws NullPointerException if {@code key} is null
     */
    public TransformExpression(
            @Nonnull String key, @Nullable String functionName, @Nullable Integer numBuckets) {
        this.key = Preconditions.checkNotNull(key, "Key must not be null");
        this.functionName = Optional.ofNullable(functionName);
        this.numBucketsOpt = Optional.ofNullable(numBuckets);
    }

    /**
     * Returns the key to be transformed.
     *
     * @return the key, never null
     */
    public String getKey() {
        return key;
    }

    /**
     * Returns the name of the transform function, if present.
     *
     * @return the function name, or empty if not set
     */
    public Optional<String> getFunctionName() {
        return functionName;
    }

    /**
     * Returns the number of buckets if this is a bucket transform, or empty otherwise.
     *
     * @return the number of buckets, or empty if not a bucket transform
     */
    public Optional<Integer> getNumBucketsOpt() {
        return numBucketsOpt;
    }

    /**
     * Two expressions are equal when key, function name and number of buckets are all equal.
     *
     * @param o the object to compare with
     * @return true if {@code o} is a TransformExpression of the same class with equal fields
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TransformExpression that = (TransformExpression) o;
        return key.equals(that.key)
                && functionName.equals(that.functionName)
                && numBucketsOpt.equals(that.numBucketsOpt);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, functionName, numBucketsOpt);
    }

    /**
     * Renders the expression as {@code functionName(key)} or {@code functionName(key,
     * numBuckets)} when a function name is set, and as the bare key otherwise.
     *
     * @return the textual form of this expression
     */
    @Override
    public String toString() {
        if (functionName.isPresent()) {
            StringBuilder builder =
                    new StringBuilder().append(functionName.get()).append("(").append(key);
            if (numBucketsOpt.isPresent()) {
                builder.append(", ").append(numBucketsOpt.get());
            }
            return builder.append(")").toString();
        }
        return key;
    }

    /**
     * Checks if this TransformExpression is compatible with another TransformExpression.
     *
     * <p>Compatibility is defined by having the same function name and the same number of
     * buckets. The key itself is not compared, so expressions over differently named columns can
     * be compatible. Examples:
     *
     * <ul>
     *   <li>{@code bucket(128, user_id)} is compatible with {@code bucket(128, account_id)} but
     *       not with {@code bucket(64, user_id)};
     *   <li>{@code year(dt)} is compatible with {@code year(dt)} but not with {@code month(dt)}.
     * </ul>
     *
     * @param other the expression to compare with, may be null
     * @return true if both function name and number of buckets match; false otherwise, including
     *     when {@code other} is null
     */
    public boolean isCompatible(TransformExpression other) {
        // Nothing is compatible with an absent expression; mirrors the null handling of
        // KeyGroupedPartitioning#isCompatible instead of failing with a NullPointerException.
        if (other == null) {
            return false;
        }
        return this.functionName.equals(other.functionName)
                && this.numBucketsOpt.equals(other.numBucketsOpt);
    }
}
Loading