diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/ClusteredDistribution.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/ClusteredDistribution.java new file mode 100644 index 000000000000..dcc3d191461c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/ClusteredDistribution.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.Expression; + +/** + * A distribution where tuples that share the same values for clustering expressions are co-located + * in the same partition. + * + * @since 3.2.0 + */ +@Experimental +public interface ClusteredDistribution extends Distribution { + /** + * Returns clustering expressions. + */ + Expression[] clustering(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distribution.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distribution.java new file mode 100644 index 000000000000..95d68ea2d1ab --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distribution.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions; + +import org.apache.spark.annotation.Experimental; + +/** + * An interface that defines how data is distributed across partitions. + * + * @since 3.2.0 + */ +@Experimental +public interface Distribution {} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distributions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distributions.java new file mode 100644 index 000000000000..da5d6f8c81a3 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distributions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.Expression; +import org.apache.spark.sql.connector.expressions.SortOrder; + +/** + * Helper methods to create distributions to pass into Spark. + * + * @since 3.2.0 + */ +@Experimental +public class Distributions { + private Distributions() { + } + + /** + * Creates a distribution where no promises are made about co-location of data. + */ + public static UnspecifiedDistribution unspecified() { + return LogicalDistributions.unspecified(); + } + + /** + * Creates a distribution where tuples that share the same values for clustering expressions are + * co-located in the same partition. + */ + public static ClusteredDistribution clustered(Expression[] clustering) { + return LogicalDistributions.clustered(clustering); + } + + /** + * Creates a distribution where tuples have been ordered across partitions according + * to ordering expressions, but not necessarily within a given partition. + */ + public static OrderedDistribution ordered(SortOrder[] ordering) { + return LogicalDistributions.ordered(ordering); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/OrderedDistribution.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/OrderedDistribution.java new file mode 100644 index 000000000000..3456178d8e64 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/OrderedDistribution.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.SortOrder; + +/** + * A distribution where tuples have been ordered across partitions according + * to ordering expressions, but not necessarily within a given partition. + * + * @since 3.2.0 + */ +@Experimental +public interface OrderedDistribution extends Distribution { + /** + * Returns ordering expressions. + */ + SortOrder[] ordering(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/UnspecifiedDistribution.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/UnspecifiedDistribution.java new file mode 100644 index 000000000000..ea18d8906cfd --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/UnspecifiedDistribution.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions; + +import org.apache.spark.annotation.Experimental; + +/** + * A distribution where no promises are made about co-location of data. + * + * @since 3.2.0 + */ +@Experimental +public interface UnspecifiedDistribution extends Distribution {} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java index 791dc969ab00..984de6258f84 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java @@ -164,4 +164,15 @@ public static Transform hours(String column) { return LogicalExpressions.hours(Expressions.column(column)); } + /** + * Create a sort expression. + * + * @param expr an expression to produce values to sort + * @param direction direction of the sort + * @param nullOrder null order of the sort + * @return a SortOrder + */ + public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nullOrder) { + return LogicalExpressions.sort(expr, direction, nullOrder); + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/NullOrdering.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/NullOrdering.java new file mode 100644 index 000000000000..669d1c8443b1 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/NullOrdering.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions; + +import org.apache.spark.annotation.Experimental; + +/** + * A null order used in sorting expressions. + * + * @since 3.2.0 + */ +@Experimental +public enum NullOrdering { + NULLS_FIRST, NULLS_LAST; + + @Override + public String toString() { + switch (this) { + case NULLS_FIRST: + return "NULLS FIRST"; + case NULLS_LAST: + return "NULLS LAST"; + default: + throw new IllegalArgumentException("Unexpected null order: " + this); + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortDirection.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortDirection.java new file mode 100644 index 000000000000..6946032832d1 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortDirection.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions; + +import org.apache.spark.annotation.Experimental; + +/** + * A sort direction used in sorting expressions. + * + * @since 3.2.0 + */ +@Experimental +public enum SortDirection { + ASCENDING, DESCENDING; + + @Override + public String toString() { + switch (this) { + case ASCENDING: + return "ASC"; + case DESCENDING: + return "DESC"; + default: + throw new IllegalArgumentException("Unexpected sort direction: " + this); + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortOrder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortOrder.java new file mode 100644 index 000000000000..72252457df26 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortOrder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions; + +import org.apache.spark.annotation.Experimental; + +/** + * Represents a sort order in the public expression API. + * + * @since 3.2.0 + */ +@Experimental +public interface SortOrder extends Expression { + /** + * Returns the sort expression. + */ + Expression expression(); + + /** + * Returns the sort direction. + */ + SortDirection direction(); + + /** + * Returns the null ordering. + */ + NullOrdering nullOrdering(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java new file mode 100644 index 000000000000..91fd02aae883 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.distributions.Distribution; +import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution; +import org.apache.spark.sql.connector.expressions.SortOrder; + +/** + * A write that requires a specific distribution and ordering of data. + * + * @since 3.2.0 + */ +@Experimental +public interface RequiresDistributionAndOrdering extends Write { + /** + * Returns the distribution required by this write. + *
+ * Spark will distribute incoming records across partitions to satisfy the required distribution + * before passing the records to the data source table on write. + *
+ * Implementations may return {@link UnspecifiedDistribution} if they don't require any specific + * distribution of data on write. + * + * @return the required distribution + */ + Distribution requiredDistribution(); + + /** + * Returns the ordering required by this write. + *
+ * Spark will order incoming records within partitions to satisfy the required ordering + * before passing those records to the data source table on write. + *
+ * Implementations may return an empty array if they don't require any specific ordering of data + * on write. + * + * @return the required ordering + */ + SortOrder[] requiredOrdering(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java new file mode 100644 index 000000000000..873680415d44 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.write.streaming.StreamingWrite; + +/** + * A logical representation of a data source write. + *
+ * This logical representation is shared between batch and streaming write. Data sources must + * implement the corresponding methods in this interface to match what the table promises + * to support. For example, {@link #toBatch()} must be implemented if the {@link Table} that + * creates this {@link Write} returns {@link TableCapability#BATCH_WRITE} support in its + * {@link Table#capabilities()}. + * + * @since 3.2.0 + */ +@Evolving +public interface Write { + + /** + * Returns the description associated with this write. + */ + default String description() { + return this.getClass().toString(); + } + + /** + * Returns a {@link BatchWrite} to write data to batch source. By default this method throws + * exception, data sources must overwrite this method to provide an implementation, if the + * {@link Table} that creates this write returns {@link TableCapability#BATCH_WRITE} support in + * its {@link Table#capabilities()}. + */ + default BatchWrite toBatch() { + throw new UnsupportedOperationException(description() + ": Batch write is not supported"); + } + + /** + * Returns a {@link StreamingWrite} to write data to streaming source. By default this method + * throws exception, data sources must overwrite this method to provide an implementation, if the + * {@link Table} that creates this write returns {@link TableCapability#STREAMING_WRITE} support + * in its {@link Table#capabilities()}. + */ + default StreamingWrite toStreaming() { + throw new UnsupportedOperationException(description() + ": Streaming write is not supported"); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java index 5398ca46e977..bf344185118a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java @@ -23,10 +23,10 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite; /** - * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to + * An interface for building the {@link Write}. Implementations can mix in some interfaces to * support different ways to write data to data sources. * - * Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to + * Unless modified by a mixin interface, the {@link Write} configured by this builder is to * append data without affecting existing data. * * @since 3.0.0 @@ -35,22 +35,41 @@ public interface WriteBuilder { /** - * Returns a {@link BatchWrite} to write data to batch source. By default this method throws - * exception, data sources must overwrite this method to provide an implementation, if the - * {@link Table} that creates this write returns {@link TableCapability#BATCH_WRITE} support in - * its {@link Table#capabilities()}. + * Returns a logical {@link Write} shared between batch and streaming. + * + * @since 3.2.0 */ + default Write build() { + return new Write() { + @Override + public BatchWrite toBatch() { + return buildForBatch(); + } + + @Override + public StreamingWrite toStreaming() { + return buildForStreaming(); + } + }; + } + + /** + * Returns a {@link BatchWrite} to write data to batch source. + * + * @deprecated use {@link #build()} instead. + */ + @Deprecated default BatchWrite buildForBatch() { throw new UnsupportedOperationException(getClass().getName() + " does not support batch write"); } /** - * Returns a {@link StreamingWrite} to write data to streaming source. By default this method - * throws exception, data sources must overwrite this method to provide an implementation, if the - * {@link Table} that creates this write returns {@link TableCapability#STREAMING_WRITE} support - * in its {@link Table#capabilities()}. + * Returns a {@link StreamingWrite} to write data to streaming source. + * + * @deprecated use {@link #build()} instead. */ + @Deprecated default StreamingWrite buildForStreaming() { throw new UnsupportedOperationException(getClass().getName() + " does not support streaming write"); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/distributions/distributions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/distributions/distributions.scala new file mode 100644 index 000000000000..599f82b4dc52 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/distributions/distributions.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.distributions + +import org.apache.spark.sql.connector.expressions.{Expression, SortOrder} + +private[sql] object LogicalDistributions { + + def unspecified(): UnspecifiedDistribution = { + UnspecifiedDistributionImpl + } + + def clustered(clustering: Array[Expression]): ClusteredDistribution = { + ClusteredDistributionImpl(clustering) + } + + def ordered(ordering: Array[SortOrder]): OrderedDistribution = { + OrderedDistributionImpl(ordering) + } +} + +private[sql] object UnspecifiedDistributionImpl extends UnspecifiedDistribution { + override def toString: String = "UnspecifiedDistribution" +} + +private[sql] final case class ClusteredDistributionImpl( + clusteringExprs: Seq[Expression]) extends ClusteredDistribution { + + override def clustering: Array[Expression] = clusteringExprs.toArray + + override def toString: String = { + s"ClusteredDistribution(${clusteringExprs.map(_.describe).mkString(", ")})" + } +} + +private[sql] final case class OrderedDistributionImpl( + orderingExprs: Seq[SortOrder]) extends OrderedDistribution { + + override def ordering: Array[SortOrder] = orderingExprs.toArray + + override def toString: String = { + s"OrderedDistribution(${orderingExprs.map(_.describe).mkString(", ")})" + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala index 321ea14d376b..2863d94d198b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala @@ -54,6 +54,13 @@ private[sql] object LogicalExpressions { def days(reference: NamedReference): DaysTransform = DaysTransform(reference) def hours(reference: NamedReference): HoursTransform = HoursTransform(reference) + + def sort( + reference: Expression, + direction: SortDirection, + nullOrdering: NullOrdering): SortOrder = { + SortValue(reference, direction, nullOrdering) + } } /** @@ -110,6 +117,18 @@ private[sql] final case class BucketTransform( } private[sql] object BucketTransform { + def unapply(expr: Expression): Option[(Int, FieldReference)] = expr match { + case transform: Transform => + transform match { + case BucketTransform(n, FieldReference(parts)) => + Some((n, FieldReference(parts))) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[(Int, NamedReference)] = transform match { case NamedTransform("bucket", Seq( Lit(value: Int, IntegerType), @@ -170,6 +189,18 @@ private[sql] final case class IdentityTransform( } private[sql] object IdentityTransform { + def unapply(expr: Expression): Option[FieldReference] = expr match { + case transform: Transform => + transform match { + case IdentityTransform(ref) => + Some(ref) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[FieldReference] = transform match { case NamedTransform("identity", Seq(Ref(parts))) => Some(FieldReference(parts)) @@ -185,6 +216,18 @@ private[sql] final case class YearsTransform( } private[sql] object YearsTransform { + def unapply(expr: Expression): Option[FieldReference] = expr match { + case transform: Transform => + transform match { + case YearsTransform(ref) => + Some(ref) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[FieldReference] = transform match { case NamedTransform("years", Seq(Ref(parts))) => Some(FieldReference(parts)) @@ -200,6 +243,18 @@ private[sql] final case class MonthsTransform( } private[sql] object MonthsTransform { + def unapply(expr: Expression): Option[FieldReference] = expr match { + case transform: Transform => + transform match { + case MonthsTransform(ref) => + Some(ref) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[FieldReference] = transform match { case NamedTransform("months", Seq(Ref(parts))) => Some(FieldReference(parts)) @@ -215,6 +270,18 @@ private[sql] final case class DaysTransform( } private[sql] object DaysTransform { + def unapply(expr: Expression): Option[FieldReference] = expr match { + case transform: Transform => + transform match { + case DaysTransform(ref) => + Some(ref) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[FieldReference] = transform match { case NamedTransform("days", Seq(Ref(parts))) => Some(FieldReference(parts)) @@ -230,6 +297,18 @@ private[sql] final case class HoursTransform( } private[sql] object HoursTransform { + def unapply(expr: Expression): Option[FieldReference] = expr match { + case transform: Transform => + transform match { + case HoursTransform(ref) => + Some(ref) + case _ => + None + } + case _ => + None + } + def unapply(transform: Transform): Option[FieldReference] = transform match { case NamedTransform("hours", Seq(Ref(parts))) => Some(FieldReference(parts)) @@ -261,3 +340,20 @@ private[sql] object FieldReference { LogicalExpressions.parseReference(column) } } + +private[sql] final case class SortValue( + expression: Expression, + direction: SortDirection, + nullOrdering: NullOrdering) extends SortOrder { + + override def describe(): String = s"$expression $direction $nullOrdering" +} + +private[sql] object SortValue { + def unapply(expr: Expression): Option[(Expression, SortDirection, NullOrdering)] = expr match { + case sort: SortOrder => + Some((sort.expression, sort.direction, sort.nullOrdering)) + case _ => + None + } +}