
Conversation

Contributor

@SandishKumarHN SandishKumarHN commented Dec 5, 2022

Oneof fields allow a message to contain one and only one of a defined set of fields, while recursive fields provide a way to define messages that refer to themselves, allowing the creation of complex, nested data structures. With this change, users will be able to use protobuf Oneof fields with spark-protobuf, making it a more complete and useful tool for processing protobuf data.

Support for recursive.fields.max.depth:
The recursive.fields.max.depth option can be specified in from_protobuf to control the maximum allowed recursion depth for a field. Setting recursive.fields.max.depth to 0 drops all recursive fields, setting it to 1 allows a field to be recursed once, and setting it to 2 allows it to be recursed twice. Setting recursive.fields.max.depth to a value greater than 10 is not allowed. If recursive.fields.max.depth is not specified, it defaults to -1 and recursive fields are not permitted. If a protobuf record has more depth for recursive fields than the allowed value, it will be truncated and some fields may be discarded. This check is based on the fully qualified field type.
The SQL schema for the protobuf message

message Person {
    string name = 1;
    Person bff = 2;
}

will vary based on the value of recursive.fields.max.depth.

0: struct<name: string, bff: null>
1: struct<name: string, bff: struct<name: string, bff: null>>
2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>>
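The truncation rule in these examples can be sketched as a small recursive function. This is a hedged illustration only, not the Spark implementation; it just renders the schema strings shown above for the `Person` message at a given depth setting.

```scala
// Sketch: render the Spark SQL type produced for
//   message Person { string name = 1; Person bff = 2; }
// at a given recursive.fields.max.depth. When the depth budget is
// exhausted, the recursive field collapses to null.
def personSchema(maxDepth: Int): String =
  if (maxDepth < 0) "null"
  else s"struct<name: string, bff: ${personSchema(maxDepth - 1)}>"
```

For example, `personSchema(1)` yields `struct<name: string, bff: struct<name: string, bff: null>>`, matching the depth-1 case above.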

What changes were proposed in this pull request?

  • Add support for protobuf oneof field
  • Stop recursion at the first level when a recursive field is encountered. (instead of throwing an error)

Why are the changes needed?

Stop recursion at the first level and handle NullType in deserialization.

Does this PR introduce any user-facing change?

NA

How was this patch tested?

Added Unit tests for OneOf field support and recursion checks.
Tested full support for nested OneOf fields and message types using real data from Kafka on a real cluster

cc: @rangadi @mposdev21

nullable = false))
case MESSAGE =>
// Stop recursion at the first level when a recursive field is encountered.
// TODO: The user should be given the option to set the recursion level to 1, 2, or 3
Contributor Author

@SandishKumarHN SandishKumarHN Dec 5, 2022


@rangadi @mposdev21 Instead of limiting the recursion to only one level, the user should be able to choose a recursion level of 1, 2, or 3. Going beyond 3 levels of recursion should not be allowed. Any thoughts?

spark.protobuf.recursion.level


Yeah, I think it is useful. Users may not be able to remove recursive references, but might be willing to limit recursion.
I think the default should be an error with a clear message about how users can set the configuration.
Also, I don't think it should be a Spark config, but rather an option passed in.


Are you planning to add selectable recursion depth here or in a follow up?

Contributor Author


@rangadi planning to add the selectable recursion depth in this PR.


case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

What is this for? For handling limited recursion?

Contributor Author


Yes, correct.


Could you add a comment noting that we might be dropping data here? It will not be easy for a future reader to see.
We could have an option to error out if the actual data has more recursion than configured.

nullable = false))
case MESSAGE =>
// Stop recursion at the first level when a recursive field is encountered.
// TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Are you planning to add selectable recursion depth here or in a follow up?

}
}

message OneOfEvent {

Are you testing OneOf and recursion in the same message? Could you split them into separate messages?

Contributor Author


@rangadi I see a lot of use cases for a "payload" Oneof field with recursive fields inside it, so I thought combining Oneof with recursion would be a good test. Will separate them.


The combined one is fine, we could keep it. Better to have simpler separate tests as well.


nice

parameters = Map("descFilePath" -> testFileDescriptor))
}

test("Unit tests for OneOf field support and recursion checks") {

Let's separate these two into separate tests with separate protobuf messages.

Contributor Author


will do that.


message OneOfEvent {
string key = 1;
oneof payload {

What do one-of fields look like in the Spark schema? Could you give an example? I could not see the schema in the unit tests.

Contributor Author


@rangadi the Oneof field is of message type; the Oneof will be converted to a struct type.
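To make this concrete, here is a hedged sketch of the mapping (the message and field names below are hypothetical, not from this PR): each branch of the oneof becomes a nullable struct member, and at most one member is non-null per record.

```scala
// Hypothetical oneof:  oneof payload { string event_a = 1; int32 event_b = 2; }
// Mapped struct: every branch is present, but at most one is populated.
case class PayloadStruct(eventA: Option[String], eventB: Option[Int])

def fromOneof(branch: Either[String, Int]): PayloadStruct = branch match {
  case Left(a)  => PayloadStruct(Some(a), None) // event_a was set
  case Right(b) => PayloadStruct(None, Some(b)) // event_b was set
}
```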

@SandishKumarHN
Contributor Author

#38922 (comment)

@rangadi made the below changes.

  • Added a selectable recursion depth option to from_protobuf.
  • Added two unit tests for the Oneof type: a simple one for a Oneof field, and a complex Oneof field with recursionDepth=2.
  • Existing unit tests should cover foundRecursionInProtobufSchema when recursionDepth is not set and a recursive field is discovered.

@rangadi

rangadi commented Dec 7, 2022

Added selectable recursion depth option to from_protobuf.

Do we need to do this for 'to_protobuf()' too? What would happen in that case?

@SandishKumarHN
Contributor Author

SandishKumarHN commented Dec 7, 2022

Added selectable recursion depth option to from_protobuf.

Do we need to do this for 'to_protobuf()' too? What would happen in that case?

@rangadi
The source dataframe struct field should match the protobuf recursive message for to_protobuf. It will convert until the recursion level is matched, like a struct within a struct to a recursive message. This is true even for existing code.

@rangadi

rangadi commented Dec 7, 2022

The source dataframe struct field should match the protobuf recursive message for to_protobuf. It will convert until the recursion level is matched, like a struct within a struct to a recursive message. This is true even for existing code.

Interesting. So we would make that null after some depth. Could you add a test for this?

@SandishKumarHN
Contributor Author

The source dataframe struct field should match the protobuf recursive message for to_protobuf. It will convert until the recursion level is matched, like a struct within a struct to a recursive message. This is true even for existing code.

Interesting. So we would make that null after some depth. Could you add a test for this?

@rangadi will add a test for the above case.
A Spark dataframe with complex nested structures should typically be convertible to a protobuf message. It is the user's responsibility to specify the right .proto (.desc) file that corresponds to the source dataframe.

@rangadi

rangadi commented Dec 7, 2022

file that corresponds to the source dataframe.

They might have used from_protobuf() to get that schema, which supports recursive fields. They should be able to do to_protobuf() with the same protobuf definition.

@SandishKumarHN
Contributor Author

file that corresponds to the source dataframe.

They might have used from_protobuf() to get that schema, which supports recursive fields. They should be able to do to_protobuf() with the same protobuf definition.

This case is already covered in the unit tests. Will add a unit test for direct struct-to-protobuf conversion.

@AmplabJenkins

Can one of the admins verify this patch?

@baganokodo2022

Hi @SandishKumarHN,

For the recursionDepth option, could we consider naming it CircularReferenceTolerance or CircularReferenceDepth for clarity?
For instance, -1 (the default value) will error out on any circular reference, 0 drops any circular reference field, 1 allows the same field to be entered twice, and so on.

Besides, can we also support a "CircularReferenceType" option with an enum value of [FIELD_NAME, FIELD_TYPE]? The reason is that navigation can go very deep before the same fully qualified FIELD_NAME is encountered again, while FIELD_TYPE stops recursive navigation much faster. We could make FIELD_NAME the default option. In my test cases, with FIELD_TYPE a circular reference could repeat 3 times before the executor hit OOM, while FIELD_NAME hit OOM when CircularReferenceTolerance was set to 1.

Please let me know your thoughts.

cc @rangadi

Thank you

Xinyu Liu

@SandishKumarHN
Contributor Author

@baganokodo2022 Circular types (especially MESSAGE) occur frequently in a single message. The user won't be able to distinguish and fix it (imagine deciding which field the user should keep or remove), because each type will have a unique field name that is valid. In the full field name scenario, the user can verify and fix the circular reference.

Anyway, I have made the initial change per your idea; have a look at it.

cc: @rangadi

if (existingRecordNames.contains(fd.getFullName)) {
throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
// User can set circularReferenceDepth of 0 or 1 or 2.
// Going beyond 3 levels of recursion is not allowed.

Could you add a justification for this?

Contributor Author


@rangadi The user can specify the maximum allowed recursion depth for a field by setting the circularReferenceDepth property to 0, 1, or 2. Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed thrice. Attempting to set the circularReferenceDepth to a value greater than 2 is not allowed. If the circularReferenceDepth is not specified, it will default to -1, which disables recursive fields.

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)

val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

@SandishKumarHN @baganokodo2022 moving the discussion here (for threading).

Besides, can we also support a "CircularReferenceType" option with an enum value of [FIELD_NAME, FIELD_TYPE]? The reason is that navigation can go very deep before the same fully qualified FIELD_NAME is encountered again, while FIELD_TYPE stops recursive navigation much faster. ...

I didn't quite follow the motivation here. Could you give concrete examples for the two different cases?

Contributor Author


@rangadi we already know about the field_name recursion check: using fd.getFullName we detect the recursion and throw an error. Another option is to detect recursion through the field type. Example below.

message A {
  B b = 1;
}

message B {
  A c = 1;
}

In the case of the field_name recursion check it is A.B.C, so no recursion is detected.
In the case of the field_type recursion check it is MESSAGE.MESSAGE.MESSAGE, so recursion will be found, and we throw an error or drop beyond a certain recursion depth.
But it will also throw an error for the below case with the field_type check, since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE:

message A {
  B b = 1;
}

message B {
  D d = 1;
}

message D {
  E e = 1;
}

message E {
  int32 key = 1;
}

@baganokodo2022's argument is that a field_type-based check gives users an option to stop recursion more quickly, because with a complex nested schema the recursive field_name may only be found very deep, and before hitting it we might see an OOM. The field_type-based check finds the circular reference more quickly.

@baganokodo2022 please correct me if I'm wrong.


@rangadi rangadi Dec 8, 2022


in the case of field_name recursive check it is A.B.C no recursion.

The first example is clearly recursion. What is 'C' here?

but it will also throw an error for the below case with the field_type check. since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE

Why is this recursion?


Are our unit tests showing these cases?

Contributor Author


I would have @baganokodo2022 give more details on the field-type case.

We have not yet added unit tests for the field-type case; I would like to discuss this before adding them.


thread would be A.B.A.aa.D.d.A.aaa.E

What is this thread?


Given this discussion, let's write down the functionality and examples before we implement, so that we are all on the same page.

Contributor Author


@rangadi fd.getFullName is able to detect recursive fields with different field names; added a unit test. Now I'm confused:
Fail for recursion field with different field names


:) yeah, field names should not matter at all.
We can do video chat to clarify all this.

Contributor Author

@SandishKumarHN SandishKumarHN Dec 9, 2022


@rangadi @baganokodo2022 thanks for the quick meet. The meeting conclusion was to use the descriptor type's full name; added unit tests with some complex schemas.

val recordName = fd.getMessageType.getFullName

(protobufOptions.circularReferenceDepth < 0 ||
protobufOptions.circularReferenceDepth >= 3)) {
throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
} else if (existingRecordTypes.contains(fd.getType.name()) &&

Name or full name?
Also, what keeps track of the recursion depth?

Contributor Author


@rangadi we have two maps with incremental counters, one for the field_name-based check and one for the field_type-based check.


@baganokodo2022 baganokodo2022 Dec 8, 2022


@SandishKumarHN and @rangadi, should we error out on -1, the default value, unless users specifically override it?
0 (tolerance) -> drop all recursive fields once encountered
1 (tolerance) -> allow the same field name (type) to be entered twice
2 (tolerance) -> allow the same field name (type) to be entered 3 times

thoughts?


In my back-ported branch,

        val recordName = circularReferenceType match {
          case CircularReferenceTypes.FIELD_NAME =>
            fd.getFullName
          case CircularReferenceTypes.FIELD_TYPE =>
            fd.getFullName().substring(0, fd.getFullName().lastIndexOf(".")) 
        }
        
        if (circularReferenceTolerance < 0 && existingRecordNames(recordName) > 0) {
          // no tolerance on circular reference
          logError(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}")
          throw new IllegalStateException(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}")
        }

        if (existingRecordNames(recordName) > (circularReferenceTolerance max 0) ) {
          // stop navigation and drop the repetitive field
          logInfo(s"circular reference in protobuf schema detected [max tolerance breached] field dropped - ${recordName} = ${existingRecordNames(recordName)}")
          Some(NullType)
        } else {
          val newRecordNames: Map[String, Int] = existingRecordNames +  
            (recordName -> (1 + existingRecordNames(recordName)))
          Option(
            fd.getMessageType.getFields.asScala
              .flatMap(structFieldFor(_, newRecordNames, protobufOptions))
              .toSeq)
            .filter(_.nonEmpty)
            .map(StructType.apply)
        }

assert(expectedFields.contains(f.getName))
})

val schema = StructType(Seq(StructField("sample",

Btw, using `val schema = DataType.fromJson("json string")` is a lot more readable.
Optionally, we could update many of these in follow-up PRs.

parameters = Map("descFilePath" -> testFileDescriptor))
}

test("Unit test for Protobuf OneOf field") {

Add a short description of the test at the top. It improves readability. What is this verifying?

Remove "Unit test for", this is already a unit test :).

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Are there tests for recursive fields?

Contributor Author


@rangadi yes,
Handle recursive fields in Protobuf schema, C->D->Array(C) and
Handle recursive fields in Protobuf schema, A->B->A


Could we move that to different tests?

Contributor Author


@rangadi I didn't understand; these are already two different tests.


@baganokodo2022 baganokodo2022 left a comment


thank you for the PR

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)

val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")


Yes @SandishKumarHN, you are right. That was discovered in a very complex proto schema shared across many microservices.

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)

val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")


Hi @rangadi, under certain circumstances dropping fields with data seems inevitable when dealing with circular references, since we can't tell which fields are intended to be kept. One example is the parent-child relationship in an RDB data model. Consider IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO, which are all of Employee type, assuming the relationship is bi-directional. The longest path for a level-1 circular reference on FIELD_NAME is IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO -> CTO -> VP -> Senior Director -> Director -> EM2 -> EM -> IC. In reality, data scientists may just want to keep 2 levels of circular reference on FIELD_TYPE: IC -> EM -> EM2, or EM2 -> Director -> Senior Director. This greatly reduces redundant data in the warehouse.

Hope it makes sense.

Thanks
Xinyu
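The distinction the two options draw shows up in the key used for recursion detection. Here is a hedged sketch following the back-ported snippet quoted earlier in this thread (the fully qualified name below is illustrative, not from this PR):

```scala
// FIELD_NAME keys on the fully qualified field name; FIELD_TYPE drops the
// last path segment, so every field of the same enclosing message type
// shares one key and recursion is detected sooner.
def recursionKey(fieldFullName: String, byFieldType: Boolean): String =
  if (byFieldType) fieldFullName.substring(0, fieldFullName.lastIndexOf('.'))
  else fieldFullName
```

With FIELD_TYPE, two different fields like `Employee.manager` and `Employee.report` would map to the same key and count against the same tolerance budget.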

fd: FieldDescriptor,
existingRecordNames: Set[String]): Option[StructField] = {
existingRecordNames: Map[String, Int],
existingRecordTypes: Map[String, Int],


@SandishKumarHN since it is going to be either FIELD_NAME or FIELD_TYPE, do we need to keep both maps?


@HeartSaVioR
Contributor

cc @cloud-fan. I guess there has been some demand for recursive schemas already. Could you please help look into the proposal and see whether it makes sense to you? Please add more people to the loop if you know others who would be interested.

@HeartSaVioR
Contributor

HeartSaVioR commented Dec 9, 2022

I guess the demand for supporting recursive schemas is not specific to protobuf; it also applies to Avro. If we construct a way to project a recursive schema into Spark SQL's schema, we may want to apply it consistently across components.

The visibility of this PR is too limited; only people interested in protobuf will look into it. Instead of deciding such a thing within this PR, going through a discussion thread on dev@ does not seem like a bad idea. What do you all think?

If you have a proposal, please write it down in doc format, e.g. a Google doc with several examples, and share it in the discussion thread. The description of the PR does not seem to be enough to understand what this PR (or some other) is proposing.

@SandishKumarHN
Contributor Author

SandishKumarHN commented Dec 9, 2022

@baganokodo2022 instead of implementing a check for circular reference type in this PR, can we discuss it further and write a proposal before adding it in a follow-up PR? We can share the proposal with the [email protected] mailing list for feedback and input.

@rangadi @HeartSaVioR In this PR, we will only implement a check for circular references through the full field name. Let me know if any further changes are needed.

@rangadi

rangadi commented Dec 9, 2022

@SandishKumarHN could you keep the discussion in the code review thread? It is hard to piece together multiple messages otherwise. I think it is fairly straightforward what recursion means; there seems to be some confusion about that. Let's discuss it in the thread.

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)

// Setting the `recursive.fields.max.depth` to 0 allows the field to be recurse once,

@rangadi rangadi Dec 17, 2022


'0' disables recursion, right? Why once? This might be a difference in terminology; that's why giving a quick example is better. Could you add this example?

Consider a simple recursive proto: `message Person { string name = 1; Person bff = 2; }`

What would the Spark schema be when recursion is 0, 1, and 2? I think:

  • 0: struct<name: string, bff: null>
  • 1: struct<name: string, bff: struct<name: string, bff: null>>
  • 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>>

Contributor Author


@rangadi Thank you for your suggestion. I have implemented it by adding a comment and a unit test to make the example clearer to users.


@rangadi rangadi left a comment


Overall LGTM. Made a couple of suggestions to clarify how the schema looks with a recursion limit, both in a comment and in a unit test.

}
}

test("Fail for recursion field with different field names without circularReferenceDepth") {

Fix circularReferenceDepth in the name.

}
}

test("recursion field with different field names with circularReferenceDepth") {

Fix the name.


What is this testing? As we discussed, field name does not matter.

eventFromSparkSchema.getDescriptorForType.getFields.asScala.map(f => {
assert(expectedFields.contains(f.getName))
})
}

Could you add test that clearly shows the expected schema similar to my comment here: https://github.com/apache/spark/pull/38922/files#r1051292604

It is not easy to see from these tests what schema 0 or 2 results in.

"RECURSIVE_PROTOBUF_SCHEMA" : {
"message" : [
"Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
"Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` as 0 or 1 or 2. Going beyond 3 levels of recursion is not allowed."

Why is 3 or above not allowed? That seems pretty low. If a customer wants to set the level, they will be conscious of it. I think it should be at least high single digits to cover most cases. How about 10?

Contributor Author


@rangadi agree.


@rangadi rangadi left a comment


This looks great. Thanks.

@rangadi

rangadi commented Dec 20, 2022

Asking @HeartSaVioR to take a quick look and approve.
@cloud-fan, take a look at the updated PR description for an example of how the Spark schema would look with the different settings for the config.

// specified, the default value is -1; recursive fields are not permitted. If a protobuf
// record has more depth than the allowed value for recursive fields, it will be truncated
// and some fields may be discarded.
val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
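The truncation rule this option implements can be sketched outside Spark. The function below is a hypothetical toy model (not the SchemaConverters implementation) of how the schema for the `Person` message from the PR description varies with `recursive.fields.max.depth`:

```python
def person_schema(max_depth: int, depth: int = 0) -> str:
    """Schema string for `message Person { string name = 1; Person bff = 2; }`
    under a given recursive.fields.max.depth. A toy model of the truncation
    rule described above, not Spark code."""
    if max_depth < 0:
        # Default (-1): recursive fields are not permitted at all.
        raise ValueError("recursive fields are not permitted by default")
    if depth > max_depth:
        return "null"  # recursion limit reached: the field is truncated
    return f"struct<name: string, bff: {person_schema(max_depth, depth + 1)}>"

print(person_schema(0))  # struct<name: string, bff: null>
print(person_schema(1))  # struct<name: string, bff: struct<name: string, bff: null>>
```

Each increment of the setting allows one more level of the recursive `bff` field before it collapses to null.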
Contributor

The option name may need a bit more discussion. Usually data source options do not have long names and don't contain dots. See JSONOptions. How about maxRecursiveFieldDepth?


@cloud-fan this is in line with options for the Kafka source, e.g. the 'kafka.' prefix allows setting Kafka client configs.

In addition, we will be passing more options, e.g. for schema registry auth configs. They will have a prefix like 'confluent.schemaregistry.[actual registry client conf]'.

def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
def toSqlTypeHelper(
descriptor: Descriptor,
protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {
Contributor

not related to this PR, but why would we lock ScalaReflectionLock here?


Yeah, I just noticed. Not sure if we need it.
@SandishKumarHN could we remove this in a follow-up?

def structFieldFor(
fd: FieldDescriptor,
existingRecordNames: Set[String]): Option[StructField] = {
existingRecordNames: Map[String, Int],
Contributor

can we add comments to explain what the map key and value mean here?


+1

Contributor Author

@cloud-fan added a comment.

// 0: struct<name: string, bff: null>
// 1: struct<name string, bff: <name: string, bff: null>>
// 2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
val recordName = fd.getMessageType.getFullName
Contributor

Suggested change
val recordName = fd.getMessageType.getFullName
val recordName = fd.getFullName

are they the same? The previous code uses fd.getFullName.


Good catch. I think the previous code was incorrect. We need to verify whether the same Protobuf type was seen before in this DFS traversal.
@SandishKumarHN what was the unit test that verified recursion?

Contributor Author

@cloud-fan fd.getFullName gives the fully qualified name including the field name; we needed the fully qualified type name. We made this decision above.

Here is the difference:

println(s"${fd.getFullName} : ${fd.getMessageType.getFullName}")

org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
org.apache.spark.sql.protobuf.protos.Employee.ic : org.apache.spark.sql.protobuf.protos.IC
org.apache.spark.sql.protobuf.protos.IC.icManager : org.apache.spark.sql.protobuf.protos.Employee
org.apache.spark.sql.protobuf.protos.Employee.em : org.apache.spark.sql.protobuf.protos.EM

@rangadi the previous code's fd.getFullName (the fully qualified name including the field name) works for detecting recursion, which is why before this change we simply threw an error on any recursive field.
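The reason for keying the map on the message *type* name can be sketched with a small, self-contained toy model of the DFS. Everything here (`struct_field_for`, the dict-based message encoding, the type names) is an illustrative stand-in, not the actual Spark code; it shows how counting occurrences of a fully qualified type name on the traversal path catches the mutual recursion Employee → IC → Employee from the output above:

```python
def struct_field_for(msg, existing, max_depth):
    """Toy model of schema conversion. `existing` maps a fully qualified
    message *type* name to how many times it appears on the current DFS path;
    once a type would exceed max_depth, the field is truncated to null."""
    parts = []
    for fname, ftype in msg.items():
        if ftype == "string":
            parts.append(f"{fname}: string")
        else:
            type_name, nested = ftype
            count = existing.get(type_name, 0)
            if count > max_depth:
                parts.append(f"{fname}: null")  # recursion truncated here
            else:
                inner = struct_field_for(
                    nested, {**existing, type_name: count + 1}, max_depth)
                parts.append(f"{fname}: {inner}")
    return "struct<" + ", ".join(parts) + ">"

# Mutually recursive messages: Employee has an IC field, IC refers back to Employee.
employee = {}
ic = {"icManager": ("Employee", employee)}
employee.update({"name": "string", "ic": ("IC", ic)})

# The root type already counts as one occurrence of Employee on the path.
print(struct_field_for(employee, {"Employee": 1}, 0))
# struct<name: string, ic: struct<icManager: null>>
```

A Set of names (the old code) could only say "seen before or not"; the Int count is what lets the traversal permit a bounded number of repetitions before truncating.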

Contributor

@cloud-fan cloud-fan left a comment

looks good to me except for some minor comments

@rangadi

rangadi commented Dec 20, 2022

jenkins merge

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in d33a59c Dec 21, 2022
cloud-fan pushed a commit that referenced this pull request Dec 22, 2022
…nverters

### What changes were proposed in this pull request?

Following up from PR #38922 to remove unnecessary ScalaReflectionLock from SchemaConvertors file.

cc: cloud-fan

### Why are the changes needed?

removing unnecessary code

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing unit tests

Closes #39147 from SandishKumarHN/SPARK-41639.

Authored-by: SandishKumarHN <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@rangadi

rangadi commented Feb 14, 2023

See #40011 for a follow-up tweak for this config. '0' is not supported. Fixes how the limit is applied.
