@@ -39,7 +39,7 @@ private[protobuf] case class ProtobufDataToCatalyst(
override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val dataType: DataType = {
-    val dt = SchemaConverters.toSqlType(messageDescriptor).dataType
+    val dt = SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
parseMode match {
// With PermissiveMode, the output Catalyst row might contain columns of null values for
// corrupt records, even if some of the columns are not nullable in the user-provided schema.
@@ -156,6 +156,9 @@ private[sql] class ProtobufDeserializer(
(protoType.getJavaType, catalystType) match {

case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)
+      // It is possible that this will result in data being dropped. This is intentional,
+      // in order to catch recursive fields and drop them as necessary.
+      case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)
Reviewer: What is this for? For handling limited recursion?

Contributor Author: Yes, correct.

Reviewer: Could you add a comment about how we might be dropping data here? It will not be easy to see for a future reader. We could also have an option to error out if the actual data has more recursion than the configured depth.

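To make the dropped data concrete, here is a hypothetical walkthrough (the EventPerson message is from this PR's test protos, but the values and the Row rendering are illustrative, not captured test output):

// message EventPerson { string name = 1; EventPerson bff = 2; }
// With recursive.fields.max.depth = 1, the inferred Catalyst schema is
//   struct<name: string, bff: struct<name: string, bff: null>>
// so the (MESSAGE, NullType) case above nulls out anything nested deeper:
//   input:  EventPerson("a", bff = EventPerson("b", bff = EventPerson("c")))
//   output: Row("a", Row("b", null))   // the innermost record "c" is silently dropped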

// TODO: we can avoid boxing if future version of Protobuf provide primitive accessors.
case (BOOLEAN, BooleanType) =>
@@ -171,7 +174,7 @@
(updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])

case (
-      BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
+      MESSAGE | BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)

@@ -235,9 +238,6 @@ private[sql] class ProtobufDeserializer(
writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
updater.set(ordinal, row)

-    case (MESSAGE, ArrayType(st: StructType, containsNull)) =>
-      newArrayWriter(protoType, protoPath, catalystPath, st, containsNull)
-
case (ENUM, StringType) =>
(updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))

@@ -38,6 +38,14 @@ private[sql] class ProtobufOptions(

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)

+  // Setting `recursive.fields.max.depth` to 0 drops all recursive fields, 1 allows a
+  // recursive field to appear once, 2 allows it to recurse twice, and so on. A value
+  // greater than 10 is not permitted. If the option is not specified, the default value
+  // is -1 and recursive fields are not permitted at all. If a protobuf record has more
+  // depth than the allowed value for recursive fields, it will be truncated and some
+  // fields may be discarded.
+  val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
Contributor: The option name may need a bit more discussion. Usually data source options do not have long names and don't contain dots; see JSONOptions. How about maxRecursiveFieldDepth?

Reviewer: @cloud-fan this is in line with options for the Kafka source, e.g. the 'kafka.' prefix allows setting Kafka client configs.

In addition, we will be passing more options, e.g. for schema registry auth configs. They will have a prefix like 'confluent.schemaregistry.[actual registry client conf]'.

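For reference, a minimal usage sketch of the new option (hedged: `df` with a binary `value` column and `descFilePath` are placeholders, and this assumes the existing from_protobuf overload that takes an options map):

import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Allow one level of recursion when deserializing EventPerson records;
// anything nested deeper is truncated to null, as described above.
val parsed = df.select(
  from_protobuf(col("value"), "EventPerson", descFilePath,
    Map("recursive.fields.max.depth" -> "1").asJava).as("person"))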
}

private[sql] object ProtobufOptions {
@@ -40,19 +40,27 @@ object SchemaConverters {
*
* @since 3.4.0
*/
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
}

-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
Contributor: Not related to this PR, but why would we lock ScalaReflectionLock here?

Reviewer: Yeah, I just noticed. Not sure if we need it. @SandishKumarHN could we remove this in a follow-up?

SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_,
+          Map(descriptor.getFullName -> 1),
+          protobufOptions: ProtobufOptions)).toArray),
nullable = true)
}

def structFieldFor(
fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],
Contributor: Can we add comments to explain what the map key and value mean here?

Reviewer: +1

Contributor Author: @cloud-fan added a comment.

+      protobufOptions: ProtobufOptions): Option[StructField] = {
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
val dataType = fd.getJavaType match {
case INT => Some(IntegerType)
@@ -81,9 +89,17 @@
fd.getMessageType.getFields.forEach { field =>
field.getName match {
case "key" =>
-            keyType = structFieldFor(field, existingRecordNames).get.dataType
+            keyType =
+              structFieldFor(
+                field,
+                existingRecordNames,
+                protobufOptions).get.dataType
          case "value" =>
-            valueType = structFieldFor(field, existingRecordNames).get.dataType
+            valueType =
+              structFieldFor(
+                field,
+                existingRecordNames,
+                protobufOptions).get.dataType
}
}
return Option(
@@ -92,17 +108,35 @@
MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType,
nullable = false))
      case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
+        // If the `recursive.fields.max.depth` value is not specified, it defaults to -1
+        // and recursive fields are not permitted. Setting it to 0 drops all recursive
+        // fields, 1 allows a field to recurse once, and 2 allows it to recurse twice,
+        // and so on. A value greater than 10 is not allowed, and if a protobuf record
+        // has more depth for recursive fields than the allowed value, it will be
+        // truncated and some fields may be discarded.
+        // The SQL schema for the protobuf message
+        // `message Person { string name = 1; Person bff = 2 }`
+        // will vary based on the value of "recursive.fields.max.depth":
+        // 0: struct<name: string, bff: null>
+        // 1: struct<name: string, bff: struct<name: string, bff: null>>
+        // 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>> ...
+        val recordName = fd.getMessageType.getFullName
Contributor (suggested change):
-        val recordName = fd.getMessageType.getFullName
+        val recordName = fd.getFullName
Are they the same? The previous code uses fd.getFullName.

Reviewer: Good catch. I think the previous code was incorrect. We need to verify whether the same Protobuf type was seen before in this DFS traversal. @SandishKumarHN what was the unit test that verified recursion?

Contributor Author: @cloud-fan fd.getFullName gives the fully qualified name along with the field name; we needed the fully qualified type name. We made this decision above.

Here is the difference:

println(s"${fd.getFullName} : ${fd.getMessageType.getFullName}")

org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
org.apache.spark.sql.protobuf.protos.Employee.em : org.apache.spark.sql.protobuf.protos.EM

@rangadi the previous code's fd.getFullName (the fully qualified name including the field name) also worked for detecting recursion, because before this PR we simply threw an error on any recursive field.

+        val recursiveDepth = existingRecordNames.getOrElse(recordName, 0)
+        val recursiveFieldMaxDepth = protobufOptions.recursiveFieldMaxDepth
+        if (existingRecordNames.contains(recordName) && (recursiveFieldMaxDepth < 0 ||
+          recursiveFieldMaxDepth > 10)) {
          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        } else if (existingRecordNames.contains(recordName) &&
+          recursiveDepth > recursiveFieldMaxDepth) {
+          Some(NullType)
+        } else {
+          val newRecordNames = existingRecordNames + (recordName -> (recursiveDepth + 1))
+          Option(
+            fd.getMessageType.getFields.asScala
+              .flatMap(structFieldFor(_, newRecordNames, protobufOptions))
+              .toSeq)
+            .filter(_.nonEmpty)
+            .map(StructType.apply)
+        }
-        val newRecordNames = existingRecordNames + fd.getFullName
-
-        Option(
-          fd.getMessageType.getFields.asScala
-            .flatMap(structFieldFor(_, newRecordNames))
-            .toSeq)
-          .filter(_.nonEmpty)
-          .map(StructType.apply)
case other =>
throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString)
}
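As a quick sketch of the new API surface (illustrative only: these helpers are private[sql], so a call like this would live inside Spark's own tests, and eventPersonDescriptor is a placeholder for a compiled Descriptor of the EventPerson test message):

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}

val schemaType = SchemaConverters.toSqlType(
  eventPersonDescriptor,  // com.google.protobuf.Descriptors.Descriptor (placeholder)
  ProtobufOptions(Map("recursive.fields.max.depth" -> "2")))
// schemaType.dataType is expected to be:
//   struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>>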
Binary file not shown.

@@ -170,4 +170,87 @@ message timeStampMsg {
message durationMsg {
string key = 1;
Duration duration = 2;
}

message OneOfEvent {
Reviewer: Are you testing both OneOf and recursion in the same message? Could you split them into separate messages?

Contributor Author: @rangadi I see a lot of use cases for a "payload" oneof field with recursive fields in it, so I thought combining oneof with recursion would make a good test. Will separate them.

Reviewer: The combined one is fine, we could keep it. Better to have simpler separate tests as well.

Reviewer: Nice.

string key = 1;
oneof payload {
Reviewer: What do one-of fields look like in the Spark schema? Could you give an example? I could not see the schema in the unit tests.

Contributor Author: @rangadi the "Oneof" field is of message type; a oneof will be converted to a struct type. (See the schema sketch after this message.)

int32 col_1 = 2;
string col_2 = 3;
int64 col_3 = 4;
}
repeated string col_4 = 5;
}
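For the schema question above, a sketch of what the inferred Spark schema for OneOfEvent would look like, based on the author's description (not output captured from the unit tests):

struct<key: string, col_1: int, col_2: string, col_3: bigint, col_4: array<string>>

with at most one of the oneof members (col_1, col_2, col_3) populated in any given row.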

message EventWithRecursion {
int32 key = 1;
messageA a = 2;
}
message messageA {
EventWithRecursion a = 1;
messageB b = 2;
}
message messageB {
EventWithRecursion aa = 1;
messageC c = 2;
}
message messageC {
EventWithRecursion aaa = 1;
int32 key = 2;
}

message Employee {
string firstName = 1;
string lastName = 2;
oneof role {
Reviewer: Do we need so many fields for the 'OneOf'? How about just 2 or 3? It would simplify testing.

IC ic = 3;
EM em = 4;
EM2 em2 = 5;
}
}

message IC {
repeated string skills = 1;
Employee icManager = 2;
}

message EM {
int64 teamsize = 1;
Employee emManager = 2;
}

message EM2 {
int64 teamsize = 1;
Employee em2Manager = 2;
}

message EventPerson {
string name = 1;
EventPerson bff = 2;
}

message OneOfEventWithRecursion {
string key = 1;
oneof payload {
EventRecursiveA recursiveA = 3;
EventRecursiveB recursiveB = 6;
}
string value = 7;
}

message EventRecursiveA {
OneOfEventWithRecursion recursiveA = 1;
string key = 2;
}

message EventRecursiveB {
string key = 1;
string value = 2;
OneOfEventWithRecursion recursiveA = 3;
}

message Status {
int32 id = 1;
Timestamp trade_time = 2;
Status status = 3;
}