Skip to content

Conversation

@redsk
Copy link
Contributor

@redsk redsk commented Oct 17, 2019

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-29500

KafkaRowWriter now supports setting the Kafka partition by reading a "partition" column in the input dataframe.

Code changes in commit nr. 1.
Test changes in commit nr. 2.
Doc changes in commit nr. 3.

@tcondie @dongjinleekr @srowen

Why are the changes needed?

While it is possible to configure a custom Kafka Partitioner with
.option("kafka.partitioner.class", "my.custom.Partitioner"), this is not enough for certain use cases. See the Jira issue.

Does this PR introduce any user-facing change?

No, as this behaviour is optional.

How was this patch tested?

Two new UT were added and one was updated.

is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
If a partition column is not specified then the partition is calculated by the Kafka producer
(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to avoid mentioning default of Kafka directly, as Kafka could change it and the information becomes stale then. I feel that's OK to just mention it will follow the configuration of Kafka.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on not mentioning the Kafka internal class, just saying default partitioner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

)
}

test("batch - partition column vs default Kafka partitioner") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we add this test to check the functionality to specify partition, I think we don't need to change above test.

Copy link
Contributor Author

@redsk redsk Oct 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it because a previous PR (headers) changed it as well. But I can revert it to the previous state if so desired. Shall I?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally yes, but we can hear more voices on this as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think test("batch - write to kafka") should be the simplest test and shouldn't contain neither headers nor partitions (I know for headers it's a bit late).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone through the test but not sure if it covers all. Here are my thought:

  • Not sure why 100 partitions necessary?
  • Don't we need the following tests? 1. kafka.partitioner.class overrides default 2. column overrides kafka.partitioner.class and default
  • I think this test can be formed more simple. Put fixed key into a topic, get the partition (just like it is now but see my next comment). In another topic put all the data into a partition which is not the formerly get partition. Finally read back and double check.
  • .collect().toList.head maybe not enough because it would be good to make sure the data is in one partition.
  • There are couple of copy-pastes here as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.collect() docstring says "Returns an array that contains all rows in this Dataset. Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError." Does collect() work on a per-partition base? If so maybe I could use .coalesce(1) before calling collect().

What I mean something like this:
createKafkaReader(keyTopic).select("partition").map(_.getInt(0)).collect().toSet.size() === 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, they are also all over the file.

Yes, the code can be enhanced several places but that doesn't mean we have to do the same copy-paste.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean something like this:
createKafkaReader(keyTopic).select("partition").map(_.getInt(0)).collect().toSet.size() === 1

I'm not sure I understand. Do you want me to assert that? Is this necessary considering that I send just one record?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the code can be enhanced several places but that doesn't mean we have to do the same copy-paste.

There are different opinions on copy-paste in UTs and I just copied the pattern in use in the file. But I also lean towards not copy-pasting. I can write a small function for that. Shall I apply it to other tests in the same function or just my contribution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order not to have feature creep I would suggest to keep this for you change set. Later we can file another PR for the rest...


test("batch - partition column vs default Kafka partitioner") {
val fixedKey = "fixed_key"
val nrPartitions = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wouldn't matter if we just set number of partitions to 10 - we have flaky test failures where we cannot get metadata from Kafka in timeout, so I'd rather not make pressure on Kafka side.

)
}

test("batch - non-existing partitions trigger standard Kafka exception") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would avoid depending on the behavior of Kafka - I'd feel we don't need to have this test, but let's hear more voices.

Copy link
Contributor Author

@redsk redsk Oct 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main question was: "what happens if I specify partitions that don't exist?" And the answer is that Kafka throws an exception with msg: Topic $topic not present in metadata after $producerTimeout ms which is not particularly clear.

I could have called producer.partitionsFor(topic) to check if the specified partition is in range (and throw a more meaningful exception if not) but it's an expensive call for each message. I could cache the result but the number of partitions could change in between calls and I didn't want to complicate the implementation. Hence the need to see what the standard Kafka producer does (which is "not much").

And the rationale is: if Kafka implementation changes, this test will fail and we'll have a chance to provide a more meaningful exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important to know what happens when such case happens. We've tested it and it's good.
From Spark test perspective we must ensure that our code works as expected and I think we don't have to pull in Kafka internals (I mean messages and timeouts) unless it's super important. I think it's not the case here which makes this test unnecessary from my point of view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll remove it.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good feature.

  • Just to double check won't this change the schema of a stateful query?
  • The continuous test coverage can be increased

