From ecdd06ca8b7205a08662a8a32837768dc6ff157f Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 12 Apr 2024 11:10:16 +0800 Subject: [PATCH 1/7] [SPARK-47594] Connector module: Migrate logInfo with variables to structured logging framework --- .../org/apache/spark/internal/LogKey.scala | 35 +++++++++++++++++++ .../org/apache/spark/sql/avro/AvroUtils.scala | 7 ++-- .../execution/ExecuteGrpcResponseSender.scala | 32 ++++++++++------- .../execution/ExecuteResponseObserver.scala | 21 ++++++----- .../connect/planner/SparkConnectPlanner.scala | 6 ++-- .../planner/StreamingForeachBatchHelper.scala | 17 +++++---- .../StreamingQueryListenerHelper.scala | 6 ++-- .../connect/service/LoggingInterceptor.scala | 8 +++-- .../sql/connect/service/SessionHolder.scala | 12 ++++--- .../SparkConnectExecutionManager.scala | 14 ++++---- .../connect/service/SparkConnectServer.scala | 8 ++--- .../connect/service/SparkConnectService.scala | 5 +-- .../service/SparkConnectSessionManager.scala | 9 +++-- .../SparkConnectStreamingQueryCache.scala | 20 +++++++---- .../spark/sql/connect/utils/ErrorUtils.scala | 4 +-- .../kafka010/KafkaBatchPartitionReader.scala | 14 +++++--- .../sql/kafka010/KafkaContinuousStream.scala | 4 +-- .../sql/kafka010/KafkaMicroBatchStream.scala | 4 +-- .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 4 +-- .../kafka010/KafkaOffsetReaderConsumer.scala | 4 +-- .../spark/sql/kafka010/KafkaRelation.scala | 7 ++-- .../apache/spark/sql/kafka010/KafkaSink.scala | 5 +-- .../spark/sql/kafka010/KafkaSource.scala | 11 +++--- .../spark/sql/kafka010/KafkaSourceRDD.scala | 6 ++-- .../kafka010/consumer/KafkaDataConsumer.scala | 13 ++++--- .../producer/CachedKafkaProducer.scala | 5 +-- .../spark/sql/kafka010/KafkaTestUtils.scala | 10 +++--- .../kafka010/DirectKafkaInputDStream.scala | 9 +++-- .../kafka010/KafkaDataConsumer.scala | 18 ++++++---- .../spark/streaming/kafka010/KafkaRDD.scala | 12 ++++--- .../streaming/kinesis/KinesisReceiver.scala | 7 ++-- .../kinesis/KinesisRecordProcessor.scala | 12 ++++--- .../profiler/ExecutorJVMProfiler.scala | 5 +-- .../profiler/ExecutorProfilerPlugin.scala | 6 ++-- 34 files changed, 231 insertions(+), 129 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 28b06f448784..c0374f2a2091 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -26,15 +26,21 @@ object LogKey extends Enumeration { val APP_DESC = Value val APP_ID = Value val APP_STATE = Value + val BATCH_ID = Value val BLOCK_ID = Value val BLOCK_MANAGER_ID = Value val BROADCAST_ID = Value val BUCKET = Value val BYTECODE_SIZE = Value + val CACHE_AUTO_REMOVED_SIZE = Value + val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value + val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value val CLUSTER_ID = Value + val CODEC_LEVEL = Value + val CODEC_NAME = Value val COLUMN_DATA_TYPE_SOURCE = Value val COLUMN_DATA_TYPE_TARGET = Value val COLUMN_DEFAULT_VALUE = Value @@ -45,9 +51,12 @@ object LogKey extends Enumeration { val CONFIG = Value val CONFIG2 = Value val CONFIG3 = Value + val CONSUMER = Value val CONTAINER = Value val CONTAINER_ID = Value val COUNT = Value + val COUNT_POLL = Value + val COUNT_RECORDS_POLL = Value val CSV_HEADER_COLUMN_NAME = Value val CSV_HEADER_COLUMN_NAMES = Value val CSV_HEADER_LENGTH = Value @@ -56,13 +65,18 @@ object LogKey 
extends Enumeration { val CSV_SOURCE = Value val DATA = Value val DATABASE_NAME = Value + val DATAFRAME_ID = Value + val DESCRIPTION = Value val DRIVER_ID = Value val DROPPED_PARTITIONS = Value + val DURATION = Value val END_POINT = Value val ENGINE = Value val ERROR = Value val EVENT_LOOP = Value val EVENT_QUEUE = Value + val EXECUTE_INFO = Value + val EXECUTE_KEY = Value val EXECUTOR_ID = Value val EXECUTOR_STATE = Value val EXIT_CODE = Value @@ -72,6 +86,7 @@ object LogKey extends Enumeration { val FIELD_NAME = Value val FILE_FORMAT = Value val FILE_FORMAT2 = Value + val FROM_OFFSET = Value val FUNCTION_NAME = Value val FUNCTION_PARAMETER = Value val GROUP_ID = Value @@ -81,22 +96,29 @@ object LogKey extends Enumeration { val HOST = Value val INDEX = Value val INFERENCE_MODE = Value + val INITIAL_CAPACITY = Value + val INTERVAL = Value val JOB_ID = Value val JOIN_CONDITION = Value val JOIN_CONDITION_SUB_EXPRESSION = Value val KEY = Value + val LAST_ACCESS_TIME = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value val LISTENER = Value + val LOAD_FACTOR = Value val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value + val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value + val MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE = Value val MAX_CAPACITY = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value val MERGE_DIR_NAME = Value + val MESSAGE = Value val METHOD_NAME = Value val MIN_SIZE = Value val NEW_VALUE = Value @@ -105,6 +127,7 @@ object LogKey extends Enumeration { val OBJECT_ID = Value val OFFSET = Value val OFFSETS = Value + val OFFSET_RANGE = Value val OLD_BLOCK_MANAGER_ID = Value val OLD_VALUE = Value val OPTIMIZER_CLASS_NAME = Value @@ -120,6 +143,7 @@ object LogKey extends Enumeration { val POLICY = Value val PORT = Value val PRODUCER_ID = Value + val QUERY_CACHE = Value val QUERY_HINT = Value val QUERY_ID = Value val QUERY_PLAN = Value @@ -128,6 +152,7 @@ object LogKey extends Enumeration { val RANGE = Value val RDD_ID = Value val REASON = Value + val REATTACHABLE = Value val RECEIVED_BLOCK_INFO = Value val REDUCE_ID = Value val RELATION_NAME = Value @@ -139,10 +164,13 @@ object LogKey extends Enumeration { val RULE_BATCH_NAME = Value val RULE_NAME = Value val RULE_NUMBER_OF_RUNS = Value + val RUN_ID = Value val SCHEMA = Value val SCHEMA2 = Value val SERVICE_NAME = Value + val SESSION_HOLD_INFO = Value val SESSION_ID = Value + val SESSION_KEY = Value val SHARD_ID = Value val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value @@ -166,12 +194,19 @@ object LogKey extends Enumeration { val THREAD = Value val THREAD_NAME = Value val TID = Value + val TIME = Value val TIMEOUT = Value val TIME_UNITS = Value val TIP = Value + val TOPIC = Value val TOPIC_PARTITION = Value + val TOPIC_PARTITIONS = Value + val TOPIC_PARTITION_OFFSET = Value val TOTAL_EFFECTIVE_TIME = Value + val TOTAL_RECORDS_READ = Value + val TOTAL_SIZE = Value val TOTAL_TIME = Value + val TOTAL_TIME_READ = Value val UNSUPPORTED_EXPRESSION = Value val UNSUPPORTED_HINT_REASON = Value val UNTIL_OFFSET = Value diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index e8be11f48a2b..4bedd625e609 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkException, 
SparkIllegalArgumentException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{CONFIG, PATH} +import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroCompressionCodec._ import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION @@ -118,7 +118,8 @@ private[sql] object AvroUtils extends Logging { if (compressed.getSupportCompressionLevel) { val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level", compressed.getDefaultCompressionLevel.toString) - logInfo(s"Compressing Avro output using the $codecName codec at level $level") + logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " + + log"at level ${MDC(CODEC_LEVEL, level)}") val s = if (compressed == ZSTANDARD) { val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED) jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled) @@ -128,7 +129,7 @@ private[sql] object AvroUtils extends Logging { } jobConf.setInt(s"avro.mapred.$s.level", level.toInt) } else { - logInfo(s"Compressing Avro output using the $codecName codec") + logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec") } } case unknown => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 1139507a37a5..cd8851310f96 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -25,7 +25,7 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver} import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL} @@ -182,10 +182,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( * that. 0 means start from beginning (since first response has index 1) */ def execute(lastConsumedStreamIndex: Long): Unit = { - logInfo( - s"Starting for opId=${executeHolder.operationId}, " + - s"reattachable=${executeHolder.reattachable}, " + - s"lastConsumedStreamIndex=$lastConsumedStreamIndex") + logInfo(log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " + + log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}") val startTime = System.nanoTime() var nextIndex = lastConsumedStreamIndex + 1 @@ -297,10 +296,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( enqueueProgressMessage() // Stream is finished and all responses have been sent logInfo( - s"Stream finished for opId=${executeHolder.operationId}, " + - s"sent all responses up to last index ${nextIndex - 1}. 
" + - s"totalTime=${System.nanoTime - startTime}ns " + - s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns") + log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " + + log"totalTime=${MDC(LogKey.TOTAL_TIME, + (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, + consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") executionObserver.getError() match { case Some(t) => grpcObserver.onError(t) case None => grpcObserver.onCompleted() @@ -310,10 +312,14 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( // The stream is not complete, but should be finished now. // The client needs to reattach with ReattachExecute. logInfo( - s"Deadline reached, shutting down stream for opId=${executeHolder.operationId} " + - s"after index ${nextIndex - 1}. " + - s"totalTime=${System.nanoTime - startTime}ns " + - s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns") + log"Deadline reached, shutting down stream for " + + log"opId=${MDC(OP_ID, executeHolder.operationId)} " + + log"after index ${MDC(STREAM_ID, nextIndex - 1)}. " + + log"totalTime=${MDC(TOTAL_TIME, + (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, + consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") grpcObserver.onCompleted() finished = true } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 92c23c6165d2..8c5d0a8d7ec2 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -26,7 +26,8 @@ import io.grpc.stub.StreamObserver import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey import org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE import org.apache.spark.sql.connect.service.ExecuteHolder @@ -233,13 +234,17 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: def removeAll(): Unit = responseLock.synchronized { removeResponsesUntilIndex(lastProducedIndex) logInfo( - s"Release all for opId=${executeHolder.operationId}. Execution stats: " + - s"total=${totalSize} " + - s"autoRemoved=${autoRemovedSize} " + - s"cachedUntilConsumed=$cachedSizeUntilHighestConsumed " + - s"cachedUntilProduced=$cachedSizeUntilLastProduced " + - s"maxCachedUntilConsumed=${cachedSizeUntilHighestConsumed.max} " + - s"maxCachedUntilProduced=${cachedSizeUntilLastProduced.max}") + log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. 
Execution stats: " + + log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " + + log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " + + log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, + cachedSizeUntilHighestConsumed)} " + + log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, + cachedSizeUntilLastProduced)} " + + log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, + cachedSizeUntilHighestConsumed.max)} " + + log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, + cachedSizeUntilLastProduced.max)}") } /** Returns if the stream is finished. */ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 96db45c5c63e..216ab6af817a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -38,7 +38,8 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.SESSION_ID import org.apache.spark.ml.{functions => MLFunctions} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest} import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession} @@ -3103,7 +3104,8 @@ class SparkConnectPlanner( } } catch { case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any. 
- logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.") + logInfo(log"Removing foreachBatch worker, query failed to start " + + log"for session ${MDC(SESSION_ID, sessionId)}.") foreachBatchRunnerCleaner.foreach(_.close()) throw ex } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index ce75ba3eb598..30767ca621b8 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import org.apache.spark.SparkException import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.connect.service.SessionHolder import org.apache.spark.sql.connect.service.SparkConnectService @@ -63,7 +64,8 @@ object StreamingForeachBatchHelper extends Logging { sessionHolder: SessionHolder): ForeachBatchFnType = { (df: DataFrame, batchId: Long) => { val dfId = UUID.randomUUID().toString - logInfo(s"Caching DataFrame with id $dfId") // TODO: Add query id to the log. + // TODO: Add query id to the log. + logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}") // TODO(SPARK-44462): Sanity check there is no other active DataFrame for this query. // The query id needs to be saved in the cache for this check. 
@@ -72,7 +74,7 @@ object StreamingForeachBatchHelper extends Logging { try { fn(FnArgsWithId(dfId, df, batchId)) } finally { - logInfo(s"Removing DataFrame with id $dfId from the cache") + logInfo(log"Removing DataFrame with id ${MDC(DATAFRAME_ID, dfId)} from the cache") sessionHolder.removeCachedDataFrame(dfId) } } @@ -133,7 +135,8 @@ object StreamingForeachBatchHelper extends Logging { try { dataIn.readInt() match { case 0 => - logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: 0)") + logInfo(log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( @@ -169,7 +172,8 @@ object StreamingForeachBatchHelper extends Logging { private lazy val streamingListener = { // Initialized on first registered query val listener = new StreamingRunnerCleanerListener sessionHolder.session.streams.addListener(listener) - logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}") + logInfo(log"Registered runner clean up listener for " + + log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}") listener } @@ -195,7 +199,8 @@ object StreamingForeachBatchHelper extends Logging { private def cleanupStreamingRunner(key: CacheKey): Unit = { Option(cleanerCache.remove(key)).foreach { cleaner => - logInfo(s"Cleaning up runner for queryId ${key.queryId} runId ${key.runId}.") + logInfo(log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " + + log"runId ${MDC(RUN_ID, key.runId)}.") cleaner.close() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala index 685991dbed87..d2dd81382e54 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala @@ -21,7 +21,8 @@ import java.io.EOFException import org.apache.spark.SparkException import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.FUNCTION_NAME import org.apache.spark.sql.connect.service.{SessionHolder, SparkConnectService} import org.apache.spark.sql.streaming.StreamingQueryListener @@ -82,7 +83,8 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder try { dataIn.readInt() match { case 0 => - logInfo(s"Streaming query listener function $functionName completed (ret: 0)") + logInfo(log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala index 2d848d3c8400..68494b7f1b1f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala @@ -28,7 +28,8 @@ import io.grpc.ServerCall import io.grpc.ServerCallHandler import io.grpc.ServerInterceptor -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DESCRIPTION, MESSAGE} /** * A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. Useful for @@ -42,9 +43,10 @@ class LoggingInterceptor extends ServerInterceptor with Logging { private def logProto[T](description: String, message: T): Unit = { message match { case m: Message => - logInfo(s"$description:\n${jsonPrinter.print(m)}") + logInfo(log"${MDC(DESCRIPTION, description)}:\n${MDC(MESSAGE, jsonPrinter.print(m))}") case other => - logInfo(s"$description: (Unknown message type) $other") + logInfo(log"${MDC(DESCRIPTION, description)}: " + + log"(Unknown message type) ${MDC(MESSAGE, other)}") } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index ef79cdcce8ff..c69559e4a442 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -31,7 +31,8 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{SparkException, SparkSQLException} import org.apache.spark.api.python.PythonFunction.PythonAccumulator -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LAST_ACCESS_TIME, SESSION_ID, SESSION_KEY, TIMEOUT, USER_ID} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.common.InvalidPlanInput @@ -213,12 +214,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def updateAccessTime(): Unit = { lastAccessTimeMs = System.currentTimeMillis() - logInfo(s"Session $key accessed, time $lastAccessTimeMs.") + logInfo(log"Session ${MDC(SESSION_KEY, key)} accessed, " + + log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") } private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = { customInactiveTimeoutMs = newInactiveTimeoutMs - logInfo(s"Session $key inactive timout set to $customInactiveTimeoutMs ms.") + logInfo(log"Session ${MDC(SESSION_KEY, key)} " + + log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.") } /** @@ -243,7 +246,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio if (closedTimeMs.isDefined) { throw new IllegalStateException(s"Session $key is already closed.") } - logInfo(s"Closing session with userId: $userId and sessionId: $sessionId") + logInfo(log"Closing session with userId: ${MDC(USER_ID, userId)} and " + + log"sessionId: ${MDC(SESSION_ID, sessionId)}") closedTimeMs = Some(System.currentTimeMillis()) if (Utils.isTesting && eventManager.status == SessionStatus.Pending) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index e52cfe64a090..5f698804b43d 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -29,7 +29,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL} import org.apache.spark.util.ThreadUtils @@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { sessionHolder.addExecuteHolder(executeHolder) executions.put(executeHolder.key, executeHolder) lastExecutionTimeMs = None - logInfo(s"ExecuteHolder ${executeHolder.key} is created.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.") } schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started. @@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { if (executions.isEmpty) { lastExecutionTimeMs = Some(System.currentTimeMillis()) } - logInfo(s"ExecuteHolder $key is removed.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.") } // close the execution outside the lock executeHolder.foreach { e => @@ -146,7 +146,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { } sessionExecutionHolders.foreach { case (_, executeHolder) => val info = executeHolder.getExecuteInfo - logInfo(s"Execution $info removed in removeSessionExecutions.") + logInfo(log"Execution ${MDC(LogKey.EXECUTE_INFO, info)} removed in removeSessionExecutions.") removeExecuteHolder(executeHolder.key, abandoned = true) } } @@ -199,7 +199,8 @@ private[connect] class SparkConnectExecutionManager() extends Logging { case Some(_) => // Already running. case None => val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL) - logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms") + logInfo(log"Starting thread for cleanup of abandoned executions every " + + log"${MDC(LogKey.INTERVAL, interval)} ms") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -238,7 +239,8 @@ private[connect] class SparkConnectExecutionManager() extends Logging { // .. and remove them. 
toRemove.foreach { executeHolder => val info = executeHolder.getExecuteInfo - logInfo(s"Found execution $info that was abandoned and expired and will be removed.") + logInfo(log"Found execution ${MDC(LogKey.EXECUTE_INFO, info)} that was abandoned " + + log"and expired and will be removed.") removeExecuteHolder(executeHolder.key, abandoned = true) } logInfo("Finished periodic run of SparkConnectExecutionManager maintenance.") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index 3b42b58ae2af..c16914294882 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -21,7 +21,8 @@ import java.net.InetSocketAddress import scala.jdk.CollectionConverters._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{PORT, RPC_ADDRESS} import org.apache.spark.sql.SparkSession /** @@ -37,9 +38,8 @@ object SparkConnectServer extends Logging { SparkConnectService.start(session.sparkContext) SparkConnectService.server.getListenSockets.asScala.foreach { sa => val isa = sa.asInstanceOf[InetSocketAddress] - logInfo( - s"Spark Connect server started at: " + - s"${isa.getAddress.getHostAddress}:${isa.getPort}") + logInfo(log"Spark Connect server started at: " + + log"${MDC(RPC_ADDRESS, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") } } catch { case e: Exception => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 476254bc6e39..4b35971286dd 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -35,7 +35,8 @@ import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc} import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.HOST import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} import org.apache.spark.sql.connect.execution.ConnectProgressExecutionListener @@ -346,7 +347,7 @@ object SparkConnectService extends Logging { val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT) val sb = bindAddress match { case Some(hostname) => - logInfo(s"start GRPC service at: $hostname") + logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}") NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port)) case _ => NettyServerBuilder.forPort(port) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala 
index f8febbccfa6f..3d1496b4d53b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -29,7 +29,8 @@ import scala.util.control.NonFatal  import com.google.common.cache.CacheBuilder  import org.apache.spark.{SparkEnv, SparkSQLException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{INTERVAL, SESSION_HOLD_INFO} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.config.Connect.{CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE, CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT, CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL} import org.apache.spark.util.ThreadUtils @@ -203,7 +204,8 @@ class SparkConnectSessionManager extends Logging {       case Some(_) => // Already running.       case None =>         val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL) -        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms") +        logInfo(log"Starting thread for cleanup of expired sessions every " + +          log"${MDC(INTERVAL, interval)} ms")         scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())         scheduledExecutor.get.scheduleAtFixedRate(           () => { @@ -258,7 +260,8 @@ class SparkConnectSessionManager extends Logging {       // Last chance - check expiration time and remove under lock if expired.       val info = sessionHolder.getSessionHolderInfo       if (shouldExpire(info, System.currentTimeMillis())) { -        logInfo(s"Found session $info that expired and will be closed.") +        logInfo(log"Found session ${MDC(SESSION_HOLD_INFO, info)} that expired " + +          log"and will be closed.")         removeSessionHolder(info.key)       } else {         None diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index 9690d10eba1a..874df75ba415 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal  import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_ID} +import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE, QUERY_ID, SESSION_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -70,7 +70,8 @@ private[connect] class SparkConnectStreamingQueryCache(           log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value ${MDC(OLD_VALUE, existing)}, " +           log"new value ${MDC(NEW_VALUE, value)}.")       case None => -        logInfo(s"Adding new query to the cache. Query Id ${query.id}, value $value.") +        logInfo(log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + +          log"value ${MDC(QUERY_CACHE, value)}.")     }     schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started. 
@@ -111,7 +112,8 @@ private[connect] class SparkConnectStreamingQueryCache( for ((k, v) <- queryCache) { if (v.userId.equals(sessionHolder.userId) && v.sessionId.equals(sessionHolder.sessionId)) { if (v.query.isActive && Option(v.session.streams.get(k.queryId)).nonEmpty) { - logInfo(s"Stopping the query with id ${k.queryId} since the session has timed out") + logInfo(log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " + + log"since the session has timed out") try { v.query.stop() } catch { @@ -150,7 +152,8 @@ private[connect] class SparkConnectStreamingQueryCache( scheduledExecutor match { case Some(_) => // Already running. case None => - logInfo(s"Starting thread for polling streaming sessions every $sessionPollingPeriod") + logInfo(log"Starting thread for polling streaming sessions " + + log"every ${MDC(DURATION, sessionPollingPeriod.toMillis)}") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -180,17 +183,20 @@ private[connect] class SparkConnectStreamingQueryCache( v.expiresAtMs match { case Some(ts) if nowMs >= ts => // Expired. Drop references. - logInfo(s"Removing references for $id in session ${v.sessionId} after expiry period") + logInfo(log"Removing references for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} after expiry period") queryCache.remove(k) case Some(_) => // Inactive query waiting for expiration. Do nothing. - logInfo(s"Waiting for the expiration for $id in session ${v.sessionId}") + logInfo(log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)}") case None => // Active query, check if it is stopped. Enable timeout if it is stopped. val isActive = v.query.isActive && Option(v.session.streams.get(id)).nonEmpty if (!isActive) { - logInfo(s"Marking query $id in session ${v.sessionId} inactive.") + logInfo(log"Marking query ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} inactive.") val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs))) // To consider: Clean up any runner registered for this query with the session holder diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index a53061bc1563..b1bfe71930fb 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -290,8 +290,8 @@ private[connect] object ErrorUtils extends Logging { if (events.isDefined) { // Errors thrown inside execution are user query errors, return then as INFO. logInfo( - s"Spark Connect error " + - s"during: $opType. UserId: $userId. SessionId: $sessionId.", + log"Spark Connect error during: ${MDC(OP_TYPE, opType)}. " + + log"UserId: ${MDC(USER_ID, userId)}. SessionId: ${MDC(SESSION_ID, sessionId)}.", original) } else { // Other errors are server RPC errors, return them as ERROR. 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala index 97c8592d1da6..d3fe3264afe1 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.kafka010  import java.{util => ju}  import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.metric.CustomTaskMetric @@ -47,10 +48,13 @@ private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory     val taskCtx = TaskContext.get()     val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY)     val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) -    logInfo(s"Creating Kafka reader topicPartition=${p.offsetRange.topicPartition} " + -      s"fromOffset=${p.offsetRange.fromOffset} untilOffset=${p.offsetRange.untilOffset}, " + -      s"for query queryId=$queryId batchId=$batchId taskId=${TaskContext.get().taskAttemptId()} " + -      s"partitionId=${TaskContext.get().partitionId()}") +    logInfo(log"Creating Kafka reader " + +      log"topicPartition=${MDC(TOPIC_PARTITION, p.offsetRange.topicPartition)} " + +      log"fromOffset=${MDC(FROM_OFFSET, p.offsetRange.fromOffset)} " + +      log"untilOffset=${MDC(UNTIL_OFFSET, p.offsetRange.untilOffset)}, " + +      log"for query queryId=${MDC(QUERY_ID, queryId)} batchId=${MDC(BATCH_ID, batchId)} " + +      log"taskId=${MDC(TASK_ATTEMPT_ID, TaskContext.get().taskAttemptId())} " + +      log"partitionId=${MDC(PARTITION_ID, TaskContext.get().partitionId())}")     KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,       p.failOnDataLoss, p.includeHeaders) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 9bf0a2e9e513..e5e22243a582 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition  import org.apache.spark.TaskContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, OFFSETS, TIP} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.read.InputPartition @@ -76,7 +76,7 @@ class KafkaContinuousStream(       case GlobalTimestampRangeLimit(ts, strategy) =>         offsetReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy)     } -    logInfo(s"Initial offsets: $offsets") +    logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}")     offsets   } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index be838ddc3c80..3313d42d1a30 100644 ---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, OFFSETS, TIP} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} @@ -256,7 +256,7 @@ private[kafka010] class KafkaMicroBatchStream( isStartingOffsets = true, strategy) } metadataLog.add(0, offsets) - logInfo(s"Initial offsets: $offsets") + logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}") offsets }.partitionToOffsets } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 433da08176e7..5ed8576e8888 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -459,7 +459,7 @@ private[kafka010] class KafkaOffsetReaderAdmin( () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions)) } - logInfo(s"Partitions added: $newPartitionInitialOffsets") + logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}") newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => reportDataLoss( s"Added partition $p starts from $o instead of 0. 
Some data may have been missed", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index 2ba4a9a563df..34d44fdf1059 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -508,7 +508,7 @@ private[kafka010] class KafkaOffsetReaderConsumer( () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions)) } - logInfo(s"Partitions added: $newPartitionInitialOffsets") + logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}") newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => reportDataLoss( s"Added partition $p starts from $o instead of 0. Some data may have been missed", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index ed3407c822b9..97b866067ea8 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.kafka010 -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.TOPIC_PARTITIONS import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} @@ -69,8 +70,8 @@ private[kafka010] class KafkaRelation( kafkaOffsetReader.close() } - logInfo("GetBatch generating RDD of offset range: " + - offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + logInfo(log"GetBatch generating RDD of offset range: " + + log"${MDC(TOPIC_PARTITIONS, offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))}") // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays. 
val executorKafkaParams = diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala index 6ab4e91c53b3..5a75682f54f9 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.BATCH_ID import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.streaming.Sink @@ -32,7 +33,7 @@ private[kafka010] class KafkaSink( override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= latestBatchId) { - logInfo(s"Skipping already committed batch $batchId") + logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}") } else { KafkaWriter.write(data.queryExecution, executorKafkaParams, topic) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 426672d2e458..b0ab469690e2 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, FROM_OFFSET, OFFSETS, TIP, TOPIC_PARTITIONS, UNTIL_OFFSET} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql._ @@ -128,7 +128,7 @@ private[kafka010] class KafkaSource( kafkaReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy) } metadataLog.add(0, offsets) - logInfo(s"Initial offsets: $offsets") + logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}") offsets }.partitionToOffsets } @@ -293,7 +293,8 @@ private[kafka010] class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets - logInfo(s"GetBatch called with start = $start, end = $end") + logInfo(log"GetBatch called with start = ${MDC(FROM_OFFSET, start)}, " + + log"end = ${MDC(UNTIL_OFFSET, end)}") val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end) if (allDataForTriggerAvailableNow != null) { @@ -331,8 +332,8 @@ private[kafka010] class KafkaSource( .map(converter.toInternalRowWithoutHeaders) } - logInfo("GetBatch generating RDD of offset range: " + - offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + logInfo(log"GetBatch generating RDD of offset range: " + + log"${MDC(TOPIC_PARTITIONS, offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))}") sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema, isStreaming = true) } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 547586450094..4eb73e6d39f0 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -22,6 +22,8 @@ import java.{util => ju} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.LogKey.{FROM_OFFSET, PARTITION_ID, TOPIC} +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer import org.apache.spark.storage.StorageLevel @@ -79,8 +81,8 @@ private[kafka010] class KafkaSourceRDD( s"for topic ${range.topic} partition ${range.partition}. " + "You either provided an invalid fromOffset, or the Kafka topic has been damaged") if (range.fromOffset == range.untilOffset) { - logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset " + - s"skipping ${range.topic} ${range.partition}") + logInfo(log"Beginning offset ${MDC(FROM_OFFSET, range.fromOffset)} is the same as ending " + + log"offset skipping ${MDC(TOPIC, range.topic)} ${MDC(PARTITION_ID, range.partition)}") consumer.release() Iterator.empty } else { diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index 3ea7d967744c..e0f7d1558afa 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -31,8 +31,9 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{Logging, MDC, MessageWithContext} -import org.apache.spark.internal.LogKey.{ERROR, GROUP_ID, OFFSET, RANGE, TIP, TOPIC_PARTITION, UNTIL_OFFSET} +import org.apache.spark.internal.LogKey._ import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil} +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.kafka010.KafkaExceptions import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET} @@ -391,10 +392,12 @@ private[kafka010] class KafkaDataConsumer( .getOrElse("") val walTime = System.nanoTime() - startTimestampNano - logInfo( - s"From Kafka $kafkaMeta read $totalRecordsRead records through $numPolls polls (polled " + - s" out $numRecordsPolled records), taking $totalTimeReadNanos nanos, during time span of " + - s"$walTime nanos." + logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + + log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + + log"${MDC(COUNT_POLL, numPolls)} polls " + + log"(polled out ${MDC(COUNT_RECORDS_POLL, numRecordsPolled)} records), " + + log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " + + log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms." 
) releaseConsumer() diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala index 83519de0d3b1..afd426694d7b 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala @@ -23,7 +23,8 @@ import scala.util.control.NonFatal import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PRODUCER_ID private[kafka010] class CachedKafkaProducer( val cacheKey: Seq[(String, Object)], @@ -32,7 +33,7 @@ private[kafka010] class CachedKafkaProducer( private[producer] def close(): Unit = { try { - logInfo(s"Closing the KafkaProducer with id: $id.") + logInfo(log"Closing the KafkaProducer with id: ${MDC(PRODUCER_ID, id)}.") producer.close() } catch { case NonFatal(e) => logWarning("Error while closing kafka producer.", e) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 8ec8f2556b9b..068e3423cd26 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -52,7 +52,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.ERROR +import org.apache.spark.internal.LogKey import org.apache.spark.kafka010.KafkaTokenUtil import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -70,7 +70,7 @@ class KafkaTestUtils( private val JAVA_AUTH_CONFIG = "java.security.auth.login.config" private val localHostNameForURI = Utils.localHostNameForURI() - logInfo(s"Local host name is $localHostNameForURI") + logInfo(log"Local host name is ${MDC(LogKey.URI, localHostNameForURI)}") // MiniKDC uses canonical host name on host part, hence we need to provide canonical host name // on the 'host' part of the principal. 
@@ -333,7 +333,7 @@ class KafkaTestUtils( Utils.deleteRecursively(new File(f)) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } } @@ -654,13 +654,13 @@ class KafkaTestUtils( Utils.deleteRecursively(snapshotDir) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } try { Utils.deleteRecursively(logDir) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } System.clearProperty(ZOOKEEPER_AUTH_PROVIDER) } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index d3795a194dd4..714d3751b1c4 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{OFFSET, OFFSET_RANGE, TIME, TOPIC_PARTITION} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ @@ -177,7 +178,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset) acc + (tp -> off) }.foreach { case (tp, off) => - logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate") + logInfo(log"poll(0) returned messages, seeking ${MDC(TOPIC_PARTITION, tp)} to " + + log"${MDC(OFFSET, off)} to compensate") c.seek(tp, off) } } @@ -325,7 +327,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def restore(): Unit = { batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => - logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") + logInfo(log"Restoring KafkaRDD for time ${MDC(TIME, t)} " + + log"${MDC(OFFSET_RANGE, b.mkString("[", ", ", "]"))}") generatedRDDs += t -> new KafkaRDD[K, V]( context.sparkContext, executorKafkaParams, diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 6b47e9d72f4b..91df53c9e06b 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{KEY, MAX_CAPACITY} +import org.apache.spark.internal.LogKey._ import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] sealed trait KafkaDataConsumer[K, V] { @@ -132,7 +132,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffset requested $offset") if (offset != nextOffset) { - logInfo(s"Initial fetch for $groupId $topicPartition $offset") + logInfo(log"Initial fetch for ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(timeout) } @@ -145,7 +146,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( var record = buffer.next() if (record.offset != offset) { - logInfo(s"Buffer miss for $groupId $topicPartition $offset") + logInfo(log"Buffer miss for ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(timeout) require(buffer.hasNext(), @@ -169,7 +171,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( logDebug(s"compacted start $groupId $topicPartition starting $offset") // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics if (offset != nextOffset) { - logInfo(s"Initial fetch for compacted $groupId $topicPartition $offset") + logInfo(log"Initial fetch for compacted ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(pollTimeoutMs) } @@ -240,7 +243,8 @@ private[kafka010] object KafkaDataConsumer extends Logging { maxCapacity: Int, loadFactor: Float): Unit = synchronized { if (null == cache) { - logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") + logInfo(log"Initializing cache ${MDC(INITIAL_CAPACITY, initialCapacity)} " + + log"${MDC(MAX_CAPACITY, maxCapacity)} ${MDC(LOAD_FACTOR, loadFactor)}") cache = new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer[_, _]]( initialCapacity, loadFactor, true) { override def removeEldestEntry( @@ -356,8 +360,8 @@ private[kafka010] object KafkaDataConsumer extends Logging { // at all. This may happen if the cache was invalidate while this consumer was being used. // Just close this consumer. 
internalConsumer.close() - logInfo(s"Released a supposedly cached consumer that was not found in the cache " + - s"$internalConsumer") + logInfo(log"Released a supposedly cached consumer that was not found in the cache " + + log"${MDC(CONSUMER, internalConsumer)}") } } } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 6c57091bc3c4..5bc89864cf0a 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -23,7 +23,8 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import org.apache.kafka.common.TopicPartition import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{FROM_OFFSET, PARTITION_ID, TOPIC, UNTIL_OFFSET} import org.apache.spark.internal.config.Network._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD @@ -186,12 +187,13 @@ private[spark] class KafkaRDD[K, V]( val part = thePart.asInstanceOf[KafkaRDDPartition] require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { - logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + - s"skipping ${part.topic} ${part.partition}") + logInfo(log"Beginning offset ${MDC(FROM_OFFSET, part.fromOffset)} is the same as ending " + + log"offset skipping ${MDC(TOPIC, part.topic)} ${MDC(PARTITION_ID, part.partition)}") Iterator.empty } else { - logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") + logInfo(log"Computing topic ${MDC(TOPIC, part.topic)}, partition " + + log"${MDC(PARTITION_ID, part.partition)} offsets ${MDC(FROM_OFFSET, part.fromOffset)} " + + log"-> ${MDC(UNTIL_OFFSET, part.untilOffset)}") if (compacted) { new CompactedKafkaRDDIterator[K, V]( part, diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 4ddf1d9993ed..47b03c2b7537 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -28,7 +28,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLib import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import com.amazonaws.services.kinesis.model.Record -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.WORKER_URL import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp @@ -209,7 +210,7 @@ private[kinesis] class KinesisReceiver[T]( workerThread.setDaemon(true) workerThread.start() - logInfo(s"Started receiver with workerId $workerId") + logInfo(log"Started receiver with workerId ${MDC(WORKER_URL, workerId)}") } /** @@ -225,7 +226,7 @@ private[kinesis] class KinesisReceiver[T]( } workerThread.join() workerThread = null - logInfo(s"Stopped receiver for 
workerId $workerId") + logInfo(log"Stopped receiver for workerId ${MDC(WORKER_URL, workerId)}") } workerId = null if (kinesisCheckpointer != null) { diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 94e109680fbc..8424dde7d9c4 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -27,7 +27,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason import com.amazonaws.services.kinesis.model.Record import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{RETRY_INTERVAL, SHARD_ID, WORKER_URL} +import org.apache.spark.internal.LogKey.{REASON, RETRY_INTERVAL, SHARD_ID, WORKER_URL} /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. @@ -54,7 +54,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w */ override def initialize(shardId: String): Unit = { this.shardId = shardId - logInfo(s"Initialized workerId $workerId with shardId $shardId") + logInfo(log"Initialized workerId ${MDC(WORKER_URL, workerId)} " + + log"with shardId ${MDC(SHARD_ID, shardId)}") } /** @@ -99,8 +100,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w } } else { /* RecordProcessor has been stopped. */ - logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" + - s" and shardId $shardId. No more records will be processed.") + logInfo(log"Stopped: KinesisReceiver has stopped for workerId ${MDC(WORKER_URL, workerId)}" + + log" and shardId ${MDC(SHARD_ID, shardId)}. 
No more records will be processed.") } } @@ -117,7 +118,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w override def shutdown( checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = { - logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") + logInfo(log"Shutdown: Shutting down workerId ${MDC(WORKER_URL, workerId)} " + + log"with reason ${MDC(REASON, reason)}") // null if not initialized before shutdown: if (shardId == null) { logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?") diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala index 558f6e44a638..15ffbbd9d730 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey.PATH +import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.util.ThreadUtils @@ -108,7 +109,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex if (fs.exists(filenamePath)) { fs.delete(filenamePath, true) } - logInfo(s"Copying executor profiling file to $profileOutputFile") + logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}") inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr")) threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") threadpool.scheduleWithFixedDelay( diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala index e144092cdecd..fb9abfe59aa7 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala @@ -23,7 +23,8 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey.EXECUTOR_ID +import org.apache.spark.internal.{Logging, MDC} /** @@ -52,7 +53,8 @@ class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging { if (codeProfilingEnabled) { codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION) if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { - logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling") + logInfo(log"Executor id ${MDC(EXECUTOR_ID, pluginCtx.executorID())} " + + log"selected for JVM code profiling") profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID()) profiler.start() } From d99f60ed73330dffbcd26df52655c7397dcc42a4 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 12 Apr 2024 13:44:52 +0800 Subject: [PATCH 2/7] fix style --- .../execution/ExecuteGrpcResponseSender.scala | 35 ++++++++----------- .../execution/ExecuteResponseObserver.scala | 12 +++---- .../connect/planner/SparkConnectPlanner.scala | 5 +-- 
.../planner/StreamingForeachBatchHelper.scala | 15 ++++---- .../StreamingQueryListenerHelper.scala | 5 +-- .../connect/service/LoggingInterceptor.scala | 5 +-- .../sql/connect/service/SessionHolder.scala | 15 ++++---- .../SparkConnectExecutionManager.scala | 13 ++++--- .../connect/service/SparkConnectServer.scala | 5 +-- .../service/SparkConnectSessionManager.scala | 10 +++--- .../SparkConnectStreamingQueryCache.scala | 32 ++++++++++------- 11 files changed, 82 insertions(+), 70 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index cd8851310f96..3f4e47d8193f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -182,9 +182,10 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( * that. 0 means start from beginning (since first response has index 1) */ def execute(lastConsumedStreamIndex: Long): Unit = { - logInfo(log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " + - log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " + - log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}") + logInfo( + log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " + + log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}") val startTime = System.nanoTime() var nextIndex = lastConsumedStreamIndex + 1 @@ -295,14 +296,11 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } else if (streamFinished) { enqueueProgressMessage() // Stream is finished and all responses have been sent - logInfo( - log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " + - log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " + - log"totalTime=${MDC(LogKey.TOTAL_TIME, - (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + - log"waitingForResults=${MDC(WAIT_RESULT_TIME, - consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + - log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + logInfo(log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " + + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") executionObserver.getError() match { case Some(t) => grpcObserver.onError(t) case None => grpcObserver.onCompleted() @@ -311,15 +309,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } else if (deadlineLimitReached) { // The stream is not complete, but should be finished now. // The client needs to reattach with ReattachExecute. - logInfo( - log"Deadline reached, shutting down stream for " + - log"opId=${MDC(OP_ID, executeHolder.operationId)} " + - log"after index ${MDC(STREAM_ID, nextIndex - 1)}. 
" + - log"totalTime=${MDC(TOTAL_TIME, - (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + - log"waitingForResults=${MDC(WAIT_RESULT_TIME, - consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + - log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + logInfo(log"Deadline reached, shutting down stream for " + + log"opId=${MDC(OP_ID, executeHolder.operationId)} " + + log"after index ${MDC(STREAM_ID, nextIndex - 1)}. " + + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") grpcObserver.onCompleted() finished = true } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 8c5d0a8d7ec2..7df7e8cecee6 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -237,14 +237,10 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " + log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " + log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " + - log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, - cachedSizeUntilHighestConsumed)} " + - log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, - cachedSizeUntilLastProduced)} " + - log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, - cachedSizeUntilHighestConsumed.max)} " + - log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, - cachedSizeUntilLastProduced.max)}") + log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " + + log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " + + log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " + + log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}") } /** Returns if the stream is finished. */ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 5b767b2991fb..7a3a45dadd92 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3109,8 +3109,9 @@ class SparkConnectPlanner( } } catch { case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any. 
- logInfo(log"Removing foreachBatch worker, query failed to start " + - log"for session ${MDC(SESSION_ID, sessionId)}.") + logInfo( + log"Removing foreachBatch worker, query failed to start " + + log"for session ${MDC(SESSION_ID, sessionId)}.") foreachBatchRunnerCleaner.foreach(_.close()) throw ex } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index 30767ca621b8..ef5faac77e3e 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -135,8 +135,9 @@ object StreamingForeachBatchHelper extends Logging { try { dataIn.readInt() match { case 0 => - logInfo(log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " + - log"completed (ret: 0)") + logInfo( + log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( @@ -172,8 +173,9 @@ object StreamingForeachBatchHelper extends Logging { private lazy val streamingListener = { // Initialized on first registered query val listener = new StreamingRunnerCleanerListener sessionHolder.session.streams.addListener(listener) - logInfo(log"Registered runner clean up listener for " + - log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}") + logInfo( + log"Registered runner clean up listener for " + + log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}") listener } @@ -199,8 +201,9 @@ object StreamingForeachBatchHelper extends Logging { private def cleanupStreamingRunner(key: CacheKey): Unit = { Option(cleanerCache.remove(key)).foreach { cleaner => - logInfo(log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " + - log"runId ${MDC(RUN_ID, key.runId)}.") + logInfo( + log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " + + log"runId ${MDC(RUN_ID, key.runId)}.") cleaner.close() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala index d2dd81382e54..74e9e32f208d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala @@ -83,8 +83,9 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder try { dataIn.readInt() match { case 0 => - logInfo(log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " + - log"completed (ret: 0)") + logInfo( + log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala index 68494b7f1b1f..c82cadbd5f7a 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala @@ -45,8 +45,9 @@ class LoggingInterceptor extends ServerInterceptor with Logging { case m: Message => logInfo(log"${MDC(DESCRIPTION, description)}:\n${MDC(MESSAGE, jsonPrinter.print(m))}") case other => - logInfo(log"${MDC(DESCRIPTION, description)}: " + - log"(Unknown message type) ${MDC(MESSAGE, other)}") + logInfo( + log"${MDC(DESCRIPTION, description)}: " + + log"(Unknown message type) ${MDC(MESSAGE, other)}") } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 3b2635a67efa..0151e913cb9d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -216,14 +216,16 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def updateAccessTime(): Unit = { lastAccessTimeMs = System.currentTimeMillis() - logInfo(log"Session ${MDC(SESSION_KEY, key)} accessed, " + - log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") + logInfo( + log"Session ${MDC(SESSION_KEY, key)} accessed, " + + log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") } private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = { customInactiveTimeoutMs = newInactiveTimeoutMs - logInfo(log"Session ${MDC(SESSION_KEY, key)} " + - log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.") + logInfo( + log"Session ${MDC(SESSION_KEY, key)} " + + log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.") } /** @@ -248,8 +250,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio if (closedTimeMs.isDefined) { throw new IllegalStateException(s"Session $key is already closed.") } - logInfo(log"Closing session with userId: ${MDC(USER_ID, userId)} and " + - log"sessionId: ${MDC(SESSION_ID, sessionId)}") + logInfo( + log"Closing session with userId: ${MDC(USER_ID, userId)} and " + + log"sessionId: ${MDC(SESSION_ID, sessionId)}") closedTimeMs = Some(System.currentTimeMillis()) if (Utils.isTesting && eventManager.status == SessionStatus.Pending) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index 5f698804b43d..4fe7f3eceb81 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -146,7 +146,8 @@ private[connect] class SparkConnectExecutionManager() extends Logging { } sessionExecutionHolders.foreach { case (_, executeHolder) => val info = executeHolder.getExecuteInfo - logInfo(log"Execution ${MDC(LogKey.EXECUTE_INFO, info)} removed in removeSessionExecutions.") + logInfo( + log"Execution ${MDC(LogKey.EXECUTE_INFO, info)} removed in removeSessionExecutions.") removeExecuteHolder(executeHolder.key, abandoned = true) } } @@ -199,8 +200,9 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging { case Some(_) => // Already running. case None => val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL) - logInfo(log"Starting thread for cleanup of abandoned executions every " + - log"${MDC(LogKey.INTERVAL, interval)} ms") + logInfo( + log"Starting thread for cleanup of abandoned executions every " + + log"${MDC(LogKey.INTERVAL, interval)} ms") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -239,8 +241,9 @@ private[connect] class SparkConnectExecutionManager() extends Logging { // .. and remove them. toRemove.foreach { executeHolder => val info = executeHolder.getExecuteInfo - logInfo(log"Found execution ${MDC(LogKey.EXECUTE_INFO, info)} that was abandoned " + - log"and expired and will be removed.") + logInfo( + log"Found execution ${MDC(LogKey.EXECUTE_INFO, info)} that was abandoned " + + log"and expired and will be removed.") removeExecuteHolder(executeHolder.key, abandoned = true) } logInfo("Finished periodic run of SparkConnectExecutionManager maintenance.") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index c16914294882..5567f2d6220c 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -38,8 +38,9 @@ object SparkConnectServer extends Logging { SparkConnectService.start(session.sparkContext) SparkConnectService.server.getListenSockets.asScala.foreach { sa => val isa = sa.asInstanceOf[InetSocketAddress] - logInfo(log"Spark Connect server started at: " + - log"${MDC(RPC_ADDRESS, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") + logInfo( + log"Spark Connect server started at: " + + log"${MDC(RPC_ADDRESS, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") } } catch { case e: Exception => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index 3d1496b4d53b..1a34964932ef 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -204,8 +204,9 @@ class SparkConnectSessionManager extends Logging { case Some(_) => // Already running. case None => val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL) - logInfo(log"Starting thread for cleanup of expired sessions every " + - log"${MDC(INTERVAL, interval)} ms") + logInfo( + log"Starting thread for cleanup of expired sessions every " + + log"${MDC(INTERVAL, interval)} ms") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -260,8 +261,9 @@ class SparkConnectSessionManager extends Logging { // Last chance - check expiration time and remove under lock if expired. 
val info = sessionHolder.getSessionHolderInfo if (shouldExpire(info, System.currentTimeMillis())) { - logInfo(log"Found session ${MDC(SESSION_HOLD_INFO, info)} that expired " + - log"and will be closed.") + logInfo( + log"Found session ${MDC(SESSION_HOLD_INFO, info)} that expired " + + log"and will be closed.") removeSessionHolder(info.key) } else { None diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index 874df75ba415..289f761dc006 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_CACHE, QUERY_ID, SESSION_ID} +import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE, QUERY_ID, SESSION_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -70,8 +70,9 @@ private[connect] class SparkConnectStreamingQueryCache( log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value ${MDC(OLD_VALUE, existing)}, " + log"new value ${MDC(NEW_VALUE, value)}.") case None => - logInfo(log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + - log"value ${MDC(QUERY_CACHE, value)}.") + logInfo( + log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + + log"value ${MDC(QUERY_CACHE, value)}.") } schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started. @@ -112,8 +113,9 @@ private[connect] class SparkConnectStreamingQueryCache( for ((k, v) <- queryCache) { if (v.userId.equals(sessionHolder.userId) && v.sessionId.equals(sessionHolder.sessionId)) { if (v.query.isActive && Option(v.session.streams.get(k.queryId)).nonEmpty) { - logInfo(log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " + - log"since the session has timed out") + logInfo( + log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " + + log"since the session has timed out") try { v.query.stop() } catch { @@ -152,8 +154,9 @@ private[connect] class SparkConnectStreamingQueryCache( scheduledExecutor match { case Some(_) => // Already running. case None => - logInfo(log"Starting thread for polling streaming sessions " + - log"every ${MDC(DURATION, sessionPollingPeriod.toMillis)}") + logInfo( + log"Starting thread for polling streaming sessions " + + log"every ${MDC(DURATION, sessionPollingPeriod.toMillis)}") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -183,20 +186,23 @@ private[connect] class SparkConnectStreamingQueryCache( v.expiresAtMs match { case Some(ts) if nowMs >= ts => // Expired. Drop references. - logInfo(log"Removing references for ${MDC(QUERY_ID, id)} in " + - log"session ${MDC(SESSION_ID, v.sessionId)} after expiry period") + logInfo( + log"Removing references for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} after expiry period") queryCache.remove(k) case Some(_) => // Inactive query waiting for expiration. 
Do nothing. - logInfo(log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " + - log"session ${MDC(SESSION_ID, v.sessionId)}") + logInfo( + log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)}") case None => // Active query, check if it is stopped. Enable timeout if it is stopped. val isActive = v.query.isActive && Option(v.session.streams.get(id)).nonEmpty if (!isActive) { - logInfo(log"Marking query ${MDC(QUERY_ID, id)} in " + - log"session ${MDC(SESSION_ID, v.sessionId)} inactive.") + logInfo( + log"Marking query ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} inactive.") val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs))) // To consider: Clean up any runner registered for this query with the session holder From 83ae49a5e1e5175d5434502b08d01f554012e0e9 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 12 Apr 2024 16:59:32 +0800 Subject: [PATCH 3/7] fix code style --- .../sql/connect/execution/ExecuteGrpcResponseSender.scala | 4 ++++ .../spark/sql/connect/execution/ExecuteResponseObserver.scala | 2 ++ 2 files changed, 6 insertions(+) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 3f4e47d8193f..5ba47e1a47a5 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -296,11 +296,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } else if (streamFinished) { enqueueProgressMessage() // Stream is finished and all responses have been sent + // scalastyle:off line.size.limit logInfo(log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " + log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + // scalastyle:on line.size.limit executionObserver.getError() match { case Some(t) => grpcObserver.onError(t) case None => grpcObserver.onCompleted() @@ -309,12 +311,14 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } else if (deadlineLimitReached) { // The stream is not complete, but should be finished now. // The client needs to reattach with ReattachExecute. + // scalastyle:off line.size.limit logInfo(log"Deadline reached, shutting down stream for " + log"opId=${MDC(OP_ID, executeHolder.operationId)} " + log"after index ${MDC(STREAM_ID, nextIndex - 1)}. 
" + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + // scalastyle:on line.size.limit grpcObserver.onCompleted() finished = true } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 7df7e8cecee6..2225371e6b5c 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -233,6 +233,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: /** Remove all cached responses */ def removeAll(): Unit = responseLock.synchronized { removeResponsesUntilIndex(lastProducedIndex) + // scalastyle:off line.size.limit logInfo( log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " + log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " + @@ -241,6 +242,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " + log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " + log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}") + // scalastyle:on line.size.limit } /** Returns if the stream is finished. 
*/ From a1d4d8c51869a1f4e87b3babaa47b3b9485e3bff Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 15 Apr 2024 17:05:19 +0800 Subject: [PATCH 4/7] fix --- .../src/main/scala/org/apache/spark/internal/LogKey.scala | 4 ++-- .../sql/connect/service/SparkConnectExecutionManager.scala | 4 ++-- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index c0374f2a2091..64b10da84d80 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -75,8 +75,8 @@ object LogKey extends Enumeration { val ERROR = Value val EVENT_LOOP = Value val EVENT_QUEUE = Value + val EXECUTE_HOLDER_KEY = Value val EXECUTE_INFO = Value - val EXECUTE_KEY = Value val EXECUTOR_ID = Value val EXECUTOR_STATE = Value val EXIT_CODE = Value @@ -127,7 +127,6 @@ object LogKey extends Enumeration { val OBJECT_ID = Value val OFFSET = Value val OFFSETS = Value - val OFFSET_RANGE = Value val OLD_BLOCK_MANAGER_ID = Value val OLD_VALUE = Value val OPTIMIZER_CLASS_NAME = Value @@ -202,6 +201,7 @@ object LogKey extends Enumeration { val TOPIC_PARTITION = Value val TOPIC_PARTITIONS = Value val TOPIC_PARTITION_OFFSET = Value + val TOPIC_PARTITION_OFFSET_RANGE = Value val TOTAL_EFFECTIVE_TIME = Value val TOTAL_RECORDS_READ = Value val TOTAL_SIZE = Value diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index 4fe7f3eceb81..dba09c1631a3 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { sessionHolder.addExecuteHolder(executeHolder) executions.put(executeHolder.key, executeHolder) lastExecutionTimeMs = None - logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, executeHolder.key)} is created.") } schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started. 
@@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { if (executions.isEmpty) { lastExecutionTimeMs = Some(System.currentTimeMillis()) } - logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, key)} is removed.") } // close the execution outside the lock executeHolder.foreach { e => diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 714d3751b1c4..86ee20849626 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{OFFSET, OFFSET_RANGE, TIME, TOPIC_PARTITION} +import org.apache.spark.internal.LogKey.{OFFSET, TIME, TOPIC_PARTITION, TOPIC_PARTITION_OFFSET_RANGE} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ @@ -328,7 +328,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def restore(): Unit = { batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => logInfo(log"Restoring KafkaRDD for time ${MDC(TIME, t)} " + - log"${MDC(OFFSET_RANGE, b.mkString("[", ", ", "]"))}") + log"${MDC(TOPIC_PARTITION_OFFSET_RANGE, b.mkString("[", ", ", "]"))}") generatedRDDs += t -> new KafkaRDD[K, V]( context.sparkContext, executorKafkaParams, From 41542f256286c150110f49df86f1df118578297e Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 16 Apr 2024 09:43:16 +0800 Subject: [PATCH 5/7] fix --- .../main/scala/org/apache/spark/internal/LogKey.scala | 10 +++++----- .../connect/service/SparkConnectExecutionManager.scala | 4 ++-- .../spark/sql/connect/service/SparkConnectServer.scala | 4 ++-- .../service/SparkConnectStreamingQueryCache.scala | 4 ++-- .../sql/kafka010/consumer/KafkaDataConsumer.scala | 4 ++-- .../main/scala/org/apache/spark/deploy/Client.scala | 6 +++--- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 64b10da84d80..a8bcbbfb2358 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -55,8 +55,6 @@ object LogKey extends Enumeration { val CONTAINER = Value val CONTAINER_ID = Value val COUNT = Value - val COUNT_POLL = Value - val COUNT_RECORDS_POLL = Value val CSV_HEADER_COLUMN_NAME = Value val CSV_HEADER_COLUMN_NAMES = Value val CSV_HEADER_LENGTH = Value @@ -75,8 +73,8 @@ object LogKey extends Enumeration { val ERROR = Value val EVENT_LOOP = Value val EVENT_QUEUE = Value - val EXECUTE_HOLDER_KEY = Value val EXECUTE_INFO = Value + val EXECUTE_KEY = Value val EXECUTOR_ID = Value val EXECUTOR_STATE = Value val EXIT_CODE = Value @@ -94,6 +92,7 @@ object LogKey extends Enumeration { val HIVE_OPERATION_STATE = Value val HIVE_OPERATION_TYPE = Value val HOST = Value + val HOST_PORT = Value val INDEX = Value val INFERENCE_MODE = Value val INITIAL_CAPACITY = Value @@ -101,6 +100,8 @@ 
object LogKey extends Enumeration { val JOB_ID = Value val JOIN_CONDITION = Value val JOIN_CONDITION_SUB_EXPRESSION = Value + val KAFKA_PULLS_COUNT = Value + val KAFKA_RECORDS_PULLED_COUNT = Value val KEY = Value val LAST_ACCESS_TIME = Value val LEARNING_RATE = Value @@ -142,7 +143,7 @@ object LogKey extends Enumeration { val POLICY = Value val PORT = Value val PRODUCER_ID = Value - val QUERY_CACHE = Value + val QUERY_CACHE_VALUE = Value val QUERY_HINT = Value val QUERY_ID = Value val QUERY_PLAN = Value @@ -159,7 +160,6 @@ object LogKey extends Enumeration { val REMOTE_ADDRESS = Value val RETRY_COUNT = Value val RETRY_INTERVAL = Value - val RPC_ADDRESS = Value val RULE_BATCH_NAME = Value val RULE_NAME = Value val RULE_NUMBER_OF_RUNS = Value diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index dba09c1631a3..4fe7f3eceb81 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { sessionHolder.addExecuteHolder(executeHolder) executions.put(executeHolder.key, executeHolder) lastExecutionTimeMs = None - logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, executeHolder.key)} is created.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.") } schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started. @@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { if (executions.isEmpty) { lastExecutionTimeMs = Some(System.currentTimeMillis()) } - logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, key)} is removed.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.") } // close the execution outside the lock executeHolder.foreach { e => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index 5567f2d6220c..c55600886a39 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -22,7 +22,7 @@ import java.net.InetSocketAddress import scala.jdk.CollectionConverters._ import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{PORT, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{HOST, PORT} import org.apache.spark.sql.SparkSession /** @@ -40,7 +40,7 @@ object SparkConnectServer extends Logging { val isa = sa.asInstanceOf[InetSocketAddress] logInfo( log"Spark Connect server started at: " + - log"${MDC(RPC_ADDRESS, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") + log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") } } catch { case e: Exception => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index 289f761dc006..4c9b3baa689b 
100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE, QUERY_ID, SESSION_ID} +import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -72,7 +72,7 @@ private[connect] class SparkConnectStreamingQueryCache( case None => logInfo( log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + - log"value ${MDC(QUERY_CACHE, value)}.") + log"value ${MDC(QUERY_CACHE_VALUE, value)}.") } schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started. diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index e0f7d1558afa..72ceebb700d6 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -394,8 +394,8 @@ private[kafka010] class KafkaDataConsumer( logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + - log"${MDC(COUNT_POLL, numPolls)} polls " + - log"(polled out ${MDC(COUNT_RECORDS_POLL, numRecordsPolled)} records), " + + log"${MDC(KAFKA_PULLS_COUNT, numPolls)} polls " + + log"(polled out ${MDC(KAFKA_RECORDS_PULLED_COUNT, numRecordsPolled)} records), " + log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " + log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms." 
) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 6cf240f12a1c..d38f94fd1ac2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, HOST_PORT} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -226,7 +226,7 @@ private class ClientEndpoint( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (!lostMasters.contains(remoteAddress)) { - logError(log"Error connecting to master ${MDC(RPC_ADDRESS, remoteAddress)}.") + logError(log"Error connecting to master ${MDC(HOST_PORT, remoteAddress)}.") lostMasters += remoteAddress // Note that this heuristic does not account for the fact that a Master can recover within // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This @@ -240,7 +240,7 @@ private class ClientEndpoint( override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { if (!lostMasters.contains(remoteAddress)) { - logError(log"Error connecting to master (${MDC(RPC_ADDRESS, remoteAddress)}).", cause) + logError(log"Error connecting to master (${MDC(HOST_PORT, remoteAddress)}).", cause) lostMasters += remoteAddress if (lostMasters.size >= masterEndpoints.size) { logError("No master is available, exiting.") From 53f56e094db8397b27d5924771fa6faa3b9d74b8 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 16 Apr 2024 11:30:55 +0800 Subject: [PATCH 6/7] fix --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++-- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bf7f91ea7ce3..ed9f47020103 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_NAME} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} @@ -858,7 +858,7 @@ private[spark] class ApplicationMaster( finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } else { logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " + - log"Shutting down. ${MDC(RPC_ADDRESS, remoteAddress)}") + log"Shutting down. 
${MDC(HOST_NAME, remoteAddress)}") finish(FinalApplicationStatus.FAILED, exitCode) } } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 8404785d8e0b..ffd33e7e3d38 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{EXECUTOR_ID, REASON, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_NAME, REASON} import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -307,7 +307,7 @@ private[spark] abstract class YarnSchedulerBackend( case NonFatal(e) => logWarning(log"Attempted to get executor loss reason for executor id " + log"${MDC(EXECUTOR_ID, executorId)} at RPC address " + - log"${MDC(RPC_ADDRESS, executorRpcAddress)}, but got no response. " + + log"${MDC(HOST_NAME, executorRpcAddress)}, but got no response. " + log"Marking as agent lost.", e) RemoveExecutor(executorId, ExecutorProcessLost()) }(ThreadUtils.sameThread) @@ -395,7 +395,7 @@ private[spark] abstract class YarnSchedulerBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (amEndpoint.exists(_.address == remoteAddress)) { - logWarning(log"ApplicationMaster has disassociated: ${MDC(RPC_ADDRESS, remoteAddress)}") + logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_NAME, remoteAddress)}") amEndpoint = None } } From 1ed7090d3a88c9179122615c126b7c4c45bcc861 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 16 Apr 2024 13:37:28 +0800 Subject: [PATCH 7/7] fix --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++-- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ed9f47020103..eb944244fc9d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_NAME} +import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_PORT} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} @@ -858,7 +858,7 @@ private[spark] class ApplicationMaster( finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } else { logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " + - log"Shutting down. 
${MDC(HOST_NAME, remoteAddress)}") + log"Shutting down. ${MDC(HOST_PORT, remoteAddress)}") finish(FinalApplicationStatus.FAILED, exitCode) } } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index ffd33e7e3d38..d7f285aeb892 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_NAME, REASON} +import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_PORT, REASON} import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -307,7 +307,7 @@ private[spark] abstract class YarnSchedulerBackend( case NonFatal(e) => logWarning(log"Attempted to get executor loss reason for executor id " + log"${MDC(EXECUTOR_ID, executorId)} at RPC address " + - log"${MDC(HOST_NAME, executorRpcAddress)}, but got no response. " + + log"${MDC(HOST_PORT, executorRpcAddress)}, but got no response. " + log"Marking as agent lost.", e) RemoveExecutor(executorId, ExecutorProcessLost()) }(ThreadUtils.sameThread) @@ -395,7 +395,7 @@ private[spark] abstract class YarnSchedulerBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (amEndpoint.exists(_.address == remoteAddress)) { - logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_NAME, remoteAddress)}") + logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_PORT, remoteAddress)}") amEndpoint = None } }