Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier

class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
Expand Down Expand Up @@ -67,14 +67,14 @@ class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with S
Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)"))
}

override def build = new MetadataCreateProcedure()
override def build = new CreateMetadataTableProcedure()
}

object MetadataCreateProcedure {
val NAME = "metadata_create"
object CreateMetadataTableProcedure {
val NAME = "create_metadata_table"
var metadataBaseDirectory: Option[String] = None

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataCreateProcedure()
override def get() = new CreateMetadataTableProcedure()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier

class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "commit_Time", DataTypes.StringType, None),
ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
)
Expand Down Expand Up @@ -75,14 +75,14 @@ class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with
Seq(Row(result))
}

override def build: Procedure = new CreateSavepointsProcedure()
override def build: Procedure = new CreateSavepointProcedure()
}

object CreateSavepointsProcedure {
val NAME: String = "create_savepoints"
object CreateSavepointProcedure {
val NAME: String = "create_savepoint"

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): CreateSavepointsProcedure = new CreateSavepointsProcedure()
override def get(): CreateSavepointProcedure = new CreateSavepointProcedure()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.{Failure, Success, Try}
class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "instant_Time", DataTypes.StringType, None)
ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier

class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
Expand Down Expand Up @@ -58,14 +58,14 @@ class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with S
Seq(Row("Removed Metadata Table from " + metadataPath))
}

override def build = new MetadataDeleteProcedure()
override def build = new DeleteMetadataTableProcedure()
}

object MetadataDeleteProcedure {
val NAME = "metadata_delete"
object DeleteMetadataTableProcedure {
val NAME = "delete_metadata_table"
var metadataBaseDirectory: Option[String] = None

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataDeleteProcedure()
override def get() = new DeleteMetadataTableProcedure()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier

class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
Expand Down Expand Up @@ -74,14 +74,14 @@ class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with
Seq(Row(result))
}

override def build: Procedure = new DeleteSavepointsProcedure()
override def build: Procedure = new DeleteSavepointProcedure()
}

object DeleteSavepointsProcedure {
val NAME: String = "delete_savepoints"
object DeleteSavepointProcedure {
val NAME: String = "delete_savepoint"

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): DeleteSavepointsProcedure = new DeleteSavepointsProcedure()
override def get(): DeleteSavepointProcedure = new DeleteSavepointProcedure()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L

private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
ProcedureParameter.required(1, "local_folder", DataTypes.StringType, None),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import scala.language.higherKinds
class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
ProcedureParameter.required(2, "srcPath", DataTypes.StringType, None),
ProcedureParameter.required(3, "targetPath", DataTypes.StringType, None),
ProcedureParameter.required(4, "rowKey", DataTypes.StringType, None),
ProcedureParameter.required(5, "partitionKey", DataTypes.StringType, None),
ProcedureParameter.required(6, "schemaFilePath", DataTypes.StringType, None),
ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
ProcedureParameter.required(2, "src_path", DataTypes.StringType, None),
ProcedureParameter.required(3, "target_path", DataTypes.StringType, None),
ProcedureParameter.required(4, "row_key", DataTypes.StringType, None),
ProcedureParameter.required(5, "partition_key", DataTypes.StringType, None),
ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType, None),
ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
ProcedureParameter.optional(10, "parallelism", DataTypes.IntegerType, jsc.defaultParallelism),
ProcedureParameter.optional(11, "propsFilePath", DataTypes.StringType, "")
ProcedureParameter.optional(11, "props_file_path", DataTypes.StringType, "")
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ object HoodieProcedures {
val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder)
mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder)
mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder)
mapBuilder.put(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
mapBuilder.put(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
mapBuilder.put(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
Expand Down Expand Up @@ -66,13 +66,13 @@ object HoodieProcedures {
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
mapBuilder.put(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
mapBuilder.put(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
mapBuilder.put(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
mapBuilder.put(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
mapBuilder.put(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
mapBuilder.put(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier

class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
ProcedureParameter.optional(1, "read_only", DataTypes.BooleanType, false)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down Expand Up @@ -71,14 +71,14 @@ class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with Spa
Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "sec)"))
}

override def build = new MetadataInitProcedure()
override def build = new InitMetadataTableProcedure()
}

object MetadataInitProcedure {
val NAME = "metadata_init"
object InitMetadataTableProcedure {
val NAME = "init_metadata_table"
var metadataBaseDirectory: Option[String] = None

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataInitProcedure()
override def get() = new InitMetadataTableProcedure()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier

class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
Expand Down Expand Up @@ -74,14 +74,14 @@ class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder wi
Seq(Row(result))
}

override def build: Procedure = new RollbackSavepointsProcedure()
override def build: Procedure = new RollbackToSavepointProcedure()
}

object RollbackSavepointsProcedure {
val NAME: String = "rollback_savepoints"
object RollbackToSavepointProcedure {
val NAME: String = "rollback_to_savepoint"

def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): RollbackSavepointsProcedure = new RollbackSavepointsProcedure()
override def get(): RollbackToSavepointProcedure = new RollbackToSavepointProcedure()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ import java.util.function.Supplier
class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType, None),
ProcedureParameter.required(3, "base_path", DataTypes.StringType, None),
ProcedureParameter.required(4, "rowKey_field", DataTypes.StringType, None),
ProcedureParameter.optional(5, "base_file_format", DataTypes.StringType, "PARQUET"),
ProcedureParameter.optional(6, "partition_path_field", DataTypes.StringType, ""),
ProcedureParameter.optional(7, "bootstrap_index_class", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
ProcedureParameter.optional(8, "selector_class", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
ProcedureParameter.optional(9, "key_generator_glass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
ProcedureParameter.optional(10, "full_bootstrap_input_provider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
ProcedureParameter.optional(11, "schema_provider_class", DataTypes.StringType, ""),
ProcedureParameter.optional(12, "payload_class", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging

private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "skipLocking", DataTypes.BooleanType, false),
ProcedureParameter.optional(2, "scheduleInLine", DataTypes.BooleanType, true),
ProcedureParameter.optional(3, "cleanPolicy", DataTypes.StringType, None),
ProcedureParameter.optional(4, "retainCommits", DataTypes.IntegerType, 10)
ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
ProcedureParameter.optional(2, "start_ts", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "end_ts", DataTypes.StringType, "")
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand All @@ -63,7 +63,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_corrupt_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),
Expand Down
Loading