Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c6177a3
review changes nit
SandishKumarHN Oct 18, 2022
8fad018
adding spark-protobuf error class into spark error-class framework
SandishKumarHN Oct 21, 2022
dac2fa8
adding spark-protobuf error class into spark error-class framework
SandishKumarHN Oct 21, 2022
d1c9b1e
adding spark-protobuf error class into spark error-class framework, i…
SandishKumarHN Oct 21, 2022
03b918f
Merge remote-tracking branch 'remote-spark/master' into SPARK-40777
SandishKumarHN Oct 21, 2022
60c2122
adding spark-protobuf error class into spark error-class framework, i…
SandishKumarHN Oct 21, 2022
7c866de
Merge branch 'SPARK-40777' into SPARK-40777-ProtoErrorCls
SandishKumarHN Oct 22, 2022
e6f3cab
spark-protobuf error clss frameworks, import support, timestamp and d…
SandishKumarHN Oct 22, 2022
70a5983
fixing typos
SandishKumarHN Oct 24, 2022
26e471b
review changes, name protobuf error class and comment in pom.xml
SandishKumarHN Oct 24, 2022
68f87c1
review chages rm includeMavenTypes and import own protos
SandishKumarHN Oct 24, 2022
9cdf4d5
review changes: nested import support, nit, build fix
SandishKumarHN Oct 25, 2022
4e1080e
review changes style changes, nit
SandishKumarHN Oct 25, 2022
dbaf24d
Merge remote-tracking branch 'remote-spark/master' into SPARK-40777-P…
SandishKumarHN Oct 26, 2022
6045ffe
review changes: search messages on all imports
SandishKumarHN Oct 26, 2022
3fc59b5
review changes: use prettyName
SandishKumarHN Oct 27, 2022
e5482a5
Merge remote-tracking branch 'remote-spark/master' into SPARK-40777-P…
SandishKumarHN Oct 27, 2022
dd63be8
review changes: nit, option, runtime error
SandishKumarHN Oct 28, 2022
e5140b0
error class name changes, more details to error message
SandishKumarHN Oct 28, 2022
3037415
Merge remote-tracking branch 'remote-spark/master' into SPARK-40777-P…
SandishKumarHN Oct 29, 2022
be02c9e
NO_UDF_INTERFACE_ERROR to NO_UDF_INTERFACE
SandishKumarHN Oct 29, 2022
d8cce82
review changes scala style, find, parseFileDescriptorSet
SandishKumarHN Nov 1, 2022
ad1f7e1
Merge remote-tracking branch 'remote-spark/master' into SPARK-40777-P…
SandishKumarHN Nov 1, 2022
48bcb5c
review changes buildDescriptor suggested changes
SandishKumarHN Nov 1, 2022
87918a1
review changes, error classes
SandishKumarHN Nov 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging {
.asInstanceOf[Descriptor]
}

// TODO: Revisit to ensure that messageName is searched through all imports
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is missing? Looks fairly complete to me.
Better to state the problem here.

def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
desc.getName == messageName || desc.getFullName == messageName
val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: use find() rather than map().filter().

(you can use findLast() if there is a reason to use the last match).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rangadi makes sense to use find and return, fixed.

fileDescriptor.getMessageTypes.asScala.find { desc =>
desc.getName == messageName || desc.getFullName == messageName
}
}).filter(f => !f.isEmpty)

if (descriptorList.isEmpty) {
throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName)
}

descriptor match {
descriptorList.last match {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment on why we are picking the last one? Will be useful for future readers as well.

case Some(d) => d
case None =>
throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
}
}

private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
private def parseFileDescriptor(descFilePath: String): List[Descriptors.FileDescriptor] = {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to parseFileDescriptorSet (otherwise it sounds like it is parsing just one file descriptor).

var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
try {
val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
Expand All @@ -200,12 +207,11 @@ private[sql] object ProtobufUtils extends Logging {
}
try {
val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
val fileDescriptor: Descriptors.FileDescriptor =
buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
if (fileDescriptor.getMessageTypes().isEmpty()) {
throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
}
fileDescriptor
val fileDescriptorList: List[Descriptors.FileDescriptor] =
fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
).toList
fileDescriptorList
} catch {
case e: Descriptors.DescriptorValidationException =>
throw QueryCompilationErrors.failedParsingDescriptorError(e)
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri

test("roundtrip in from_protobuf and to_protobuf - Repeated Message Once") {
val repeatedMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "RepeatedMessage")
val basicMessageDesc = ProtobufUtils.buildDescriptor(
testFile("basicmessage.desc").replace("file:/", "/"),
"BasicMessage")
val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")

val basicMessage = DynamicMessage
.newBuilder(basicMessageDesc)
Expand Down Expand Up @@ -173,9 +171,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri

test("roundtrip in from_protobuf and to_protobuf - Repeated Message Twice") {
val repeatedMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "RepeatedMessage")
val basicMessageDesc = ProtobufUtils.buildDescriptor(
testFile("basicmessage.desc").replace("file:/", "/"),
"BasicMessage")
val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")

val basicMessage1 = DynamicMessage
.newBuilder(basicMessageDesc)
Expand Down Expand Up @@ -573,7 +569,6 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
}

test("from_protobuf filter to_protobuf") {
val testFileDesc = testFile("basicmessage.desc").replace("file:/", "/")
val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")

val basicMessage = DynamicMessage
Expand Down