diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 7b875f63b..9cd5f60a7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       case (true, false) => AUTO
       case (false, false) => FULL
       case (false, true) => INCREMENTAL
-      case (true, true) =>
-        throw new IllegalArgumentException(
-          "auto_refresh and incremental_refresh options cannot both be true")
     }
 
     // validate allowed options depending on refresh mode
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index adcb4c45f..106df276d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
 
 import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -59,7 +60,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
    *   ignore existing index
    */
  def create(ignoreIfExists: Boolean = false): Unit =
-    flint.createIndex(buildIndex(), ignoreIfExists)
+    flint.createIndex(validateIndex(buildIndex()), ignoreIfExists)
 
   /**
    * Copy Flint index with updated options.
@@ -80,7 +81,24 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
     val updatedMetadata = index
       .metadata()
       .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
-    FlintSparkIndexFactory.create(updatedMetadata).get
+    validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get)
+  }
+
+  /**
+   * Pre-validate index to ensure its validity. By default, this method validates index options
+   * by delegating to the specific index refresh (index options mostly serve index refresh).
+   * Subclasses can extend this method to include additional validation logic.
+   *
+   * @param index
+   *   Flint index to be validated
+   * @return
+   *   the validated index; an exception is thrown if validation fails
+   */
+  protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
+    FlintSparkIndexRefresh
+      .create(index.name(), index) // TODO: remove first argument?
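The `validateIndex` hook above is `protected` precisely so that concrete builders can layer index-specific checks on top of the default refresh-option validation. A minimal sketch of what that extension point allows (the subclass and its extra check are hypothetical, not part of this change):

```scala
class MyIndexBuilder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {

  override protected def buildIndex(): FlintSparkIndex = ??? // builder-specific

  override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
    // Run the default validation first (delegates to the refresh implementation)
    super.validateIndex(index)

    // Hypothetical extra check layered on top by the subclass
    require(
      index.options.indexSettings().forall(_.trim.nonEmpty),
      "index_settings must not be blank if present")
    index
  }
}
```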
+      .validate(flint.spark)
+    index
   }
 
   /**
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala
new file mode 100644
index 000000000..f689d9aee
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}
+
+/**
+ * Flint Spark validation helper.
+ */
+trait FlintSparkValidationHelper extends Logging {
+
+  /**
+   * Determines whether the source table(s) of a given Flint index use any unsupported table
+   * provider. Currently, Hive is the only unsupported provider.
+   *
+   * @param spark
+   *   Spark session
+   * @param index
+   *   Flint index
+   * @return
+   *   true if any source table is a Hive table, otherwise false
+   */
+  def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = {
+    // Extract source table name (possibly more than one for MV query)
+    val tableNames = index match {
+      case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName)
+      case covering: FlintSparkCoveringIndex => Seq(covering.tableName)
+      case mv: FlintSparkMaterializedView =>
+        spark.sessionState.sqlParser
+          .parsePlan(mv.query)
+          .collect { case relation: UnresolvedRelation =>
+            qualifyTableName(spark, relation.tableName)
+          }
+    }
+
+    // Validate if any source table is not supported (currently Hive only)
+    tableNames.exists { tableName =>
+      val (catalog, ident) = parseTableName(spark, tableName)
+      val table = loadTable(catalog, ident).get
+
+      // TODO: add allowed table provider list
+      DDLUtils.isHiveTable(Option(table.properties().get("provider")))
+    }
+  }
+
+  /**
+   * Checks whether a specified checkpoint location is accessible. Accessibility, in this
+   * context, means that the folder exists and the current Spark session has the necessary
+   * permissions to access it.
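Note that `isTableProviderSupported` reports `true` when a Hive table is detected, so callers negate its result, as the refresh implementations below do. A condensed sketch of the caller-side contract (assuming a class that mixes in the trait, with `spark` and `index` in scope):

```scala
// Reject unsupported (Hive) source tables; the helper returns true when Hive is detected
require(!isTableProviderSupported(spark, index), "Hive table is not supported")

// Verify the checkpoint folder exists and is readable before starting any refresh
index.options.checkpointLocation().foreach { location =>
  require(
    isCheckpointLocationAccessible(spark, location),
    s"Checkpoint location $location doesn't exist or no permission to access")
}
```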
+   *
+   * @param spark
+   *   Spark session
+   * @param checkpointLocation
+   *   checkpoint location
+   * @return
+   *   true if accessible, otherwise false
+   */
+  def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
+    try {
+      val checkpointManager =
+        CheckpointFileManager.create(
+          new Path(checkpointLocation),
+          spark.sessionState.newHadoopConf())
+
+      checkpointManager.exists(new Path(checkpointLocation))
+    } catch {
+      case e: IOException =>
+        logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e)
+        false
+    }
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
index 09428f80d..35902e184 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
@@ -5,7 +5,9 @@
 
 package org.opensearch.flint.spark.refresh
 
-import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
+import java.util.Collections
+
+import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -23,10 +25,41 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
  * @param index
  *   Flint index
  */
-class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh {
+class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
+    extends FlintSparkIndexRefresh
+    with FlintSparkValidationHelper {
 
   override def refreshMode: RefreshMode = AUTO
 
+  override def validate(spark: SparkSession): Unit = {
+    // Incremental refresh cannot be enabled at the same time
+    val options = index.options
+    require(
+      !options.incrementalRefresh(),
+      "Incremental refresh cannot be enabled if auto refresh is enabled")
+
+    // Hive table doesn't support auto refresh
+    require(
+      !isTableProviderSupported(spark, index),
+      "Index auto refresh doesn't support Hive table")
+
+    // Checkpoint location is required if mandatory option set
+    val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
+    val checkpointLocation = options.checkpointLocation()
+    if (flintSparkConf.isCheckpointMandatory) {
+      require(
+        checkpointLocation.isDefined,
+        s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
+    }
+
+    // Checkpoint location must be accessible
+    if (checkpointLocation.isDefined) {
+      require(
+        isCheckpointLocationAccessible(spark, checkpointLocation.get),
+        s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
+    }
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     val options = index.options
     val tableName = index.metadata().source
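With this change, an invalid auto-refresh configuration now fails fast at index creation time (inside `validateIndex`) rather than when the streaming job starts. A hedged sketch of the user-visible effect when checkpoints are mandatory (`flint` is assumed to be a `FlintSpark` instance; the table name is illustrative):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY

spark.conf.set(CHECKPOINT_MANDATORY.key, "true")

// Throws IllegalArgumentException from require() during validate(), before any
// streaming job is started, because no checkpoint_location option is given
flint
  .skippingIndex()
  .onTable("spark_catalog.default.test")
  .addPartitions("year")
  .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
  .create()
```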
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
index 3c929d8e3..0c6adb0bd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
@@ -24,6 +24,20 @@ trait FlintSparkIndexRefresh extends Logging {
    */
   def refreshMode: RefreshMode
 
+  /**
+   * Validates the current index refresh settings before the actual execution begins. This
+   * method checks the integrity of the index refresh configuration and ensures that all options
+   * set for the current refresh mode are valid. This preemptive validation helps identify
+   * configuration issues before the refresh operation starts, minimizing runtime errors and
+   * potential inconsistencies.
+   *
+   * @param spark
+   *   Spark session
+   * @throws IllegalArgumentException
+   *   if any invalid or inapplicable configuration is identified
+   */
+  def validate(spark: SparkSession): Unit
+
   /**
    * Start refreshing the index.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala
index be09c2c36..b2ce2ad34 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala
@@ -31,6 +31,11 @@ class FullIndexRefresh(
 
   override def refreshMode: RefreshMode = FULL
 
+  override def validate(spark: SparkSession): Unit = {
+    // Full refresh intentionally performs no validation for now, including the Hive table check.
+    // This allows users to continue using their existing Hive table with full refresh only.
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     logInfo(s"Start refreshing index $indexName in full mode")
     index
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
index 418ada902..8eb8d6f1f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark.refresh
 
-import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode}
 
 import org.apache.spark.sql.SparkSession
@@ -20,18 +20,31 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
  *   Flint index
  */
 class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
-    extends FlintSparkIndexRefresh {
+    extends FlintSparkIndexRefresh
+    with FlintSparkValidationHelper {
 
   override def refreshMode: RefreshMode = INCREMENTAL
 
+  override def validate(spark: SparkSession): Unit = {
+    // Non-Hive table is required for incremental refresh
+    require(
+      !isTableProviderSupported(spark, index),
+      "Index incremental refresh doesn't support Hive table")
+
+    // Checkpoint location is required regardless of mandatory option
+    val options = index.options
+    val checkpointLocation = options.checkpointLocation()
+    require(
+      checkpointLocation.nonEmpty,
+      "Checkpoint location is required by incremental refresh")
+    require(
+      isCheckpointLocationAccessible(spark, checkpointLocation.get),
+      s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
+  }
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     logInfo(s"Start refreshing index $indexName in incremental mode")
 
-    // TODO: move this to validation method together in future
-    if (index.options.checkpointLocation().isEmpty) {
-      throw new IllegalStateException("Checkpoint location is required by incremental refresh")
-    }
-
     // Reuse auto refresh which uses AvailableNow trigger and will stop once complete
     val jobId =
       new AutoIndexRefresh(indexName, index)
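Incremental refresh keeps delegating to `AutoIndexRefresh`, whose `Trigger.AvailableNow()` (Spark 3.3+) processes all currently available input and then stops, which is why a checkpoint location is non-negotiable for this mode. A generic, standalone illustration of those trigger semantics (the rate/noop sources and the `/tmp` path are placeholders, not Flint code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local").getOrCreate()

// AvailableNow drains whatever input exists, records progress in the checkpoint,
// then terminates -- a batch-like pass over a streaming source
val query = spark.readStream
  .format("rate") // toy source standing in for the Flint source table
  .load()
  .writeStream
  .format("noop") // discard output; only the trigger semantics matter here
  .trigger(Trigger.AvailableNow())
  .option("checkpointLocation", "/tmp/flint-demo-checkpoint")
  .start()

query.awaitTermination()
```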
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala
new file mode 100644
index 000000000..36a0b526d
--- /dev/null
+++ b/integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.hive.HiveSessionStateBuilder
+import org.apache.spark.sql.internal.{SessionState, StaticSQLConf}
+import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
+
+/**
+ * Flint Spark base suite with Hive support enabled. Enabling Hive support in the Spark
+ * configuration alone is not adequate, because [[TestSparkSession]] disregards it and
+ * consistently creates its own instance of
+ * [[org.apache.spark.sql.test.TestSQLSessionStateBuilder]]. We therefore override its session
+ * state with the Hive one.
+ *
+ * Note that we need to extend [[SharedSparkSession]] to call the super.sparkConf() method.
+ */
+trait SparkHiveSupportSuite extends SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      // Enable Hive support
+      .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive")
+      // Use in-memory Derby as Hive metastore so no need to clean up metastore_db folder after test
+      .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:metastore_db;create=true")
+      .set("hive.metastore.uris", "")
+  }
+
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new FlintTestSparkSession(sparkConf)
+  }
+
+  class FlintTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sparkConf) { self =>
+
+    override lazy val sessionState: SessionState = {
+      // Override to replace [[TestSQLSessionStateBuilder]] with Hive session state
+      new HiveSessionStateBuilder(spark, None).build()
+    }
+  }
+}
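Any integration suite that needs a Hive-backed catalog can now mix this trait into the shared Flint suite, as `FlintSparkIndexValidationITSuite` below does. A minimal sketch of such a suite (the suite name, table name, and assertion style are illustrative):

```scala
class MyHiveTableITSuite extends FlintSparkSuite with SparkHiveSupportSuite {

  test("create and query a Hive table") {
    withTable("hive_test") {
      // With CATALOG_IMPLEMENTATION=hive and no USING clause, this creates a Hive table
      sql("CREATE TABLE hive_test (name STRING)")
      sql("INSERT INTO hive_test VALUES ('test')")
      assert(sql("SELECT * FROM hive_test").count() == 1)
    }
  }
}
```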
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 6991e60d8..eb58de567 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -125,7 +125,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
   test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
     setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
     try {
-      the[IllegalStateException] thrownBy {
+      the[IllegalArgumentException] thrownBy {
         sql(s"""
              | CREATE INDEX $testIndex ON $testTable
              | (name, age)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala
new file mode 100644
index 000000000..ee7420d94
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala
@@ -0,0 +1,186 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.util.{Locale, UUID}
+
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL, RefreshMode}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.scalatest.matchers.must.Matchers.have
+import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
+
+import org.apache.spark.sql.SparkHiveSupportSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
+
+class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSupportSuite {
+
+  // Test Hive table name
+  private val testTable = "spark_catalog.default.index_validation_test"
+
+  // Test Flint index names and the DDL statements that create them
+  private val skippingIndexName = FlintSparkSkippingIndex.getSkippingIndexName(testTable)
+  private val createSkippingIndexStatement =
+    s"CREATE SKIPPING INDEX ON $testTable (name VALUE_SET)"
+
+  private val coveringIndexName =
+    FlintSparkCoveringIndex.getFlintIndexName("ci_test", testTable)
+  private val createCoveringIndexStatement =
+    s"CREATE INDEX ci_test ON $testTable (name)"
+
+  private val materializedViewName =
+    FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv_test")
+  private val createMaterializedViewStatement =
+    s"CREATE MATERIALIZED VIEW spark_catalog.default.mv_test AS SELECT * FROM $testTable"
+
+  Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
+    .foreach { statement =>
+      test(
+        s"should fail to create auto refresh Flint index if incremental refresh enabled: $statement") {
+        withTable(testTable) {
+          sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
+
+          the[IllegalArgumentException] thrownBy {
+            sql(s"""
+                 | $statement
+                 | WITH (
+                 |   auto_refresh = true,
+                 |   incremental_refresh = true
+                 | )
+                 |""".stripMargin)
+          } should have message
+            "requirement failed: Incremental refresh cannot be enabled if auto refresh is enabled"
+        }
+      }
+    }
+
+  Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
+    .foreach { statement =>
+      test(
+        s"should fail to create auto refresh Flint index if checkpoint location mandatory: $statement") {
+        withTable(testTable) {
+          sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
+
+          the[IllegalArgumentException] thrownBy {
+            try {
+              setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
+              sql(s"""
+                   | $statement
+                   | WITH (
+                   |   auto_refresh = true
+                   | )
+                   |""".stripMargin)
+            } finally {
+              setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
+            }
+          } should have message
+            s"requirement failed: Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled"
+        }
+      }
+    }
+
+  Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
+    .foreach { statement =>
+      test(
+        s"should fail to create incremental refresh Flint index without checkpoint location: $statement") {
+        withTable(testTable) {
+          sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
+
+          the[IllegalArgumentException] thrownBy {
+            sql(s"""
+                 | $statement
+                 | WITH (
+                 |   incremental_refresh = true
+                 | )
+                 |""".stripMargin)
+          } should have message
+            "requirement failed: Checkpoint location is required by incremental refresh"
+        }
+      }
+    }
+
+  Seq(
+    (AUTO, createSkippingIndexStatement),
+    (AUTO, createCoveringIndexStatement),
+    (AUTO, createMaterializedViewStatement),
+    (INCREMENTAL, createSkippingIndexStatement),
+    (INCREMENTAL, createCoveringIndexStatement),
+    (INCREMENTAL, createMaterializedViewStatement))
+    .foreach { case (refreshMode, statement) =>
+      test(
+        s"should fail to create $refreshMode refresh Flint index if checkpoint location is inaccessible: $statement") {
+        withTable(testTable) {
+          sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
+
+          // Generate UUID as folder name to ensure the path does not exist
+          val checkpointDir = s"/test/${UUID.randomUUID()}"
+          the[IllegalArgumentException] thrownBy {
+            sql(s"""
+                 | $statement
+                 | WITH (
+                 |   ${optionName(refreshMode)} = true,
+                 |   checkpoint_location = "$checkpointDir"
+                 | )
+                 |""".stripMargin)
+          } should have message
+            s"requirement failed: Checkpoint location $checkpointDir doesn't exist or no permission to access"
+        }
+      }
+    }
+
+  Seq(
+    (AUTO, createSkippingIndexStatement),
+    (AUTO, createCoveringIndexStatement),
+    (AUTO, createMaterializedViewStatement),
+    (INCREMENTAL, createSkippingIndexStatement),
+    (INCREMENTAL, createCoveringIndexStatement),
+    (INCREMENTAL, createMaterializedViewStatement))
+    .foreach { case (refreshMode, statement) =>
+      test(s"should fail to create $refreshMode refresh Flint index on Hive table: $statement") {
+        withTempDir { checkpointDir =>
+          withTable(testTable) {
+            sql(s"CREATE TABLE $testTable (name STRING)")
+
+            the[IllegalArgumentException] thrownBy {
+              sql(s"""
+                   | $statement
+                   | WITH (
+                   |   ${optionName(refreshMode)} = true,
+                   |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+                   | )
+                   |""".stripMargin)
+            } should have message
+              s"requirement failed: Index ${lowercase(refreshMode)} refresh doesn't support Hive table"
+          }
+        }
+      }
+    }
+
+  Seq(
+    (skippingIndexName, createSkippingIndexStatement),
+    (coveringIndexName, createCoveringIndexStatement),
+    (materializedViewName, createMaterializedViewStatement)).foreach {
+    case (flintIndexName, statement) =>
+      test(s"should succeed to create full refresh Flint index on Hive table: $flintIndexName") {
+        withTable(testTable) {
+          sql(s"CREATE TABLE $testTable (name STRING)")
+          sql(s"INSERT INTO $testTable VALUES ('test')")
+
+          sql(statement)
+          flint.refreshIndex(flintIndexName)
+          flint.queryIndex(flintIndexName).count() shouldBe 1
+        }
+      }
+  }
+
+  private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT)
+
+  private def optionName(mode: RefreshMode): String = mode match {
+    case AUTO => "auto_refresh"
+    case INCREMENTAL => "incremental_refresh"
+  }
+}
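Every expected message in this suite starts with `requirement failed: ` because the validation layer is built on Scala's `Predef.require`, which throws `IllegalArgumentException` with exactly that prefix. A standalone illustration:

```scala
try {
  require(false, "Checkpoint location is required by incremental refresh")
} catch {
  case e: IllegalArgumentException =>
    // Prints: requirement failed: Checkpoint location is required by incremental refresh
    println(e.getMessage)
}
```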
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index 16d2b0b07..83fe1546c 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -43,56 +43,58 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
   }
 
   test("create materialized view with metadata successfully") {
-    val indexOptions =
-      FlintSparkIndexOptions(
-        Map(
-          "auto_refresh" -> "true",
-          "checkpoint_location" -> "s3://test/",
-          "watermark_delay" -> "30 Seconds"))
-    flint
-      .materializedView()
-      .name(testMvName)
-      .query(testQuery)
-      .options(indexOptions)
-      .create()
+    withTempDir { checkpointDir =>
+      val indexOptions =
+        FlintSparkIndexOptions(
+          Map(
+            "auto_refresh" -> "true",
+            "checkpoint_location" -> checkpointDir.getAbsolutePath,
+            "watermark_delay" -> "30 Seconds"))
+      flint
+        .materializedView()
+        .name(testMvName)
+        .query(testQuery)
+        .options(indexOptions)
+        .create()
-    val index = flint.describeIndex(testFlintIndex)
-    index shouldBe defined
-    index.get.metadata().getContent should matchJson(s"""
-      | {
-      |   "_meta": {
-      |     "version": "${current()}",
-      |     "name": "spark_catalog.default.mv_test_metrics",
-      |     "kind": "mv",
-      |     "source": "$testQuery",
-      |     "indexedColumns": [
-      |     {
-      |       "columnName": "startTime",
-      |       "columnType": "timestamp"
-      |     },{
-      |       "columnName": "count",
-      |       "columnType": "bigint"
-      |     }],
-      |     "options": {
-      |       "auto_refresh": "true",
-      |       "incremental_refresh": "false",
-      |       "checkpoint_location": "s3://test/",
-      |       "watermark_delay": "30 Seconds"
-      |     },
-      |     "latestId": "$testLatestId",
-      |     "properties": {}
-      |   },
-      |   "properties": {
-      |     "startTime": {
-      |       "type": "date",
-      |       "format": "strict_date_optional_time_nanos"
-      |     },
-      |     "count": {
-      |       "type": "long"
-      |     }
-      |   }
-      | }
-      |""".stripMargin)
+      val index = flint.describeIndex(testFlintIndex)
+      index shouldBe defined
+      index.get.metadata().getContent should matchJson(s"""
+        | {
+        |   "_meta": {
+        |     "version": "${current()}",
+        |     "name": "spark_catalog.default.mv_test_metrics",
+        |     "kind": "mv",
+        |     "source": "$testQuery",
+        |     "indexedColumns": [
+        |     {
+        |       "columnName": "startTime",
+        |       "columnType": "timestamp"
+        |     },{
+        |       "columnName": "count",
+        |       "columnType": "bigint"
+        |     }],
+        |     "options": {
+        |       "auto_refresh": "true",
+        |       "incremental_refresh": "false",
+        |       "checkpoint_location": "${checkpointDir.getAbsolutePath}",
+        |       "watermark_delay": "30 Seconds"
+        |     },
+        |     "latestId": "$testLatestId",
+        |     "properties": {}
+        |   },
+        |   "properties": {
+        |     "startTime": {
+        |       "type": "date",
+        |       "format": "strict_date_optional_time_nanos"
+        |     },
+        |     "count": {
+        |       "type": "long"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+    }
   }
 
   test("full refresh materialized view") {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 8b724fde7..8c7c5be82 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -141,36 +141,39 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("create skipping index with index options successfully") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addValueSet("address")
-      .options(FlintSparkIndexOptions(Map(
-        "auto_refresh" -> "true",
-        "refresh_interval" -> "1 Minute",
-        "checkpoint_location" -> "s3a://test/",
-        "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")))
-      .create()
+    withTempDir { checkpointDir =>
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addValueSet("address")
+        .options(FlintSparkIndexOptions(Map(
+          "auto_refresh" -> "true",
+          "refresh_interval" -> "1 Minute",
+          "checkpoint_location" -> checkpointDir.getAbsolutePath,
+          "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")))
+        .create()
 
-    val index = flint.describeIndex(testIndex)
-    index shouldBe defined
-    val optionJson =
-      compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options"))
-    optionJson should matchJson("""
-      | {
-      |   "auto_refresh": "true",
-      |   "incremental_refresh": "false",
-      |   "refresh_interval": "1 Minute",
-      |   "checkpoint_location": "s3a://test/",
-      |   "index_settings": "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
-      | }
-      |""".stripMargin)
+      val index = flint.describeIndex(testIndex)
+      index shouldBe defined
+      val optionJson =
+        compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options"))
+      optionJson should matchJson(s"""
+        | {
+        |   "auto_refresh": "true",
+        |   "incremental_refresh": "false",
+        |   "refresh_interval": "1 Minute",
+        |   "checkpoint_location": "${checkpointDir.getAbsolutePath}",
+        |   "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}"
+        | }
+        |""".stripMargin)
 
-    // Load index options from index mapping (verify OS index setting in SQL IT)
-    index.get.options.autoRefresh() shouldBe true
-    index.get.options.refreshInterval() shouldBe Some("1 Minute")
-    index.get.options.checkpointLocation() shouldBe Some("s3a://test/")
-    index.get.options.indexSettings() shouldBe
-      Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}")
+      // Load index options from index mapping (verify OS index setting in SQL IT)
+      index.get.options.autoRefresh() shouldBe true
+      index.get.options.refreshInterval() shouldBe Some("1 Minute")
+      index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+      index.get.options.indexSettings() shouldBe
+        Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}")
+    }
   }
 
   test("should not have ID column in index data") {
@@ -233,16 +236,14 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should fail if incremental refresh without checkpoint location") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
-      .create()
-
-    assertThrows[IllegalStateException] {
-      flint.refreshIndex(testIndex)
-    }
+    the[IllegalArgumentException] thrownBy {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
+        .create()
+    } should have message "requirement failed: Checkpoint location is required by incremental refresh"
   }
 
   test("auto refresh skipping index successfully") {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 53d08bda7..cddda3122 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -178,7 +178,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
   test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
     setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
     try {
-      the[IllegalStateException] thrownBy {
+      the[IllegalArgumentException] thrownBy {
         sql(s"""
              | CREATE SKIPPING INDEX ON $testTable
              | ( year PARTITION )
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
index c5ac0ab95..c72b06fbf 100644
---
 a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
@@ -199,12 +199,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
           Map(
             "auto_refresh" -> "false",
             "incremental_refresh" -> "true",
-            "refresh_interval" -> "1 Minute"),
+            "refresh_interval" -> "1 Minute",
+            "checkpoint_location" -> "s3a://test/"),
           Map(
             "auto_refresh" -> false,
             "incremental_refresh" -> true,
             "refresh_interval" -> Some("1 Minute"),
-            "checkpoint_location" -> None,
+            "checkpoint_location" -> Some("s3a://test/"),
             "watermark_delay" -> None)),
         (
           Map("auto_refresh" -> "true"),
@@ -223,12 +224,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
           Map(
             "auto_refresh" -> "false",
             "incremental_refresh" -> "true",
+            "checkpoint_location" -> "s3a://test/",
             "watermark_delay" -> "1 Minute"),
           Map(
             "auto_refresh" -> false,
             "incremental_refresh" -> true,
             "refresh_interval" -> None,
-            "checkpoint_location" -> None,
+            "checkpoint_location" -> Some("s3a://test/"),
             "watermark_delay" -> Some("1 Minute"))))),
       (
         "convert to auto refresh with allowed options",
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index db92cf78f..047afb64c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -12,6 +12,7 @@ object Dependencies {
     "org.apache.spark" %% "spark-core" % sparkVersion % "provided" withSources (),
     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" withSources (),
     "org.json4s" %% "json4s-native" % "3.7.0-M5" % "test",
+    "org.apache.spark" %% "spark-hive" % sparkVersion % "test",
     "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
     "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
     "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests")