diff --git a/clients/pom.xml b/clients/pom.xml
index 52d29981c6f..526982f208a 100644
--- a/clients/pom.xml
+++ b/clients/pom.xml
@@ -35,5 +35,6 @@
     <module>hmsbridge</module>
     <module>deltalake</module>
     <module>tests</module>
+    <module>spark-extensions</module>
   </modules>
diff --git a/clients/spark-extensions/pom.xml b/clients/spark-extensions/pom.xml
new file mode 100644
index 00000000000..d57af4c57bf
--- /dev/null
+++ b/clients/spark-extensions/pom.xml
@@ -0,0 +1,240 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.projectnessie</groupId>
+    <artifactId>nessie-clients</artifactId>
+    <version>0.7.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>nessie-spark-extensions</artifactId>
+
+  <name>Nessie - Client - Spark - Extensions</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.projectnessie</groupId>
+      <artifactId>nessie-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_2.12</artifactId>
+      <version>${spark3.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-vector</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-spark3-runtime</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>log4j-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.projectnessie</groupId>
+      <artifactId>nessie-client-tests</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>antlr</id>
+            <goals>
+              <goal>antlr4</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.projectnessie</groupId>
+        <artifactId>nessie-apprunner-maven-plugin</artifactId>
+        <version>${project.version}</version>
+        <configuration>
+          <skip>${skipTests}</skip>
+          <appArtifactId>org.projectnessie:nessie-quarkus:${project.version}</appArtifactId>
+          <httpListenPort>0</httpListenPort>
+          <httpListenPortProperty>quarkus.http.test-port</httpListenPortProperty>
+        </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.projectnessie</groupId>
+            <artifactId>nessie-quarkus</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>start</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>start</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>stop</id>
+            <phase>post-integration-test</phase>
+            <goals>
+              <goal>stop</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <quarkus.http.test-port>${quarkus.http.test-port}</quarkus.http.test-port>
+          </systemPropertyVariables>
+          <additionalClasspathElements>
+            <additionalClasspathElement>${project.build.directory}/test-sources</additionalClasspathElement>
+          </additionalClasspathElements>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <createDependencyReducedPom>false</createDependencyReducedPom>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+                <exclude>META-INF/jandex.idx</exclude>
+                <exclude>META-INF/LICENSE</exclude>
+                <exclude>LICENSE</exclude>
+                <exclude>META-INF/LICENSE.md</exclude>
+                <exclude>META-INF/LICENSE.txt</exclude>
+                <exclude>META-INF/NOTIC*</exclude>
+                <exclude>META-INF/DEPENDENCIES</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <artifactSet>
+            <includes>
+              <include>org.projectnessie</include>
+              <include>org.antlr</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>org.antlr</pattern>
+              <shadedPattern>org.projectnessie.shaded.org.antlr</shadedPattern>
+            </relocation>
+          </relocations>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/clients/spark-extensions/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/NessieSqlExtensions.g4 b/clients/spark-extensions/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/NessieSqlExtensions.g4
new file mode 100644
index 00000000000..e7e73e2c440
--- /dev/null
+++ b/clients/spark-extensions/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/NessieSqlExtensions.g4
@@ -0,0 +1,196 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+grammar NessieSqlExtensions;
+
+@lexer::members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is followed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * This method will be called when we see '/*' and try to match it as a bracketed comment.
+   * If the next character is '+', it should be parsed as hint later, and we cannot match
+   * it as a bracketed comment.
+   *
+   * Returns true if the next character is '+'.
+   */
+  public boolean isHint() {
+    int nextChar = _input.LA(1);
+    if (nextChar == '+') {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
+
+singleStatement
+    : statement EOF
+    ;
+
+statement
+    : CREATE (BRANCH|TAG) identifier (IN catalog=identifier)? (AS reference=identifier)? #nessieCreateRef
+    | DROP (BRANCH|TAG) identifier (IN catalog=identifier)? #nessieDropRef
+    | USE REFERENCE identifier (AT ts=identifier)? (IN catalog=identifier)? #nessieUseRef
+    | LIST REFERENCES (IN catalog=identifier)? #nessieListRef
+    | SHOW REFERENCE (IN catalog=identifier)? #nessieShowRef
+    | MERGE BRANCH (identifier)? (INTO toRef=identifier)? (IN catalog=identifier)? #nessieMergeRef
+    | SHOW LOG (identifier)? (IN catalog=identifier)? #nessieShowLog
+    | ASSIGN (BRANCH|TAG) (identifier)? (AS toRef=identifier)? (IN catalog=identifier)? #nessieAssignRef
+    // add collect gc action
+    // add purge gc action
+    ;
+
+identifier
+    : IDENTIFIER            #unquotedIdentifier
+    | quotedIdentifier      #quotedIdentifierAlternative
+    | nonReserved           #unquotedIdentifier
+    ;
+
+quotedIdentifier
+    : BACKQUOTED_IDENTIFIER
+    ;
+
+nonReserved
+    : AS | ASSIGN | AT | BRANCH | CREATE | DROP | IN | TAG | LOG | USE | REFERENCE | REFERENCES
+    | SHOW | LIST | MERGE | INTO
+    ;
+
+LOG: 'LOG';
+USE: 'USE';
+REFERENCE: 'REFERENCE';
+REFERENCES: 'REFERENCES';
+SHOW: 'SHOW';
+LIST: 'LIST';
+MERGE: 'MERGE';
+AS: 'AS';
+AT: 'AT';
+BRANCH: 'BRANCH';
+CREATE: 'CREATE';
+DROP: 'DROP';
+IN: 'IN';
+INTO: 'INTO';
+TAG: 'TAG';
+ASSIGN: 'ASSIGN';
+PLUS: '+';
+MINUS: '-';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+EXPONENT_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT {isValidDecimal()}?
+    ;
+
+DECIMAL_VALUE
+    : DECIMAL_DIGITS {isValidDecimal()}?
+    ;
+
+FLOAT_LITERAL
+    : DIGIT+ EXPONENT? 'F'
+    | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+SIMPLE_COMMENT
+    : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
+    ;
+
+BRACKETED_COMMENT
+    : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN)
+    ;
+
+WS
+    : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
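For orientation, the statements the grammar above accepts look like the following when issued through spark.sql. A sketch only: the catalog name "nessie" and the branch/tag names are placeholder assumptions configured elsewhere (spark.sql.catalog.nessie.*), not defined in this diff.

    spark.sql("CREATE BRANCH dev IN nessie AS main")   // nessieCreateRef
    spark.sql("CREATE TAG release IN nessie AS main")  // nessieCreateRef
    spark.sql("USE REFERENCE dev IN nessie")           // nessieUseRef
    spark.sql("LIST REFERENCES IN nessie").show()      // nessieListRef
    spark.sql("SHOW REFERENCE IN nessie").show()       // nessieShowRef
    spark.sql("SHOW LOG dev IN nessie").show()         // nessieShowLog
    spark.sql("MERGE BRANCH dev INTO main IN nessie")  // nessieMergeRef
    spark.sql("ASSIGN BRANCH dev AS main IN nessie")   // nessieAssignRef
    spark.sql("DROP TAG release IN nessie")            // nessieDropRef
    spark.sql("DROP BRANCH dev IN nessie")             // nessieDropRef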
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSparkSqlExtensionsParser.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSparkSqlExtensionsParser.scala
new file mode 100644
index 00000000000..7461b31d273
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSparkSqlExtensionsParser.scala
@@ -0,0 +1,266 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import java.util.Locale
+
+class NessieSparkSqlExtensionsParser(delegate: ParserInterface)
+    extends ParserInterface {
+
+  private lazy val substitutor = new VariableSubstitution(SQLConf.get)
+  private lazy val astBuilder = new NessieSqlExtensionsAstBuilder(delegate)
+
+  /**
+   * Parse a string to a DataType.
+   */
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  /**
+   * Parse a string to a raw DataType without CHAR/VARCHAR replacement.
+   */
+  override def parseRawDataType(sqlText: String): DataType = {
+    delegate.parseRawDataType(sqlText)
+  }
+
+  /**
+   * Parse a string to an Expression.
+   */
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  /**
+   * Parse a string to a TableIdentifier.
+   */
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  /**
+   * Parse a string to a FunctionIdentifier.
+   */
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  /**
+   * Parse a string to a multi-part identifier.
+   */
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  /**
+   * Creates StructType for a given SQL string, which is a comma separated list of field
+   * definitions which will preserve the correct Hive metadata.
+   */
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  /**
+   * Parse a string to a LogicalPlan.
+   */
+  override def parsePlan(sqlText: String): LogicalPlan = {
+    val sqlTextAfterSubstitution = substitutor.substitute(sqlText)
+    if (isNessieCommand(sqlTextAfterSubstitution)) {
+      parse(sqlTextAfterSubstitution) { parser =>
+        astBuilder.visit(parser.singleStatement())
+      }.asInstanceOf[LogicalPlan]
+    } else {
+      delegate.parsePlan(sqlText)
+    }
+  }
+
+  private def isNessieCommand(sqlText: String): Boolean = {
+    val normalized = sqlText.toLowerCase(Locale.ROOT).trim()
+    normalized.startsWith("create branch") ||
+    normalized.startsWith("create tag") ||
+    normalized.startsWith("drop branch") ||
+    normalized.startsWith("drop tag") ||
+    normalized.startsWith("use reference") ||
+    normalized.startsWith("list reference") ||
+    normalized.startsWith("show reference") ||
+    normalized.startsWith("show log") ||
+    normalized.startsWith("merge branch") ||
+    normalized.startsWith("assign branch") ||
+    normalized.startsWith("assign tag")
+  }
+
+  protected def parse[T](
+      command: String
+  )(toResult: NessieSqlExtensionsParser => T): T = {
+    val lexer = new NessieSqlExtensionsLexer(
+      new UpperCaseCharStream(CharStreams.fromString(command))
+    )
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(NessieParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new NessieSqlExtensionsParser(tokenStream)
+    parser.removeErrorListeners()
+    parser.addErrorListener(NessieParseErrorListener)
+
+    try {
+      try {
+        // first, try parsing with potentially faster SLL mode
+        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+        toResult(parser)
+      } catch {
+        case _: ParseCancellationException =>
+          // if we fail, parse with LL mode
+          tokenStream.seek(0) // rewind input stream
+          parser.reset()
+
+          // try again
+          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+          toResult(parser)
+      }
+    } catch {
+      case e: NessieParseException if e.command.isDefined =>
+        throw e
+      case e: NessieParseException =>
+        throw e.withCommand(command)
+      case e: AnalysisException =>
+        val position = Origin(e.line, e.startPosition)
+        throw new NessieParseException(
+          Option(command),
+          e.message,
+          position,
+          position
+        )
+    }
+  }
+}
+
+/* Copied from Apache Spark's to avoid dependency on Spark Internals */
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+  override def consume(): Unit = wrapped.consume
+  override def getSourceName(): String = wrapped.getSourceName
+  override def index(): Int = wrapped.index
+  override def mark(): Int = wrapped.mark
+  override def release(marker: Int): Unit = wrapped.release(marker)
+  override def seek(where: Int): Unit = wrapped.seek(where)
+  override def size(): Int = wrapped.size
+
+  override def getText(interval: Interval): String = {
+    // ANTLR 4.7's CodePointCharStream implementations have bugs when
+    // getText() is called with an empty stream, or intervals where
+    // the start > end. See
+    // https://github.com/antlr/antlr4/commit/ac9f7530 for one fix
+    // that is not yet in a released ANTLR artifact.
+    if (size() > 0 && (interval.b - interval.a >= 0)) {
+      wrapped.getText(interval)
+    } else {
+      ""
+    }
+  }
+
+  // scalastyle:off
+  override def LA(i: Int): Int = {
+    val la = wrapped.LA(i)
+    if (la == 0 || la == IntStream.EOF) la
+    else Character.toUpperCase(la)
+  }
+  // scalastyle:on
+}
+
+/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
+case object NessieParseErrorListener extends BaseErrorListener {
+  override def syntaxError(
+      recognizer: Recognizer[_, _],
+      offendingSymbol: scala.Any,
+      line: Int,
+      charPositionInLine: Int,
+      msg: String,
+      e: RecognitionException
+  ): Unit = {
+    val (start, stop) = offendingSymbol match {
+      case token: CommonToken =>
+        val start = Origin(Some(line), Some(token.getCharPositionInLine))
+        val length = token.getStopIndex - token.getStartIndex + 1
+        val stop =
+          Origin(Some(line), Some(token.getCharPositionInLine + length))
+        (start, stop)
+      case _ =>
+        val start = Origin(Some(line), Some(charPositionInLine))
+        (start, start)
+    }
+    throw new NessieParseException(None, msg, start, stop)
+  }
+}
+
+/**
+ * Copied from Apache Spark.
+ * A [[ParseException]] is an [[AnalysisException]] that is thrown during the parse process. It
+ * contains fields and an extended error message that make reporting and diagnosing errors easier.
+ */
+class NessieParseException(
+    val command: Option[String],
+    message: String,
+    val start: Origin,
+    val stop: Origin
+) extends AnalysisException(message, start.line, start.startPosition) {
+
+  def this(message: String, ctx: ParserRuleContext) = {
+    this(
+      Option(NessieParserUtils.command(ctx)),
+      message,
+      NessieParserUtils.position(ctx.getStart),
+      NessieParserUtils.position(ctx.getStop)
+    )
+  }
+
+  override def getMessage: String = {
+    val builder = new StringBuilder
+    builder ++= "\n" ++= message
+    start match {
+      case Origin(Some(l), Some(p)) =>
+        builder ++= s"(line $l, pos $p)\n"
+        command.foreach { cmd =>
+          val (above, below) = cmd.split("\n").splitAt(l)
+          builder ++= "\n== SQL ==\n"
+          above.foreach(builder ++= _ += '\n')
+          builder ++= (0 until p).map(_ => "-").mkString("") ++= "^^^\n"
+          below.foreach(builder ++= _ += '\n')
+        }
+      case _ =>
+        command.foreach { cmd =>
+          builder ++= "\n== SQL ==\n" ++= cmd
+        }
+    }
+    builder.toString
+  }
+
+  def withCommand(cmd: String): NessieParseException = {
+    new NessieParseException(Option(cmd), message, start, stop)
+  }
+}
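The registration hook that installs this parser is not part of the diff shown here. The usual Spark 3 pattern would be an extensions class referenced via spark.sql.extensions; the sketch below assumes that pattern, and the class name NessieSparkSessionExtensions is hypothetical:

    import org.apache.spark.sql.SparkSessionExtensions
    import org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser

    // Hypothetical registration class (not in this diff). Spark instantiates it when
    // spark.sql.extensions is set to this class name and hands it the extension hooks.
    class NessieSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        // Wrap the session parser so Nessie statements are intercepted before Spark's own parser.
        extensions.injectParser { (_, parser) =>
          new NessieSparkSqlExtensionsParser(parser)
        }
      }
    }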
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSqlExtensionsAstBuilder.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSqlExtensionsAstBuilder.scala
new file mode 100644
index 00000000000..8a27a4ee840
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/NessieSqlExtensionsAstBuilder.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.misc.Interval
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.extensions.NessieParserUtils.withOrigin
+import org.apache.spark.sql.catalyst.parser.extensions.NessieSqlExtensionsParser._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
+
+class NessieSqlExtensionsAstBuilder(delegate: ParserInterface)
+    extends NessieSqlExtensionsBaseVisitor[AnyRef] {
+
+  override def visitNessieCreateRef(
+      ctx: NessieCreateRefContext
+  ): CreateReferenceCommand = withOrigin(ctx) {
+    val isBranch = ctx.TAG == null
+    val refName = ctx.identifier(0).getText
+    val catalogName = asText(ctx.catalog)
+    val createdFrom = asText(ctx.reference)
+    CreateReferenceCommand(refName, isBranch, catalogName, createdFrom)
+  }
+
+  override def visitNessieDropRef(
+      ctx: NessieDropRefContext
+  ): DropReferenceCommand = withOrigin(ctx) {
+    val isBranch = ctx.TAG == null
+    val refName = ctx.identifier(0).getText
+    val catalogName = asText(ctx.catalog)
+    DropReferenceCommand(refName, isBranch, catalogName)
+  }
+
+  override def visitNessieUseRef(
+      ctx: NessieUseRefContext
+  ): UseReferenceCommand = withOrigin(ctx) {
+    val refName = ctx.identifier(0).getText
+    val timestamp = asText(ctx.ts)
+    val catalogName = asText(ctx.catalog)
+    UseReferenceCommand(refName, timestamp, catalogName)
+  }
+
+  override def visitNessieListRef(
+      ctx: NessieListRefContext
+  ): ListReferenceCommand = withOrigin(ctx) {
+    val catalogName = asText(ctx.catalog)
+    ListReferenceCommand(catalogName)
+  }
+
+  override def visitNessieShowRef(
+      ctx: NessieShowRefContext
+  ): ShowReferenceCommand = withOrigin(ctx) {
+    val catalogName = asText(ctx.catalog)
+    ShowReferenceCommand(catalogName)
+  }
+
+  override def visitNessieMergeRef(
+      ctx: NessieMergeRefContext
+  ): MergeBranchCommand = withOrigin(ctx) {
+    val refName = asText(ctx.identifier(0))
+    val toRefName = asText(ctx.toRef)
+    val catalogName = asText(ctx.catalog)
+    MergeBranchCommand(refName, toRefName, catalogName)
+  }
+
+  override def visitNessieShowLog(ctx: NessieShowLogContext): ShowLogCommand =
+    withOrigin(ctx) {
+      val refName = asText(ctx.identifier(0))
+      val catalogName = asText(ctx.catalog)
+      ShowLogCommand(refName, catalogName)
+    }
+
+  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan =
+    withOrigin(ctx) {
+      visit(ctx.statement).asInstanceOf[LogicalPlan]
+    }
+
+  override def visitNessieAssignRef(
+      ctx: NessieAssignRefContext
+  ): AssignReferenceCommand = withOrigin(ctx) {
+    val isBranch = ctx.TAG == null
+    val refName = ctx.identifier(0).getText
+    val toRefName = asText(ctx.toRef)
+    val catalogName = asText(ctx.catalog)
+    AssignReferenceCommand(refName, isBranch, toRefName, catalogName)
+  }
+
+  private def asText(parameter: IdentifierContext): Option[String] = {
+    Option(parameter).map(x => x.getText)
+  }
+}
+
+/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
+object NessieParserUtils {
+
+  private[sql] def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = {
+    val current = CurrentOrigin.get
+    CurrentOrigin.set(position(ctx.getStart))
+    try {
+      f
+    } finally {
+      CurrentOrigin.set(current)
+    }
+  }
+
+  private[sql] def position(token: Token): Origin = {
+    val opt = Option(token)
+    Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine))
+  }
+
+  /** Get the command which created the token. */
+  private[sql] def command(ctx: ParserRuleContext): String = {
+    val stream = ctx.getStart.getInputStream
+    stream.getText(Interval.of(0, stream.size() - 1))
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AssignReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AssignReferenceCommand.scala
new file mode 100644
index 00000000000..9ec7c9eb5e1
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AssignReferenceCommand.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class AssignReferenceCommand(
+    reference: String,
+    isBranch: Boolean,
+    toRefName: Option[String],
+    catalog: Option[String]
+) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("refType", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"AssignReference ${reference}"
+  }
+}
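With the AST builder above in place, a Nessie statement parses directly into logical command case classes like the one just defined. A sketch of the round trip, assuming the extensions parser is installed as the session parser (catalog and reference names are placeholders):

    val plan = spark.sessionState.sqlParser
      .parsePlan("ASSIGN BRANCH dev AS main IN nessie")
    // Case-class equality makes the mapping easy to assert in a test.
    assert(plan == AssignReferenceCommand("dev", isBranch = true, Some("main"), Some("nessie")))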
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateReferenceCommand.scala
new file mode 100644
index 00000000000..527aea3a45a
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateReferenceCommand.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class CreateReferenceCommand(
+    reference: String,
+    isBranch: Boolean,
+    catalog: Option[String],
+    fromReference: Option[String]
+) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("refType", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateReference ${reference}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropReferenceCommand.scala
new file mode 100644
index 00000000000..4c4de1736c8
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropReferenceCommand.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class DropReferenceCommand(
+    reference: String,
+    isBranch: Boolean,
+    catalog: Option[String]
+) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("status", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropReference ${reference}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ListReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ListReferenceCommand.scala
new file mode 100644
index 00000000000..df112a8af66
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ListReferenceCommand.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class ListReferenceCommand(catalog: Option[String]) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("refType", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"ListReference"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeBranchCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeBranchCommand.scala
new file mode 100644
index 00000000000..c8d57ddb1ec
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeBranchCommand.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class MergeBranchCommand(
+    branch: Option[String],
+    toRefName: Option[String],
+    catalog: Option[String]
+) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeBranch ${branch}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowLogCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowLogCommand.scala
new file mode 100644
index 00000000000..3ab783a834d
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowLogCommand.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class ShowLogCommand(refName: Option[String], catalog: Option[String])
+    extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("author", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("committer", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("message", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("signedOffBy", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("authorTime", DataTypes.TimestampType, nullable = false, Metadata.empty),
+      StructField("committerTime", DataTypes.TimestampType, nullable = false, Metadata.empty),
+      StructField(
+        "properties",
+        DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, false),
+        nullable = false,
+        Metadata.empty
+      )
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowLog ${refName}"
+  }
+}
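The attributes above are the result schema a SHOW LOG query reports back to Spark. Roughly, in a session with the extensions active (a sketch; the "nessie" catalog and "dev" branch are assumed names):

    spark.sql("SHOW LOG dev IN nessie").printSchema()
    // root
    //  |-- author: string (nullable = false)
    //  |-- committer: string (nullable = false)
    //  |-- hash: string (nullable = false)
    //  |-- message: string (nullable = false)
    //  |-- signedOffBy: string (nullable = false)
    //  |-- authorTime: timestamp (nullable = false)
    //  |-- committerTime: timestamp (nullable = false)
    //  |-- properties: map (nullable = false)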
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowReferenceCommand.scala
new file mode 100644
index 00000000000..03aa4a0c044
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ShowReferenceCommand.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class ShowReferenceCommand(catalog: Option[String]) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("refType", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowReference"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseReferenceCommand.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseReferenceCommand.scala
new file mode 100644
index 00000000000..0b3bde0a04e
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseReferenceCommand.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+case class UseReferenceCommand(
+    branch: String,
+    ts: Option[String],
+    catalog: Option[String]
+) extends Command {
+
+  override lazy val output: Seq[Attribute] = new StructType(
+    Array[StructField](
+      StructField("refType", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("name", DataTypes.StringType, nullable = false, Metadata.empty),
+      StructField("hash", DataTypes.StringType, nullable = false, Metadata.empty)
+    )
+  ).toAttributes
+
+  override def simpleString(maxFields: Int): String = {
+    s"UseReference ${branch}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AssignReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AssignReferenceExec.scala
new file mode 100644
index 00000000000..e2d6af1669f
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AssignReferenceExec.scala
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model.{
+  Branch,
+  ImmutableBranch,
+  ImmutableHash,
+  ImmutableMerge,
+  ImmutableTag,
+  Tag
+}
+
+case class AssignReferenceExec(
+    output: Seq[Attribute],
+    branch: String,
+    isBranch: Boolean,
+    currentCatalog: CatalogPlugin,
+    toRefName: Option[String],
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    val toHash = toRefName
+      .map(r => nessieClient.getTreeApi.getReferenceByName(r).getHash)
+      .getOrElse(nessieClient.getTreeApi.getDefaultBranch.getHash)
+    val hash = nessieClient.getTreeApi.getReferenceByName(branch).getHash
+    if (isBranch) {
+      nessieClient.getTreeApi.assignBranch(
+        branch,
+        hash,
+        Branch.of(branch, toHash)
+      )
+    } else {
+      nessieClient.getTreeApi.assignTag(branch, hash, Tag.of(branch, toHash))
+    }
+
+    val ref = nessieClient.getTreeApi.getReferenceByName(branch)
+
+    val refType = ref match {
+      case _: ImmutableHash   => NessieUtils.HASH
+      case _: ImmutableBranch => NessieUtils.BRANCH
+      case _: ImmutableTag    => NessieUtils.TAG
+    }
+
+    Seq(
+      InternalRow(
+        UTF8String.fromString(refType),
+        UTF8String.fromString(ref.getName),
+        UTF8String.fromString(ref.getHash)
+      )
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"AssignReferenceExec ${catalog.getOrElse(currentCatalog.name())} ${branch}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateReferenceExec.scala
new file mode 100644
index 00000000000..d64dff89f16
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateReferenceExec.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model._
+
+case class CreateReferenceExec(
+    output: Seq[Attribute],
+    branch: String,
+    currentCatalog: CatalogPlugin,
+    isBranch: Boolean,
+    catalog: Option[String],
+    createdFrom: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    val hash = createdFrom
+      .map(nessieClient.getTreeApi.getReferenceByName)
+      .orElse(Option(nessieClient.getTreeApi.getDefaultBranch))
+      .map(x => x.getHash)
+      .orNull
+    val ref = if (isBranch) Branch.of(branch, hash) else Tag.of(branch, hash)
+    nessieClient.getTreeApi.createReference(ref)
+    val branchResult = nessieClient.getTreeApi.getReferenceByName(ref.getName)
+    val refType = branchResult match {
+      case _: ImmutableHash   => NessieUtils.HASH
+      case _: ImmutableBranch => NessieUtils.BRANCH
+      case _: ImmutableTag    => NessieUtils.TAG
+    }
+    Seq(
+      InternalRow(
+        UTF8String.fromString(refType),
+        UTF8String.fromString(branchResult.getName),
+        UTF8String.fromString(branchResult.getHash)
+      )
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateReferenceExec ${catalog.getOrElse(currentCatalog.name())} " +
+      s"${if (isBranch) "BRANCH" else "TAG"} ${branch} ${createdFrom}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropReferenceExec.scala
new file mode 100644
index 00000000000..b778f2896df
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropReferenceExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+
+case class DropReferenceExec(
+    output: Seq[Attribute],
+    branch: String,
+    currentCatalog: CatalogPlugin,
+    isBranch: Boolean,
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    val hash = nessieClient.getTreeApi.getReferenceByName(branch).getHash
+    if (isBranch) {
+      nessieClient.getTreeApi.deleteBranch(branch, hash)
+    } else {
+      nessieClient.getTreeApi.deleteTag(branch, hash)
+    }
+    Seq(InternalRow(UTF8String.fromString("OK")))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropReferenceExec ${catalog.getOrElse(currentCatalog.name())} " +
+      s"${if (isBranch) "BRANCH" else "TAG"} ${branch}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
new file mode 100644
index 00000000000..2df60b420f1
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.{
+  AssignReferenceCommand,
+  CreateReferenceCommand,
+  DropReferenceCommand,
+  ListReferenceCommand,
+  LogicalPlan,
+  MergeBranchCommand,
+  ShowLogCommand,
+  ShowReferenceCommand,
+  UseReferenceCommand
+}
+import org.apache.spark.sql.execution.SparkPlan
+
+case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+
+    case c @ CreateReferenceCommand(branch, isBranch, catalog, reference) =>
+      CreateReferenceExec(
+        c.output,
+        branch,
+        spark.sessionState.catalogManager.currentCatalog,
+        isBranch,
+        catalog,
+        reference
+      ) :: Nil
+
+    case c @ DropReferenceCommand(branch, isBranch, catalog) =>
+      DropReferenceExec(
+        c.output,
+        branch,
+        spark.sessionState.catalogManager.currentCatalog,
+        isBranch,
+        catalog
+      ) :: Nil
+
+    case c @ UseReferenceCommand(branch, ts, catalog) =>
+      UseReferenceExec(
+        c.output,
+        branch,
+        spark.sessionState.catalogManager.currentCatalog,
+        ts,
+        catalog
+      ) :: Nil
+
+    case c @ ListReferenceCommand(catalog) =>
+      ListReferenceExec(
+        c.output,
+        spark.sessionState.catalogManager.currentCatalog,
+        catalog
+      ) :: Nil
+
+    case c @ ShowReferenceCommand(catalog) =>
+      ShowReferenceExec(
+        c.output,
+        spark.sessionState.catalogManager.currentCatalog,
+        catalog
+      ) :: Nil
+
+    case c @ MergeBranchCommand(branch, toRefName, catalog) =>
+      MergeBranchExec(
+        c.output,
+        branch,
+        spark.sessionState.catalogManager.currentCatalog,
+        toRefName,
+        catalog
+      ) :: Nil
+
+    case c @ ShowLogCommand(refName, catalog) =>
+      ShowLogExec(
+        c.output,
+        refName,
+        spark.sessionState.catalogManager.currentCatalog,
+        catalog
+      ) :: Nil
+
+    case c @ AssignReferenceCommand(reference, isBranch, toRefName, catalog) =>
+      AssignReferenceExec(
+        c.output,
+        reference,
+        isBranch,
+        spark.sessionState.catalogManager.currentCatalog,
+        toRefName,
+        catalog
+      ) :: Nil
+
+    case _ => Nil
+  }
+}
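As with the parser, the strategy needs to be registered with the session, and that registration is not in the diff shown here. The standard hook would be SparkSessionExtensions.injectPlannerStrategy; a sketch, inside the assumed extensions class from earlier:

    // Hypothetical wiring (not in this diff): make Spark's planner try this strategy.
    // It returns Nil for every plan that is not one of the Nessie commands, so the
    // regular planner strategies still handle everything else.
    extensions.injectPlannerStrategy { spark =>
      ExtendedDataSourceV2Strategy(spark)
    }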
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ListReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ListReferenceExec.scala
new file mode 100644
index 00000000000..7b06f70ba8a
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ListReferenceExec.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model.Branch
+
+import scala.collection.JavaConverters._
+
+case class ListReferenceExec(
+    output: Seq[Attribute],
+    currentCatalog: CatalogPlugin,
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    nessieClient.getTreeApi.getAllReferences.asScala.map(ref => {
+      InternalRow(
+        UTF8String.fromString(
+          if (ref.isInstanceOf[Branch]) NessieUtils.BRANCH else NessieUtils.TAG
+        ),
+        UTF8String.fromString(ref.getName),
+        UTF8String.fromString(ref.getHash)
+      )
+    })
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"ListReferenceExec ${catalog.getOrElse(currentCatalog.name())}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeBranchExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeBranchExec.scala
new file mode 100644
index 00000000000..cf871010ac7
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeBranchExec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model.{Branch, ImmutableMerge}
+
+case class MergeBranchExec(
+    output: Seq[Attribute],
+    branch: Option[String],
+    currentCatalog: CatalogPlugin,
+    toRefName: Option[String],
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    nessieClient.getTreeApi.mergeRefIntoBranch(
+      toRefName.getOrElse(nessieClient.getTreeApi.getDefaultBranch.getName),
+      toRefName
+        .map(r => nessieClient.getTreeApi.getReferenceByName(r).getHash)
+        .getOrElse(nessieClient.getTreeApi.getDefaultBranch.getHash),
+      ImmutableMerge.builder
+        .fromHash(
+          nessieClient.getTreeApi
+            .getReferenceByName(
+              branch.getOrElse(
+                NessieUtils.getCurrentRef(currentCatalog, catalog).getName
+              )
+            )
+            .getHash
+        )
+        .build
+    )
+    val ref = nessieClient.getTreeApi.getReferenceByName(
+      toRefName.getOrElse(nessieClient.getTreeApi.getDefaultBranch.getName)
+    )
+
+    Seq(
+      InternalRow(
+        UTF8String.fromString(ref.getName),
+        UTF8String.fromString(ref.getHash)
+      )
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeBranchExec ${catalog.getOrElse(currentCatalog.name())} ${branch}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieExec.scala
new file mode 100644
index 00000000000..53ae2bda04a
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieExec.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.projectnessie.client.NessieClient
+
+abstract class NessieExec(
+    currentCatalog: CatalogPlugin,
+    catalog: Option[String]
+) extends V2CommandExec {
+
+  protected def runInternal(nessieClient: NessieClient): Seq[InternalRow]
+
+  override protected def run(): Seq[InternalRow] = {
+    val nessieClient = NessieUtils.nessieClient(currentCatalog, catalog)
+    try {
+      runInternal(nessieClient)
+    } finally {
+      nessieClient.close()
+    }
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieUtils.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieUtils.scala
new file mode 100644
index 00000000000..b4bae19060d
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/NessieUtils.scala
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.projectnessie.client.{NessieClient, StreamingUtil}
+import org.projectnessie.error.NessieNotFoundException
+import org.projectnessie.model.{Hash, Reference}
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.OptionalInt
+import scala.collection.JavaConverters._
+
+object NessieUtils {
+
+  val BRANCH: String = "Branch"
+  val TAG: String = "Tag"
+  val HASH: String = "Hash"
+
+  def calculateRef(
+      branch: String,
+      ts: Option[String],
+      nessieClient: NessieClient
+  ): Reference = {
+    // TODO we are assuming always in UTC, ignoring the tz set by Spark etc.
+    val timestamp = ts
+      .map(x => x.replaceAll("`", ""))
+      .map(x => LocalDateTime.parse(x).atZone(ZoneOffset.UTC).toInstant)
+      .orNull
+    if (timestamp == null) {
+      nessieClient.getTreeApi.getReferenceByName(branch)
+    } else {
+      val cm = Option(
+        StreamingUtil
+          .getCommitLogStream(
+            nessieClient.getTreeApi,
+            branch,
+            OptionalInt.empty(),
+            String.format(
+              "timestamp(commit.commitTime) < timestamp('%s')",
+              timestamp
+            )
+          )
+          .findFirst()
+          .orElse(null)
+      ).map(x => Hash.of(x.getHash))
+
+      cm match {
+        case Some(value) => value
+        case None =>
+          throw new NessieNotFoundException(
+            String.format("Cannot find a hash before %s.", timestamp)
+          )
+      }
+    }
+  }
+
+  def nessieClient(
+      currentCatalog: CatalogPlugin,
+      catalog: Option[String]
+  ): NessieClient = {
+    val catalogName = catalog.getOrElse(currentCatalog.name)
+    val catalogConf = SparkSession.active.sparkContext.conf
+      .getAllWithPrefix(s"spark.sql.catalog.$catalogName.")
+      .toMap
+    NessieClient
+      .builder()
+      .fromConfig(x => catalogConf.getOrElse(x.replace("nessie.", ""), null))
+      .build()
+  }
+
+  def setCurrentRef(
+      currentCatalog: CatalogPlugin,
+      catalog: Option[String],
+      ref: Reference
+  ): Reference = {
+    val catalogName = catalog.getOrElse(currentCatalog.name)
+    val catalogImpl =
+      SparkSession.active.sessionState.catalogManager.catalog(catalogName)
+    SparkSession.active.sparkContext.conf
+      .set(s"spark.sql.catalog.$catalogName.ref", ref.getName)
+    val catalogConf = SparkSession.active.sparkContext.conf
+      .getAllWithPrefix(s"spark.sql.catalog.$catalogName.")
+      .toMap
+      .asJava
+    catalogImpl.initialize(
+      catalogName,
+      new CaseInsensitiveStringMap(catalogConf)
+    )
+    getCurrentRef(currentCatalog, catalog)
+  }
+
+  def getCurrentRef(
+      currentCatalog: CatalogPlugin,
+      catalog: Option[String]
+  ): Reference = {
+    val catalogName = catalog.getOrElse(currentCatalog.name)
+    val refName = SparkSession.active.sparkContext.conf
+      .get(s"spark.sql.catalog.$catalogName.ref")
+    nessieClient(currentCatalog, catalog).getTreeApi.getReferenceByName(refName)
+  }
+}
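NessieUtils.nessieClient builds the client purely from the Spark conf: it collects every key under spark.sql.catalog.&lt;name&gt;. and serves NessieClient's "nessie.*" config lookups from that map with the prefix stripped. A sketch of a matching session configuration; the Iceberg catalog classes and the exact property names such as "uri" are assumptions for illustration and not defined in this diff, while "ref" is the key read back by getCurrentRef:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")          // assumed
      .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog") // assumed
      .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v1")              // assumed key
      .config("spark.sql.catalog.nessie.ref", "main")  // read/written by NessieUtils
      .getOrCreate()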
+case class ShowLogExec(
+    output: Seq[Attribute],
+    branch: Option[String],
+    currentCatalog: CatalogPlugin,
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+    val refName = branch.getOrElse(
+      NessieUtils.getCurrentRef(currentCatalog, catalog).getName
+    )
+    val stream = StreamingUtil.getCommitLogStream(
+      nessieClient.getTreeApi,
+      refName,
+      OptionalInt.empty(),
+      null
+    )
+
+    stream.iterator.asScala
+      .map(cm =>
+        InternalRow(
+          convert(cm.getAuthor),
+          convert(cm.getCommitter),
+          convert(cm.getHash),
+          convert(cm.getMessage),
+          convert(cm.getSignedOffBy),
+          convert(cm.getAuthorTime),
+          convert(cm.getCommitTime),
+          convert(cm.getProperties)
+        )
+      )
+      .toSeq
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowLogExec ${catalog.getOrElse(currentCatalog.name())} $branch"
+  }
+
+  private def convert(input: String): UTF8String = {
+    UTF8String.fromString(if (input == null) "" else input)
+  }
+
+  private def convert(input: Instant): Long = {
+    ChronoUnit.MICROS.between(Instant.EPOCH, input)
+  }
+
+  private def convert(input: java.util.Map[String, String]): MapData = {
+    ArrayBasedMapData(input, x => x, x => x)
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowReferenceExec.scala
new file mode 100644
index 00000000000..526a9a5caf4
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowReferenceExec.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model.Branch
+
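+// Physical plan node backing `SHOW REFERENCE IN <catalog>?`; returns a single
+// (type, name, hash) row for the reference the session currently points at.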
+case class ShowReferenceExec(
+    output: Seq[Attribute],
+    currentCatalog: CatalogPlugin,
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+
+    val ref = NessieUtils.getCurrentRef(currentCatalog, catalog)
+    // TODO figure out whether this is Delta or Iceberg and extract the ref accordingly
+    Seq(
+      InternalRow(
+        UTF8String.fromString(
+          if (ref.isInstanceOf[Branch]) NessieUtils.BRANCH
+          else NessieUtils.TAG
+        ),
+        UTF8String.fromString(ref.getName),
+        UTF8String.fromString(ref.getHash)
+      )
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowReferenceExec ${catalog.getOrElse(currentCatalog.name())}"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseReferenceExec.scala b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseReferenceExec.scala
new file mode 100644
index 00000000000..bdb045173b1
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseReferenceExec.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.unsafe.types.UTF8String
+import org.projectnessie.client.NessieClient
+import org.projectnessie.model.Branch
+
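+// Physical plan node backing `USE REFERENCE <ref> (AT <timestamp>)? IN
+// <catalog>?`; resolves the reference (optionally as of the given point in
+// time, see NessieUtils.calculateRef) and makes it the session's current one.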
+case class UseReferenceExec(
+    output: Seq[Attribute],
+    branch: String,
+    currentCatalog: CatalogPlugin,
+    ts: Option[String],
+    catalog: Option[String]
+) extends NessieExec(catalog = catalog, currentCatalog = currentCatalog) {
+
+  override protected def runInternal(
+      nessieClient: NessieClient
+  ): Seq[InternalRow] = {
+
+    val ref = NessieUtils.setCurrentRef(
+      currentCatalog,
+      catalog,
+      NessieUtils.calculateRef(branch, ts, nessieClient)
+    )
+
+    Seq(
+      InternalRow(
+        UTF8String.fromString(
+          if (ref.isInstanceOf[Branch]) NessieUtils.BRANCH
+          else NessieUtils.TAG
+        ),
+        UTF8String.fromString(ref.getName),
+        UTF8String.fromString(ref.getHash)
+      )
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"UseReferenceExec ${catalog.getOrElse(currentCatalog.name())} $branch"
+  }
+}
diff --git a/clients/spark-extensions/src/main/scala/org/projectnessie/spark/extensions/NessieSparkSessionExtensions.scala b/clients/spark-extensions/src/main/scala/org/projectnessie/spark/extensions/NessieSparkSessionExtensions.scala
new file mode 100644
index 00000000000..6e60072e86c
--- /dev/null
+++ b/clients/spark-extensions/src/main/scala/org/projectnessie/spark/extensions/NessieSparkSessionExtensions.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.spark.extensions
+
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser
+import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
+
+class NessieSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
+
+  override def apply(extensions: SparkSessionExtensions): Unit = {
+    // parser extensions
+    extensions.injectParser {
+      case (_, parser) => new NessieSparkSqlExtensionsParser(parser)
+    }
+
+    // planner extensions
+    extensions.injectPlannerStrategy { spark =>
+      ExtendedDataSourceV2Strategy(spark)
+    }
+  }
+}
diff --git a/clients/spark-extensions/src/test/java/org/projectnessie/spark/extensions/ITNessieStatements.java b/clients/spark-extensions/src/test/java/org/projectnessie/spark/extensions/ITNessieStatements.java
new file mode 100644
index 00000000000..c1822812489
--- /dev/null
+++ b/clients/spark-extensions/src/test/java/org/projectnessie/spark/extensions/ITNessieStatements.java
@@ -0,0 +1,369 @@
+/*
+ * Copyright (C) 2020 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.projectnessie.client.tests.AbstractSparkTest;
+import org.projectnessie.error.BaseNessieClientServerException;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.ContentsKey;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.ImmutableCommitMeta;
+import org.projectnessie.model.ImmutableOperations;
+import org.projectnessie.model.Operation;
+import org.projectnessie.model.Operations;
+import org.projectnessie.model.Tag;
+
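+// Integration tests for the SQL extension statements, executed against a
+// running Nessie server; the extensions are activated through the
+// spark.sql.extensions setting in the @BeforeAll method below.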
+public class ITNessieStatements extends AbstractSparkTest {
+
+  private String hash;
+  private final String refName = "testBranch";
+
+  @BeforeAll
+  protected static void setSparkSqlExtensionsConf() {
+    conf.set(
+        "spark.sql.extensions", "org.projectnessie.spark.extensions.NessieSparkSessionExtensions");
+  }
+
+  @BeforeEach
+  void getHash() throws NessieNotFoundException {
+    hash = nessieClient.getTreeApi().getDefaultBranch().getHash();
+  }
+
+  @AfterEach
+  void removeBranches() throws NessieConflictException, NessieNotFoundException {
+    for (String s : Arrays.asList(refName, "main")) {
+      try {
+        nessieClient
+            .getTreeApi()
+            .deleteBranch(s, nessieClient.getTreeApi().getReferenceByName(s).getHash());
+      } catch (BaseNessieClientServerException e) {
+        // pass
+      }
+    }
+    nessieClient.getTreeApi().createReference(Branch.of("main", null));
+  }
+
+  @Test
+  void testCreateBranchIn() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE BRANCH %s IN nessie", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+    result = sql("DROP BRANCH %s IN nessie", refName);
+    assertEquals("deleted branch", row("OK"), result);
+  }
+
+  @Test
+  void testCreateTagIn() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE TAG %s IN nessie", refName);
+    assertEquals("created tag", row("Tag", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Tag.of(refName, hash));
+    result = sql("DROP TAG %s IN nessie", refName);
+    assertEquals("deleted tag", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
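+  // AS picks the reference to fork from (main in these tests); without AS the
+  // new branch or tag starts from the server's default branch.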
+  @Test
+  void testCreateBranchInAs() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE BRANCH %s IN nessie AS main", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+    result = sql("DROP BRANCH %s IN nessie", refName);
+    assertEquals("deleted branch", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
+  @Test
+  void testCreateTagInAs() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE TAG %s IN nessie AS main", refName);
+    assertEquals("created tag", row("Tag", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Tag.of(refName, hash));
+    result = sql("LIST REFERENCES IN nessie");
+    List<Object[]> listResult = new ArrayList<>();
+    listResult.add(row("Branch", "main", hash));
+    listResult.add(row("Tag", refName, hash));
+    assertEquals("list references", listResult, result);
+    result = sql("DROP TAG %s IN nessie", refName);
+    assertEquals("deleted tag", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void testCreateBranch() throws NessieNotFoundException {
+    String catalog = spark.sessionState().catalogManager().currentCatalog().name();
+    spark.sessionState().catalogManager().setCurrentCatalog("nessie");
+    List<Object[]> result = sql("CREATE BRANCH %s", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+    result = sql("DROP BRANCH %s", refName);
+    assertEquals("deleted branch", row("OK"), result);
+    spark.sessionState().catalogManager().setCurrentCatalog(catalog);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void testCreateTag() throws NessieNotFoundException {
+    String catalog = spark.sessionState().catalogManager().currentCatalog().name();
+    spark.sessionState().catalogManager().setCurrentCatalog("nessie");
+    List<Object[]> result = sql("CREATE TAG %s", refName);
+    assertEquals("created tag", row("Tag", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Tag.of(refName, hash));
+    result = sql("LIST REFERENCES");
+    assertThat(result)
+        .containsExactlyInAnyOrder(row("Tag", refName, hash), row("Branch", "main", hash));
+    result = sql("DROP TAG %s", refName);
+    assertEquals("deleted tag", row("OK"), result);
+    spark.sessionState().catalogManager().setCurrentCatalog(catalog);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
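+  // USE REFERENCE makes the named reference the session's current one for the
+  // catalog; SHOW REFERENCE reports the (type, name, hash) currently in use.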
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void useShowReferencesIn() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE BRANCH %s IN nessie AS main", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+
+    result = sql("USE REFERENCE %s IN nessie", refName);
+    assertEquals("use branch", row("Branch", refName, hash), result);
+    result = sql("SHOW REFERENCE IN nessie");
+    assertEquals("show branch", row("Branch", refName, hash), result);
+
+    result = sql("DROP BRANCH %s IN nessie", refName);
+    assertEquals("deleted branch", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void useShowReferencesAt() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE BRANCH %s IN nessie AS main", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+
+    result = sql("USE REFERENCE %s AT %s IN nessie", refName, "`2012-06-01T14:14:14`");
+    assertEquals("use branch", row("Branch", refName, hash), result);
+    result = sql("SHOW REFERENCE IN nessie");
+    assertEquals("show branch", row("Branch", refName, hash), result);
+
+    result = sql("DROP BRANCH %s IN nessie", refName);
+    assertEquals("deleted branch", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+  }
+
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void useShowReferences() throws NessieNotFoundException {
+    List<Object[]> result = sql("CREATE BRANCH %s IN nessie AS main", refName);
+    assertEquals("created branch", row("Branch", refName, hash), result);
+    assertThat(nessieClient.getTreeApi().getReferenceByName(refName))
+        .isEqualTo(Branch.of(refName, hash));
+
+    String catalog = spark.sessionState().catalogManager().currentCatalog().name();
+    spark.sessionState().catalogManager().setCurrentCatalog("nessie");
+    result = sql("USE REFERENCE %s", refName);
+    assertEquals("use branch", row("Branch", refName, hash), result);
+    result = sql("SHOW REFERENCE");
+    assertEquals("show branch", row("Branch", refName, hash), result);
+
+    result = sql("DROP BRANCH %s IN nessie", refName);
+    assertEquals("deleted branch", row("OK"), result);
+    assertThatThrownBy(() -> nessieClient.getTreeApi().getReferenceByName(refName))
+        .isInstanceOf(NessieNotFoundException.class)
+        .hasMessage("Unable to find reference [testBranch].");
+    spark.sessionState().catalogManager().setCurrentCatalog(catalog);
+  }
+
+  @Test
+  void mergeReferencesIntoMain() throws NessieConflictException, NessieNotFoundException {
+    List<Object[]> resultList = commitAndReturnLog(refName);
+
+    sql("MERGE BRANCH %s INTO main IN nessie", refName);
+    List<Object[]> result = sql("SHOW LOG main IN nessie");
+    // we skip the commit time as it is variable
+    assertEquals(
+        "log",
+        result.stream().map(ITNessieStatements::convert).collect(Collectors.toList()),
+        resultList);
+  }
+
+  @Test
+  void mergeReferencesIn() throws NessieConflictException, NessieNotFoundException {
+    List<Object[]> resultList = commitAndReturnLog(refName);
+
+    sql("MERGE BRANCH %s IN nessie", refName);
+    List<Object[]> result = sql("SHOW LOG main IN nessie");
+    // we skip the commit time as it is variable
+    assertEquals(
+        "log",
+        result.stream().map(ITNessieStatements::convert).collect(Collectors.toList()),
+        resultList);
+  }
+
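+  // With the target omitted (plain MERGE BRANCH), the merge goes into the
+  // session's current reference, i.e. main unless USE REFERENCE changed it.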
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void mergeReferences() throws NessieConflictException, NessieNotFoundException {
+    String catalog = spark.sessionState().catalogManager().currentCatalog().name();
+    spark.sessionState().catalogManager().setCurrentCatalog("nessie");
+    List<Object[]> resultList = commitAndReturnLog(refName);
+    sql("USE REFERENCE %s", refName);
+    sql("MERGE BRANCH");
+    List<Object[]> result = sql("SHOW LOG %s", refName);
+    // we skip the commit time as it is variable
+    assertEquals(
+        "log",
+        result.stream().map(ITNessieStatements::convert).collect(Collectors.toList()),
+        resultList);
+    spark.sessionState().catalogManager().setCurrentCatalog(catalog);
+  }
+
+  @Test
+  void showLogIn() throws NessieConflictException, NessieNotFoundException {
+    List<Object[]> resultList = commitAndReturnLog(refName);
+    List<Object[]> result = sql("SHOW LOG %s IN nessie", refName);
+    // we skip the commit time as it is variable
+    assertEquals(
+        "log",
+        result.stream().map(ITNessieStatements::convert).collect(Collectors.toList()),
+        resultList);
+  }
+
+  private List<Object[]> commitAndReturnLog(String branch)
+      throws NessieConflictException, NessieNotFoundException {
+    sql("CREATE BRANCH %s IN nessie", branch);
+    ContentsKey key = ContentsKey.of("table", "name");
+    CommitMeta cm1 =
+        ImmutableCommitMeta.builder()
+            .author("sue")
+            .authorTime(Instant.ofEpochMilli(1))
+            .message("1")
+            .build();
+
+    CommitMeta cm2 =
+        ImmutableCommitMeta.builder()
+            .author("janet")
+            .authorTime(Instant.ofEpochMilli(10))
+            .message("2")
+            .build();
+
+    CommitMeta cm3 =
+        ImmutableCommitMeta.builder()
+            .author("alice")
+            .authorTime(Instant.ofEpochMilli(100))
+            .message("3")
+            .build();
+
+    Operations ops =
+        ImmutableOperations.builder()
+            .addOperations(Operation.Put.of(key, IcebergTable.of("foo")))
+            .commitMeta(cm1)
+            .build();
+    Operations ops2 =
+        ImmutableOperations.builder()
+            .addOperations(Operation.Put.of(key, IcebergTable.of("bar")))
+            .commitMeta(cm2)
+            .build();
+    Operations ops3 =
+        ImmutableOperations.builder()
+            .addOperations(Operation.Put.of(key, IcebergTable.of("baz")))
+            .commitMeta(cm3)
+            .build();
+
+    Branch ref1 = nessieClient.getTreeApi().commitMultipleOperations(branch, hash, ops);
+    Branch ref2 = nessieClient.getTreeApi().commitMultipleOperations(branch, ref1.getHash(), ops2);
+    Branch ref3 = nessieClient.getTreeApi().commitMultipleOperations(branch, ref2.getHash(), ops3);
+
+    List<Object[]> resultList = new ArrayList<>();
+    resultList.add(cmToRow(cm3, ref3.getHash()));
+    resultList.add(cmToRow(cm2, ref2.getHash()));
+    resultList.add(cmToRow(cm1, ref1.getHash()));
+    return resultList;
+  }
+
+  @Disabled("until release of 0.12.0 of iceberg")
+  @Test
+  void showLog() throws NessieConflictException, NessieNotFoundException {
+    String catalog = spark.sessionState().catalogManager().currentCatalog().name();
+    spark.sessionState().catalogManager().setCurrentCatalog("nessie");
+    List<Object[]> resultList = commitAndReturnLog(refName);
+    List<Object[]> result = sql("SHOW LOG %s", refName);
+
+    // we skip the commit time as it is variable
+    assertEquals(
+        "log",
+        result.stream().map(ITNessieStatements::convert).collect(Collectors.toList()),
+        resultList);
+    spark.sessionState().catalogManager().setCurrentCatalog(catalog);
+  }
+
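+  // The commit time (column 6) is assigned by the server and varies between
+  // runs, so it is dropped before comparing SHOW LOG output to expectations.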
+  private static Object[] convert(Object[] object) {
+    return new Object[] {
+      object[0], object[1], object[2], object[3], object[4], object[5], object[7]
+    };
+  }
+
+  private Object[] cmToRow(CommitMeta cm, String hash) {
+    return new Object[] {
+      cm.getAuthor(),
+      "",
+      hash,
+      cm.getMessage(),
+      "",
+      cm.getAuthorTime() == null ? null : Timestamp.from(cm.getAuthorTime()),
+      new HashMap<>()
+    };
+  }
+}
diff --git a/clients/spark-extensions/src/test/resources/logback-test.xml b/clients/spark-extensions/src/test/resources/logback-test.xml
new file mode 100644
index 00000000000..c1f87f9456d
--- /dev/null
+++ b/clients/spark-extensions/src/test/resources/logback-test.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (C) 2020 Dremio
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>
diff --git a/clients/tests/src/main/java/org/projectnessie/client/tests/AbstractSparkTest.java b/clients/tests/src/main/java/org/projectnessie/client/tests/AbstractSparkTest.java
index 2f25173fe79..b7e2b6c57d2 100644
--- a/clients/tests/src/main/java/org/projectnessie/client/tests/AbstractSparkTest.java
+++ b/clients/tests/src/main/java/org/projectnessie/client/tests/AbstractSparkTest.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -88,6 +89,11 @@ protected static List<Object[]> transform(Dataset<Row> table) {
         .collect(Collectors.toList());
   }
 
+  protected static void assertEquals(
+      String context, Object[] expectedRow, List<Object[]> actualRows) {
+    assertEquals(context, Collections.singletonList(expectedRow), actualRows);
+  }
+
   protected static void assertEquals(
       String context, List<Object[]> expectedRows, List<Object[]> actualRows) {
     Assertions.assertEquals(
diff --git a/code-coverage/pom.xml b/code-coverage/pom.xml
index 4c1f89cf265..ce240234e33 100644
--- a/code-coverage/pom.xml
+++ b/code-coverage/pom.xml
@@ -157,6 +157,11 @@
       <artifactId>nessie-versioned-tiered-tests</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.projectnessie</groupId>
+      <artifactId>nessie-spark-extensions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
diff --git a/pom.xml b/pom.xml
index 1e8c720e958..520543adf24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
+    <antlr.version>4.7.1</antlr.version>
     <!-- further version properties unchanged: 3.20.2, 2.16.90, 1.66 -->
@@ -498,6 +499,11 @@
        <artifactId>assertj-core</artifactId>
        <version>${assertj.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-runtime</artifactId>
+        <version>${antlr.version}</version>
+      </dependency>
@@ -645,6 +651,14 @@
+        <plugin>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-maven-plugin</artifactId>
+          <version>${antlr.version}</version>
+          <configuration>
+            <visitor>true</visitor>
+          </configuration>
+        </plugin>
         <plugin>
           <groupId>com.github.eirslett</groupId>
           <artifactId>frontend-maven-plugin</artifactId>
@@ -1012,6 +1026,7 @@ limitations under the License.
     <!-- existing header-style mappings unchanged: SLASHSTAR_STYLE, SLASHSTAR_STYLE, XML_STYLE -->
+          <g4>SLASHSTAR_STYLE</g4>