@@ -63,7 +63,10 @@ private[kafka010] class KafkaDataWriter(

   def abort(): Unit = {}
 
-  def close(): Unit = {
+  def close(): Unit = {}
Contributor Author:
This is safe; the previous implementation cleaned up the instance from the cache immediately, so doing that helps a bit, but it's no big deal even if we don't.

Contributor:
Is this related to adding the close() API?

Contributor Author (@HeartSaVioR), Dec 12, 2019:
There's a naming conflict: the test code calls it close(). If we'd like to keep this code as it is, I can rename the previous method to invalidateProducer() and leave it as is.

Contributor:
What's the life cycle of Kafka producers? IIRC they were cached before, but that patch got reverted.

Contributor Author:
There's no "return" in the current Kafka producer cache; the cache evicts expired producers according to its policy. Previously we forcibly invalidated the Kafka producer when close() was explicitly called, because callers of close() were only using the producer temporarily (rather than running a query). The current code just lets the cache expire the producer per its policy in all cases.

Contributor (@cloud-fan), Dec 12, 2019:
SGTM then, if the life cycle of producers is controlled by the cache policy.

Contributor Author:
Btw, I just renamed the original close() method to invalidateProducer() to avoid affecting the Kafka side.


+  /** explicitly invalidate producer from pool. only for testing. */
+  private[kafka010] def invalidateProducer(): Unit = {
     checkForErrors()
     if (producer != null) {
       producer.flush()
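As a rough illustration of the producer life cycle discussed above, the sketch below shows a policy-driven cache in which callers never close producers themselves; eviction (and the actual close) is handled by the cache. The Guava cache, the String key, and the 10-minute expiry are assumptions for illustration only, not the actual Spark producer pool.

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerPoolSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // Producers expire after a period of no use; cleanup happens in the removal
  // listener, so callers never need to close a producer explicitly.
  private val producerCache = CacheBuilder.newBuilder()
    .expireAfterAccess(10, TimeUnit.MINUTES) // assumed expiry policy
    .removalListener(new RemovalListener[String, Producer] {
      override def onRemoval(n: RemovalNotification[String, Producer]): Unit = {
        n.getValue.close() // the cache, not the caller, releases the producer
      }
    })
    .build[String, Producer]()
}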
@@ -370,7 +370,7 @@ class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
         iter.foreach(writeTask.write(_))
         writeTask.commit()
       } finally {
-        writeTask.close()
+        writeTask.invalidateProducer()
       }
     }
   }
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.connector.write;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
@@ -31,8 +32,9 @@
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
  * not be processed. If all records are successfully written, {@link #commit()} is called.
  *
- * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
- * is over and Spark will not use it again.
+ * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, Spark will
+ * call {@link #close()} to let the DataWriter do resource cleanup. After calling {@link #close()},
+ * its lifecycle is over and Spark will not use it again.
  *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
@@ -56,7 +58,7 @@
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}.
  */
 @Evolving
-public interface DataWriter<T> {
+public interface DataWriter<T> extends Closeable {
 
   /**
    * Writes one record.
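To make the updated DataWriter contract concrete, here is a small illustrative writer (not part of this PR) that owns an external resource and releases it only in close(). The file path, the buffered stream, and the null commit message are assumptions of the sketch.

import java.io.{BufferedWriter, FileWriter}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

class LocalFileDataWriter(path: String) extends DataWriter[InternalRow] {
  private val out = new BufferedWriter(new FileWriter(path)) // hypothetical sink

  override def write(record: InternalRow): Unit = out.write(record.toString + "\n")

  // commit()/abort() end the logical write; neither needs to release the stream.
  override def commit(): WriterCommitMessage = { out.flush(); null }
  override def abort(): Unit = {}

  // Spark calls close() after commit() or abort(), so resource cleanup lives here.
  override def close(): Unit = out.close()
}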
@@ -252,4 +252,6 @@ private class BufferWriter extends DataWriter[InternalRow] {
   override def commit(): WriterCommitMessage = buffer
 
   override def abort(): Unit = {}
+
+  override def close(): Unit = {}
 }
@@ -86,6 +86,8 @@ abstract class FileFormatDataWriter(
       committer.abortTask(taskAttemptContext)
     }
   }
+
+  override def close(): Unit = {}
 }
 
 /** FileFormatWriteTask for empty partitions */
@@ -277,6 +277,8 @@ object FileFormatWriter extends Logging {
         // If there is an error, abort the task
         dataWriter.abort()
         logError(s"Job $jobId aborted.")
+      }, finallyBlock = {
+        dataWriter.close()
       })
     } catch {
       case e: FetchFailedException =>
@@ -72,6 +72,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
   override def write(record: InternalRow): Unit = {}
   override def commit(): WriterCommitMessage = null
   override def abort(): Unit = {}
+  override def close(): Unit = {}
 }
 
 private[noop] object NoopStreamingWrite extends StreamingWrite {
@@ -467,6 +467,8 @@ object DataWritingSparkTask extends Logging {
         dataWriter.abort()
         logError(s"Aborted commit for partition $partId (task $taskId, attempt $attemptId, " +
           s"stage $stageId.$stageAttempt)")
+      }, finallyBlock = {
+        dataWriter.close()
       })
     }
   }
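The finallyBlock additions above all encode the same ordering. Below is a plain try/catch/finally sketch of that flow, standing in for Spark's actual task-running utilities; the object and method names are invented for illustration.

import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

object WriteTaskSketch {
  // abort() runs only when a write or the commit fails; close() always runs once.
  def run[T](writer: DataWriter[T], records: Iterator[T]): WriterCommitMessage = {
    try {
      records.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()
        throw t
    } finally {
      writer.close()
    }
  }
}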
@@ -80,6 +80,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat
logError(s"Writer for partition ${context.partitionId()} is aborting.")
if (dataWriter != null) dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
}, finallyBlock = {
dataWriter.close()
})
}

@@ -135,7 +135,7 @@ class ForeachDataWriter[T](

   // If open returns false, we should skip writing rows.
   private val opened = writer.open(partitionId, epochId)
-  private var closeCalled: Boolean = false
+  private var errorOrNull: Throwable = _
 
   override def write(record: InternalRow): Unit = {
     if (!opened) return
@@ -144,25 +144,24 @@
       writer.process(rowConverter(record))
     } catch {
       case t: Throwable =>
-        closeWriter(t)
+        errorOrNull = t
         throw t
     }
+
   }
 
   override def commit(): WriterCommitMessage = {
-    closeWriter(null)
     ForeachWriterCommitMessage
   }
 
   override def abort(): Unit = {
-    closeWriter(new SparkException("Foreach writer has been aborted due to a task failure"))
+    if (errorOrNull == null) {
+      errorOrNull = new SparkException("Foreach writer has been aborted due to a task failure")
+    }
   }
 
-  private def closeWriter(errorOrNull: Throwable): Unit = {
-    if (!closeCalled) {
-      closeCalled = true
-      writer.close(errorOrNull)
-    }
+  override def close(): Unit = {
Contributor Author:
This change clearly shows the difference: DataWriter implementations no longer need to deal with possible double resource cleanup.

+    writer.close(errorOrNull)
   }
 }

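On the user-facing side, the recorded error now reaches the sink's ForeachWriter.close(errorOrNull) when DataWriter.close() runs, and errorOrNull is non-null only if write() or abort() recorded a failure. A hedged sketch of a user writer that inspects it; the class name and the logging are illustrative.

import org.apache.spark.sql.ForeachWriter

class LoggingForeachWriter extends ForeachWriter[String] {
  override def open(partitionId: Long, epochId: Long): Boolean = true

  override def process(value: String): Unit = println(value)

  // Receives the recorded error (if any) when DataWriter.close() is invoked.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) {
      Console.err.println(s"Partition write failed: $errorOrNull")
    }
  }
}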
@@ -56,10 +56,12 @@ class PackedRowDataWriter() extends DataWriter[InternalRow] with Logging {
   override def write(row: InternalRow): Unit = data.append(row.copy())
 
   override def commit(): PackedRowCommitMessage = {
-    val msg = PackedRowCommitMessage(data.toArray)
-    data.clear()
-    msg
+    PackedRowCommitMessage(data.toArray)
   }
 
-  override def abort(): Unit = data.clear()
+  override def abort(): Unit = {}
+
+  override def close(): Unit = {
+    data.clear()
+  }
 }
@@ -191,6 +191,8 @@ class MemoryDataWriter(partition: Int, schema: StructType)
   }
 
   override def abort(): Unit = {}
+
+  override def close(): Unit = {}
 }


@@ -240,4 +240,6 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow]
       fs.delete(file, false)
     }
   }
+
+  override def close(): Unit = {}
 }