is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
If a partition column is not specified then the partition is calculated by the Kafka producer
(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on not mentioning the Kafka internal class, just saying default partitioner.

option is set i.e., the "topic" configuration option overrides the topic column.
If a partition column is not specified then the partition is calculated by the Kafka producer
(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```).
This can be overridden in Spark by setting the ```kafka.partitioner.class``` option.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly the fallback scenario is the following and the later entries can override the former?

  • Default
  • kafka.partitioner.class provided
  • Partition id column

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. But this is KafkaProducer standard behaviour:

  • it uses ProducerRecord partition field. if null, fall backs to:
  • kafka.partitioner.class provided. If not set:
  • use default partitioner.

I don't believe we need a test for this (otherwise we would be testing Kafka API) but maybe we should explicitly state it in the doc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I would like to test here is that Spark parameterizes producer properly.
These testing points has nothing to do with producer internals:

  • kafka.partitioner.class config reaches producer instances and takes effect
  • Partition field is set under some circumstances

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I would like to test here is that Spark parameterizes producer properly.
These testing points has nothing to do with producer internals:
* kafka.partitioner.class config reaches producer instances and takes effect

I actually manually tested this and it does because they are passed down by other classes (see val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters), line 177 of KafkaSourceProvider. Any config that starts with kafka. will be passed down to the producer, actually. The problem is that this is not stated in the doc (it only mentions two optional configs). Maybe I can update the doc in this respect?

* Partition field is set under some circumstances

Don't the other test I added prove this point already?

In any case, I'm fine with adding two more tests:

  1. kafka.partitioner.class overrides default partitioner.
  2. partition column overrides kafka.partitioner.class.

I can write a simple Kafka Partitioner that always spits 0 and test it with the same pattern
(collect() and then two topics as you suggested).

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi Oct 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any config that starts with kafka. will be passed down to the producer, actually.

There are exceptions, please see them either in the doc or in the code. If such thing is not checked by a test then it can be broken easily. Just a personal thought: if something is not covered in a test later it's super hard to find out it's a bug or a feature...

Don't the other test I added prove this point already?

If we add the mentioned simplified test then you're right it will cover the partition field part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need test 1 (kafka.partitioner.class overrides default partitioner)?

throw new NullPointerException(s"null topic present in the data. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
}
val partition: java.lang.Integer =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Int is not enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to put null into parameter : Scala Int doesn't allow null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks. This case Integer is enough, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.lang.Integer allows null so it's enough. If I understood you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean: val partition: Integer =...

assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this copy-paste started earlier but it's a good opportunity to reduce it.
As I see only the select expression and the message is changing here, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I just submitted a patch #26158 to handle refactor to old ones: depending on which one to merge first, we can apply it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, started to review it...

)
}

test("batch - partition column vs default Kafka partitioner") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think test("batch - write to kafka") should be the simplest test and shouldn't contain neither headers nor partitions (I know for headers it's a bit late).

)
}

test("batch - partition column vs default Kafka partitioner") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone through the test but not sure if it covers all. Here are my thought:

  • Not sure why 100 partitions necessary?
  • Don't we need the following tests? 1. kafka.partitioner.class overrides default 2. column overrides kafka.partitioner.class and default
  • I think this test can be formed more simple. Put fixed key into a topic, get the partition (just like it is now but see my next comment). In another topic put all the data into a partition which is not the formerly get partition. Finally read back and double check.
  • .collect().toList.head maybe not enough because it would be good to make sure the data is in one partition.
  • There are couple of copy-pastes here as well.

)
}

test("batch - non-existing partitions trigger standard Kafka exception") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important to know what happens when such case happens. We've tested it and it's good.
From Spark test perspective we must ensure that our code works as expected and I think we don't have to pull in Kafka internals (I mean messages and timeouts) unless it's super important. I think it's not the case here which makes this test unnecessary from my point of view.

@HeartSaVioR
Copy link
Contributor

Just to double check won't this change the schema of a stateful query?

The partition attribute is only added on writer schema in this patch; it won't affect the schema of a stateful query. The partition attribute already exists in reader schema.

@redsk
Copy link
Contributor Author

redsk commented Oct 18, 2019

* The continuous test coverage can be increased

What would you suggest? Aren't batch, streaming, and continuous streaming using the same KafkaRowWriter class under the hood?

@gaborgsomogyi
Copy link
Contributor

What would you suggest? Aren't batch, streaming, and continuous streaming using the same KafkaRowWriter class under the hood?

I've had a deeper look and I tend to agree to depend on micro-batch tests.

@redsk
Copy link
Contributor Author

redsk commented Oct 18, 2019

I've had a deeper look and I tend to agree to depend on micro-batch tests.

So no more tests need on this front, right?

@gaborgsomogyi
Copy link
Contributor

Yep.

@redsk
Copy link
Contributor Author

redsk commented Oct 19, 2019

@gaborgsomogyi I've addressed requests for doc and code.

I've simplified the test batch - partition column sets partition in kafka writes as you suggested
I've added the test batch - partition column and partitioner priorities which shows the priorities of the different partitioning strategies.

Actually, I'm not sure the former test is still relevant as IMO completely subsumed by the second.

Comment on lines 624 to 631
override def partition(
topic: String,
key: Any,
keyBytes: Array[Byte],
value: Any,
valueBytes: Array[Byte],
cluster: Cluster
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent more or less.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only minor things found.

assert(partitions.head != keyPartition)
}

test("batch - partition column and partitioner priorities") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking a bit different test split but this is also fine. It covers all the priority scenarios, good job. In this case I agree with your view about not having test("batch - partition column sets partition in kafka writes") since it just duplicates things. From my view we can remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this test covers everything. Great.


// custom partitioner (always returns 0) overrides default partitioner
writeToKafka(df, topic2, Map(
"kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe these can be moved into a constant.

@gaborgsomogyi
Copy link
Contributor

As we've spoken it would be good to merge #26158 before this to simplify the area a bit. So I'm going to give my approval when the other one merged.

@redsk
Copy link
Contributor Author

redsk commented Oct 21, 2019

@gaborgsomogyi All changes made.

As we've spoken it would be good to merge #26158 before this to simplify the area a bit. So I'm going to give my approval when the other one merged.

Ok. Does #26158 require more work? Doesn't seem so by looking at its thread.

Do you think there is any chance of a Spark 2.4.5 release? In that case, I'd happily backport this PR to the 2.4 branch.

@gaborgsomogyi
Copy link
Contributor

@redsk #26158 needs to be reviewed and I'm doing that right now.

Do you think there is any chance of a Spark 2.4.5 release?

This is a new feature and we're not backporting such things so I don't think it will appear on 2.4 line.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except a nit.

value: Any,
valueBytes: Array[Byte],
cluster: Cluster)
: Int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: append this to the previous line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the style guide, https://github.com/databricks/scala-style-guide#spacing-and-indentation , the return type on a new line is also acceptable (and I like it better :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can take a look at other codes and see how the code looks like. most likely we don't prefer unnecessary line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 sticking to the available code style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright then. Changed.

assert(partitions.head != keyPartition)
}

test("batch - partition column and partitioner priorities") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this test covers everything. Great.

@HeartSaVioR
Copy link
Contributor

I can even rebase #26158 with the change here if this can be merged sooner than #26158.

@gaborgsomogyi
Copy link
Contributor

gaborgsomogyi commented Oct 22, 2019

I can even rebase #26158 with the change here if this can be merged sooner than #26158.

Makes sense, either will be merged indicates conflict resolution on the other side.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Minus the nit.

value: Any,
valueBytes: Array[Byte],
cluster: Cluster)
: Int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 sticking to the available code style.

@redsk
Copy link
Contributor Author

redsk commented Oct 22, 2019

It seems that linter checks in CI have been failing in the last 12 hours (for all branches).

[error] (*:update) sbt.ResolveException: unresolved dependency: com.typesafe#sbt-mima-plugin;0.3.0: not found

Nobody seems to have mentioned that in the Spark dev mailing list, though.

@HeartSaVioR
Copy link
Contributor

The issue regarding mima plugin got resolved.

@redsk
Copy link
Contributor Author

redsk commented Oct 24, 2019

Is there anything else I can do? Do you thing this can be merged now?

@HeartSaVioR
Copy link
Contributor

Sorry, but I'm not a committer. You may still be encouraged to ping someone who "commits" the file (not author list) to review this. Like cc. someone.

@HeartSaVioR
Copy link
Contributor

cc. @tdas @zsxwing @jose-torres as they are committers handled many of things in SS area.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks OK pending tests

@SparkQA
Copy link

SparkQA commented Oct 24, 2019

Test build #4904 has finished for PR 26153 at commit 96a8e9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@redsk
Copy link
Contributor Author

redsk commented Oct 25, 2019

@srowen it seems that tests passed cleanly. Is this PR ready to be merged?

@srowen
Copy link
Member

srowen commented Oct 25, 2019

Merged to master

@srowen srowen closed this in 8bd8f49 Oct 25, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants