Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2471,6 +2471,12 @@
],
"sqlState" : "22003"
},
"INVALID_ARTIFACT_PATH" : {
"message" : [
"Artifact with name <name> is invalid. The name must be a relative path and cannot reference parent/sibling/nephew directories."
],
"sqlState" : "22023"
},
"INVALID_ATTRIBUTE_NAME_SYNTAX" : {
"message" : [
"Syntax error in the attribute name: <name>. Check that backticks appear in pairs, a quoted string is a complete name part and use a backtick only inside quoted name parts."
Expand Down Expand Up @@ -3459,6 +3465,16 @@
"expects an integer literal, but got <invalidValue>."
]
},
"INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID" : {
"message" : [
"INTERRUPT_TYPE_OPERATION_ID requested, but no operation_id provided."
]
},
"INTERRUPT_TYPE_TAG_REQUIRES_TAG" : {
"message" : [
"INTERRUPT_TYPE_TAG requested, but no operation_tag provided."
]
},
"LENGTH" : {
"message" : [
"Expects `length` greater than or equal to 0, but got <length>."
Expand Down Expand Up @@ -3489,6 +3505,11 @@
"Expects a positive or a negative value for `start`, but got 0."
]
},
"STREAMING_LISTENER_COMMAND_MISSING" : {
"message" : [
"Missing command in StreamingQueryListenerBusCommand."
]
},
"STRING" : {
"message" : [
"expects a string literal, but got <invalidValue>."
Expand Down Expand Up @@ -6345,6 +6366,11 @@
"INSERT INTO <tableName> with IF NOT EXISTS in the PARTITION spec."
]
},
"INTERRUPT_TYPE" : {
"message" : [
"Unsupported interrupt type: <interruptType>."
]
},
"LAMBDA_FUNCTION_WITH_PYTHON_UDF" : {
"message" : [
"Lambda function with Python UDF <funcName> in a higher order function."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.control.NonFatal

import io.grpc.stub.StreamObserver

import org.apache.spark.SparkSQLException
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
Expand Down Expand Up @@ -117,7 +118,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
return
}
case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
throw new IllegalArgumentException("Missing command in StreamingQueryListenerBusCommand")
throw new SparkSQLException(
errorClass = "INVALID_PARAMETER_VALUE.STREAMING_LISTENER_COMMAND_MISSING",
messageParameters =
Map("parameter" -> "command", "functionName" -> "StreamingQueryListenerBusCommand"))
}
executeHolder.eventsManager.postFinished()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
ArtifactUtils.concatenatePaths(stagingDir, path)
} catch {
case _: IllegalArgumentException =>
throw new IllegalArgumentException(
s"Artifact with name: $name is invalid. The `name` " +
s"must be a relative path and cannot reference parent/sibling/nephew directories.")
throw new SparkRuntimeException(
errorClass = "INVALID_ARTIFACT_PATH",
messageParameters = Map("name" -> name))
case NonFatal(e) => throw e
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.jdk.CollectionConverters._

import io.grpc.stub.StreamObserver

import org.apache.spark.SparkSQLException
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging

Expand All @@ -41,18 +42,23 @@ class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.Interr
sessionHolder.interruptAll()
case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG =>
if (!v.hasOperationTag) {
throw new IllegalArgumentException(
s"INTERRUPT_TYPE_TAG requested, but no operation_tag provided.")
throw new SparkSQLException(
errorClass = "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG",
messageParameters =
Map("parameter" -> "operation_tag", "functionName" -> "interrupt"))
}
sessionHolder.interruptTag(v.getOperationTag)
case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID =>
if (!v.hasOperationId) {
throw new IllegalArgumentException(
s"INTERRUPT_TYPE_OPERATION_ID requested, but no operation_id provided.")
throw new SparkSQLException(
errorClass = "INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID",
messageParameters = Map("parameter" -> "operation_id", "functionName" -> "interrupt"))
}
sessionHolder.interruptOperation(v.getOperationId)
case other =>
throw new UnsupportedOperationException(s"Unknown InterruptType $other!")
throw new SparkSQLException(
errorClass = "UNSUPPORTED_FEATURE.INTERRUPT_TYPE",
messageParameters = Map("interruptType" -> other.toString))
}

val response = proto.InterruptResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,7 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
handler.onNext(req)
}
assert(e.getStatus.getCode == Code.INTERNAL)
val statusProto = StatusProto.fromThrowable(e)
assert(statusProto.getDetailsCount == 1)
val details = statusProto.getDetails(0)
val info = details.unpack(classOf[ErrorInfo])
assert(info.getReason.contains("java.lang.IllegalArgumentException"))
assert(e.getMessage.contains("INVALID_ARTIFACT_PATH"))
}
handler.onCompleted()
} finally {
Expand All @@ -422,11 +418,7 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
handler.onNext(req)
}
assert(e.getStatus.getCode == Code.INTERNAL)
val statusProto = StatusProto.fromThrowable(e)
assert(statusProto.getDetailsCount == 1)
val details = statusProto.getDetails(0)
val info = details.unpack(classOf[ErrorInfo])
assert(info.getReason.contains("java.lang.IllegalArgumentException"))
assert(e.getMessage.contains("INVALID_ARTIFACT_PATH"))
}
handler.onCompleted()
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,52 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
assert(error.getMessage.contains(fixedOperationId))
}
}

test("Interrupt with TAG type without operation_tag throws proper error class") {
withRawBlockingStub { stub =>
// Create an interrupt request with INTERRUPT_TYPE_TAG but no operation_tag
val request = org.apache.spark.connect.proto.InterruptRequest
.newBuilder()
.setSessionId(UUID.randomUUID().toString)
.setUserContext(org.apache.spark.connect.proto.UserContext
.newBuilder()
.setUserId(defaultUserId))
.setInterruptType(
org.apache.spark.connect.proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG)
.build()

val error = intercept[io.grpc.StatusRuntimeException] {
stub.interrupt(request)
}

// Verify the error is INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG
assert(error.getMessage.contains("INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_TAG_REQUIRES_TAG"))
assert(error.getMessage.contains("operation_tag"))
}
}

test("Interrupt with OPERATION_ID type without operation_id throws proper error class") {
withRawBlockingStub { stub =>
// Create an interrupt request with INTERRUPT_TYPE_OPERATION_ID but no operation_id
val request = org.apache.spark.connect.proto.InterruptRequest
.newBuilder()
.setSessionId(UUID.randomUUID().toString)
.setUserContext(org.apache.spark.connect.proto.UserContext
.newBuilder()
.setUserId(defaultUserId))
.setInterruptType(
org.apache.spark.connect.proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID)
.build()

val error = intercept[io.grpc.StatusRuntimeException] {
stub.interrupt(request)
}

// Verify the error is INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID
assert(
error.getMessage.contains(
"INVALID_PARAMETER_VALUE.INTERRUPT_TYPE_OPERATION_ID_REQUIRES_ID"))
assert(error.getMessage.contains("operation_id"))
}
}
}