Skip to content

Commit

Permalink
GEOMESA-3383 Accumulo - Bulk copy returns status (#3208)
Browse files Browse the repository at this point in the history
* Add redundant close check
* Add flush to clone table op
* Update LazyCloseable to use Closeable implicits
  • Loading branch information
elahrvivaz authored Oct 3, 2024
1 parent 2ddc413 commit f685295
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.apache.hadoop.tools.DistCp
import org.geotools.api.data.DataStoreFinder
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams}
import org.locationtech.geomesa.accumulo.util.SchemaCopier.{Cluster, ClusterConfig, CopyOptions, PartitionId, PartitionName, PartitionValue}
import org.locationtech.geomesa.accumulo.util.SchemaCopier._
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
import org.locationtech.geomesa.index.conf.partition.TablePartition
Expand All @@ -29,8 +29,8 @@ import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose}
import java.io.{Closeable, File, IOException}
import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import java.util.concurrent.{Callable, ConcurrentHashMap}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Try
import scala.util.control.NonFatal

Expand All @@ -54,14 +54,15 @@ class SchemaCopier(
indices: Seq[String],
partitions: Seq[PartitionId],
options: CopyOptions,
) extends Runnable with Closeable with StrictLogging {
) extends Callable[Set[CopyResult]] with Closeable with StrictLogging {

import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

import scala.collection.JavaConverters._

private val tryFrom = Try(Cluster(fromCluster))
private val tryTo = Try(Cluster(toCluster))
private var closed = false

// note: all other class variables are lazy, so that we can instantiate an instance and then clean up connections on close()

Expand Down Expand Up @@ -100,9 +101,9 @@ class SchemaCopier(
sft
}

lazy private val fromIndices = {
lazy private val indexPairs: Seq[(GeoMesaFeatureIndex[_, _], GeoMesaFeatureIndex[_, _])] = {
val all = from.ds.manager.indices(sft)
if (indices.isEmpty) { all } else {
val fromIndices = if (indices.isEmpty) { all } else {
val builder = Seq.newBuilder[GeoMesaFeatureIndex[_, _]]
indices.foreach { ident =>
val filtered = all.filter(_.identifier.contains(ident))
Expand All @@ -115,6 +116,7 @@ class SchemaCopier(
}
builder.result.distinct
}
fromIndices.map(from => from -> to.ds.manager.index(sft, from.identifier))
}

// these get passed into our index method calls - for partitioned schemas, it must be a Seq[Some[_]],
Expand All @@ -134,7 +136,7 @@ class SchemaCopier(
}
if (builder.isEmpty) {
logger.debug("No partitions specified - loading all partitions from store")
builder ++= fromIndices.flatMap(_.getPartitions)
builder ++= indexPairs.flatMap(_._1.getPartitions)
}
builder.result.distinct.sorted.map(Option.apply)
} else {
Expand All @@ -145,38 +147,50 @@ class SchemaCopier(
}
}

// planned copies
lazy val plans: Set[CopyPlan] =
fromPartitions.flatMap { partition =>
indexPairs.map { case (fromIndex, _) =>
CopyPlan(fromIndex.identifier, partition)
}
}.toSet

/**
* Execute the copy
*
* @return results
*/
override def run(): Unit = run(false)
override def call(): Set[CopyResult] = call(false)

/**
* Execute the copy
*
* @param resume resume from a previously interrupted run, vs overwrite any existing output
* @return results
*/
def run(resume: Boolean): Unit = {
def call(resume: Boolean): Set[CopyResult] = {
val results = Collections.newSetFromMap(new ConcurrentHashMap[CopyResult, java.lang.Boolean]())
CachedThreadPool.executor(options.tableConcurrency) { executor =>
fromPartitions.foreach { partition =>
fromIndices.foreach { fromIndex =>
val toIndex = to.ds.manager.index(sft, fromIndex.identifier)
indexPairs.foreach { case (fromIndex, toIndex) =>
val partitionLogId = s"${partition.fold(s"index")(p => s"partition $p")} ${fromIndex.identifier}"
val runnable: Runnable = () => {
try {
logger.info(s"Copying $partitionLogId")
copy(fromIndex, toIndex, partition, resume, partitionLogId)
logger.info(s"Bulk copy complete for $partitionLogId")
} catch {
// catch Throwable so NoClassDefFound still gets logged
case e: Throwable =>
logger.info(s"Copying $partitionLogId")
val result = copy(fromIndex, toIndex, partition, resume, partitionLogId)
result.error match {
case None =>
logger.info(s"Bulk copy complete for $partitionLogId")
case Some(e) =>
logger.error(s"Error copying $partitionLogId: ${e.getMessage}")
logger.debug(s"Error copying $partitionLogId", e)
}
results.add(result)
}
executor.submit(runnable)
}
}
}
results.asScala.toSet
}

/**
Expand All @@ -187,19 +201,50 @@ class SchemaCopier(
* @param partition partition name - must be Some if schema is partitioned
* @param resume use any partial results from a previous run, if present
* @param partitionLogId identifier for log messages
* @return result
*/
private def copy(
fromIndex: GeoMesaFeatureIndex[_, _],
toIndex: GeoMesaFeatureIndex[_, _],
partition: Option[String],
resume: Boolean,
partitionLogId: String): Unit = {
partitionLogId: String): CopyResult = {
val start = System.currentTimeMillis()
val files = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
var fromTable: String = ""
// lazy so that table, files and finish time are filled in appropriately
lazy val result =
CopyResult(fromIndex.identifier, partition, fromTable, files.asScala.toSeq, None, start, System.currentTimeMillis())
try {
fromTable = fromIndex.getTableName(partition)
copy(fromTable, toIndex, partition, resume, partitionLogId, files)
result
} catch {
// catch Throwable so NoClassDefFound still gets logged
case e: Throwable => result.withError(e)
}
}

require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup
/**
* Copy a single index + partition
*
* @param fromTable from table
* @param toIndex to index
* @param partition partition name - must be Some if schema is partitioned
* @param resume use any partial results from a previous run, if present
* @param partitionLogId identifier for log messages
* @param fileResults set to hold files that we've copied successfully
* @return result
*/
private def copy(
fromTable: String,
toIndex: GeoMesaFeatureIndex[_, _],
partition: Option[String],
resume: Boolean,
partitionLogId: String,
fileResults: java.util.Set[String]): Unit = {

val fromTable = try { fromIndex.getTableName(partition) } catch {
case NonFatal(e) => throw new RuntimeException("Could not get source table", e)
}
require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup

val completeMarker = new Path(exportPath, s"$fromTable.complete")
if (exportFs.exists(completeMarker)) {
Expand Down Expand Up @@ -228,7 +273,7 @@ class SchemaCopier(
logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable")
from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist
logger.debug(s"Cloning $fromTable to $cloneTable")
from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility
from.tableOps.clone(fromTable, cloneTable, true, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility
logger.debug(s"Taking $cloneTable offline")
from.tableOps.offline(cloneTable, true)
}
Expand Down Expand Up @@ -268,7 +313,7 @@ class SchemaCopier(
to.tableOps.addSplits(toTable, splits)
}

val hadCopyError = new AtomicBoolean(false)
val copyErrors = Collections.newSetFromMap(new ConcurrentHashMap[Throwable, java.lang.Boolean]())
// read the distcp.txt file produced by the table export
// consumer: (src, dest) => Unit
def distCpConsumer(threads: Int)(consumer: (Path, Path) => Unit): Unit = {
Expand All @@ -289,7 +334,7 @@ class SchemaCopier(
} catch {
// catch Throwable so NoClassDefFound still gets logged
case e: Throwable =>
hadCopyError.set(true)
copyErrors.add(e)
logger.error(s"Failed to copy $path to $copy: ${e.getMessage}")
logger.debug(s"Failed to copy $path to $copy", e)
}
Expand All @@ -302,15 +347,22 @@ class SchemaCopier(

if (options.distCp) {
var inputPath = distcpPath
val distCpFiles = ArrayBuffer.empty[String]
if (resume) {
logger.debug(s"Checking copy status of files in $distcpPath")
inputPath = new Path(tableExportPath, "distcp-remaining.txt")
WithClose(exportFs.create(inputPath, true)) { out =>
distCpConsumer(1) { (path, _) =>
logger.debug(s"Adding $path to distcp")
out.writeUTF(s"$path\n")
distCpFiles += path.getName
}
}
} else {
logger.debug(s"Checking file list at $distcpPath")
distCpConsumer(1) { (path, _) =>
distCpFiles += path.getName
}
}
val job = new DistCp(from.conf, DistributedCopyOptions(inputPath, copyToDir)).execute()
logger.info(s"Tracking available at ${job.getStatus.getTrackingUrl}")
Expand All @@ -319,22 +371,28 @@ class SchemaCopier(
}
if (job.isSuccessful) {
logger.info(s"Successfully copied data to $copyToDir")
fileResults.addAll(distCpFiles.asJava)
} else {
hadCopyError.set(true)
logger.error(s"Job failed with state ${job.getStatus.getState} due to: ${job.getStatus.getFailureInfo}")
val msg = s"DistCp job failed with state ${job.getStatus.getState} due to: ${job.getStatus.getFailureInfo}"
copyErrors.add(new RuntimeException(msg))
logger.error(msg)
}
} else {
distCpConsumer(options.fileConcurrency) { (path, copy) =>
logger.debug(s"Copying $path to $copy")
val fs = path.getFileSystem(from.conf)
if (!FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) {
if (FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) {
fileResults.add(path.getName)
} else {
// consolidate error handling in the catch block
throw new IOException(s"Failed to copy $path to $copy, copy returned false")
}
}
}
if (hadCopyError.get) {
throw new RuntimeException("Error copying data files")
if (!copyErrors.isEmpty) {
val e = new RuntimeException("Error copying data files")
copyErrors.asScala.foreach(e.addSuppressed)
throw e
}

logger.debug(s"Loading rfiles from $copyToDir to $toTable")
Expand All @@ -357,9 +415,12 @@ class SchemaCopier(
from.tableOps.delete(cloneTable)
}

override def close(): Unit = {
CloseWithLogging(tryFrom.toOption)
CloseWithLogging(tryTo.toOption)
override def close(): Unit = synchronized {
if (!closed) {
closed = true
CloseWithLogging(tryFrom.toOption)
CloseWithLogging(tryTo.toOption)
}
}
}

Expand Down Expand Up @@ -395,6 +456,36 @@ object SchemaCopier {
*/
case class CopyOptions(tableConcurrency: Int = 1, fileConcurrency: Int = 4, distCp: Boolean = false)

/**
* Planned copy operations
*
* @param index index id planned to copy
* @param partition partition planned to copy
*/
case class CopyPlan(index: String, partition: Option[String])

/**
* Result of a copy operation
*
* @param index index id being copied
* @param partition partition being copied, if table is partitioned
* @param table table being copied
* @param files list of files that were successfully copied
* @param error error, if any
* @param start start of operation, in unix time
* @param finish end of operation, in unix time
*/
case class CopyResult(
index: String,
partition: Option[String],
table: String,
files: Seq[String],
error: Option[Throwable],
start: Long,
finish: Long) {
def withError(e: Throwable): CopyResult = copy(error = Option(e))
}

/**
* Holds state for a given Accumulo cluster
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AccumuloBulkCopyCommand extends Command with StrictLogging {

WithClose(new SchemaCopier(fromCluster, toCluster, params.featureName, params.exportPath, indices, partitions, opts)) { copier =>
try {
copier.run(params.resume)
copier.call(params.resume)
} catch {
case e: IllegalArgumentException => throw new ParameterException(e.getMessage)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

package org.locationtech.geomesa.utils.concurrent

import org.locationtech.geomesa.utils.io.IsCloseable

import java.io.Closeable

class LazyCloseable[T <: Closeable](create: => T) extends Closeable {
class LazyCloseable[T: IsCloseable](create: => T) extends Closeable {

@volatile
private var initialized = false
Expand All @@ -22,7 +24,7 @@ class LazyCloseable[T <: Closeable](create: => T) extends Closeable {

override def close(): Unit = {
if (initialized) {
instance.close()
implicitly[IsCloseable[T]].close(instance).get
}
}
}

0 comments on commit f685295

Please sign in to comment.