KAFKA-12612: Remove checksum from ConsumerRecord/RecordMetadata for 3.0 #10470

Merged
merged 14 commits into from
Apr 14, 2021

Conversation

Member

@ijuma ijuma commented Apr 3, 2021

The methods have been deprecated since 0.11 without replacement since
message format 2 moved the checksum to the record batch (instead of the
record).

Unfortunately, we did not deprecate the constructors that take a checksum
(even though we intended to), so we cannot remove them yet. I have deprecated
them for removal in 4.0 and added a single non-deprecated constructor to each
of ConsumerRecord and RecordMetadata that takes all remaining parameters.
ConsumerRecord could do with one additional convenience constructor, but
that requires a KIP and hence should be done separately.

Also:

  • Removed ChecksumMessageFormatter, which is technically not public
    API, but may have been used with the console consumer.
  • Updated all usages of ConsumerRecord/RecordMetadata constructors
    to use the non-deprecated ones.
  • Added tests for deprecated ConsumerRecord/RecordMetadata
    constructors.
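
As a rough illustration of the approach described above, the sketch below keeps a checksum-taking constructor alive by marking it deprecated and delegating to a single non-deprecated constructor. `SketchRecordMetadata` and its fields are hypothetical stand-ins, simplified on purpose; this is not the actual Kafka `RecordMetadata` signature.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration; this is NOT Kafka's RecordMetadata.
final class SketchRecordMetadata {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final int serializedKeySize;
    private final int serializedValueSize;

    // The single non-deprecated constructor: every remaining field, no checksum.
    SketchRecordMetadata(String topic, int partition, long offset, long timestamp,
                         int serializedKeySize, int serializedValueSize) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.serializedKeySize = serializedKeySize;
        this.serializedValueSize = serializedValueSize;
    }

    /**
     * Kept only so that existing callers keep compiling.
     *
     * @deprecated use the constructor without the {@code checksum} parameter.
     */
    @Deprecated
    SketchRecordMetadata(String topic, int partition, long offset, long timestamp,
                         Long checksum, int serializedKeySize, int serializedValueSize) {
        // The checksum is accepted for source compatibility and then dropped.
        this(topic, partition, offset, timestamp, serializedKeySize, serializedValueSize);
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
    int serializedKeySize() { return serializedKeySize; }
    int serializedValueSize() { return serializedValueSize; }
}

final class DeprecatedConstructorDemo {
    public static void main(String[] args) {
        SketchRecordMetadata viaNew =
            new SketchRecordMetadata("orders", 0, 42L, 1000L, 3, 7);
        @SuppressWarnings("deprecation")
        SketchRecordMetadata viaOld =
            new SketchRecordMetadata("orders", 0, 42L, 1000L, 99L, 3, 7);
        // Both construction paths populate the same fields.
        System.out.println(viaNew.offset() == viaOld.offset());
    }
}
```

Delegating from the deprecated constructor keeps the field initialization in one place, so removing the old overload later is a pure deletion.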

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Member Author

ijuma commented Apr 3, 2021

Will update upgrade.html soon.

Member Author

ijuma commented Apr 3, 2021

> Will update upgrade.html soon.

Done.

@ijuma ijuma requested review from chia7712 and dajac April 3, 2021 18:30
@@ -614,18 +613,3 @@ class NoOpMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
}

class ChecksumMessageFormatter extends MessageFormatter {
Member

I wonder if we should explicitly mention this one in the upgrade notes. It could be considered part of the console-consumer tool, which is part of our public API. It is in a gray zone. I am pretty sure that it is not used anymore, but who knows... What's your take on this?

Member Author

Good question, I was debating it myself. Technically, this is not public API since it's not mentioned in the public javadoc and it's in the core module (everything is private in the core module).

However, it is possible that people would pass this class when using the console consumer. The checksum value has never been correct for message format 2, which has been the default since 0.11, so I figured the chances that people actually used this semi-public API are low. There doesn't seem to be much harm in mentioning it in the upgrade notes though, so I will add it.

ijuma added 2 commits April 6, 2021 15:06
…ecated since 0.11.

These have been deprecated since 0.11, but there are no replacements.
Message format 2 moved the checksum to the record batch (instead of the
record).

clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
Member

@chia7712 chia7712 left a comment

@ijuma thanks for the great cleanup!

* @deprecated use constructor without `checksum` parameter.
*/
@Deprecated
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
Member

Not sure whether we have to offer this constructor. We removed the checksum from all constructors of ConsumerRecord even though we had not added a deprecation annotation to those constructors before.

Member Author

I probably need to add back some of the ConsumerRecord constructors since they were not previously deprecated.

public long checksum() {
if (checksum == null)
// The checksum is null only for message format v2 and above, which do not have a record-level checksum.
this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
Member

Could you remove computePartialChecksum? It is unused now.

@@ -1385,7 +1385,7 @@ private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType, record.checksumOrNull(),
Member

Could we remove checksumOrNull from Record? That method works for AbstractLegacyRecordBatch only.

Member Author

I had initially removed it. But I added it back since it's used by DumpLogSegments.

Member

DumpLogSegments can call RecordBatch#checksum instead of checksumOrNull if the record is legacy. I can file a PR as follow-up to address that.

Member Author

I don't think that suggestion works. However, you could add conditional code in DumpLogSegments so that the crc at the record level is only printed if it's AbstractLegacyRecordBatch. Sounds good as a follow-up.
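
A minimal sketch of that conditional, assuming simplified stand-in types rather than the real Kafka classes: `SketchRecord`, `LegacySketchRecord`, `V2SketchRecord`, and `DumpSketch.describe` are all hypothetical names, and the output format is invented for illustration. The record-level CRC is appended only when the record is of the legacy kind.

```java
// Hypothetical stand-ins for illustration; these are not Kafka's Record types.
interface SketchRecord {
    long offset();
    Long checksumOrNull(); // null when the format has no record-level checksum
}

// Plays the role of a record from message format v0/v1 (record-level CRC exists).
final class LegacySketchRecord implements SketchRecord {
    private final long offset;
    private final long crc;
    LegacySketchRecord(long offset, long crc) { this.offset = offset; this.crc = crc; }
    @Override public long offset() { return offset; }
    @Override public Long checksumOrNull() { return crc; }
}

// Plays the role of a record from message format v2 (CRC lives on the batch).
final class V2SketchRecord implements SketchRecord {
    private final long offset;
    V2SketchRecord(long offset) { this.offset = offset; }
    @Override public long offset() { return offset; }
    @Override public Long checksumOrNull() { return null; }
}

final class DumpSketch {
    // Builds one output line; the crc field appears only for legacy records.
    static String describe(SketchRecord record) {
        StringBuilder line = new StringBuilder("offset: ").append(record.offset());
        if (record instanceof LegacySketchRecord) {
            line.append(" crc: ").append(record.checksumOrNull());
        }
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(new LegacySketchRecord(7L, 12345L))); // offset: 7 crc: 12345
        System.out.println(describe(new V2SketchRecord(8L)));             // offset: 8
    }
}
```

With a type check like this at the call site, the general record interface would no longer need to expose a nullable checksum accessor.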

Member

Sounds good as a follow-up.

#10498

@@ -65,7 +64,6 @@ public void testChecksumNullForMagicV2() {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
assertNotNull(future);
assertNull(future.checksumOrNull());
Member

This test case (testChecksumNullForMagicV2) is meaningless after we remove assertNull(future.checksumOrNull()). It seems fine to me to remove the whole test case.

ijuma added 2 commits April 6, 2021 22:10
* apache-github/trunk:
  KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup… (apache#9851)
  KAFKA-12384: stabilize ListOffsetsRequestTest#testResponseIncludesLeaderEpoch (apache#10389)
  KAFKA-5146: remove Connect dependency from Streams module (apache#10131)
@ijuma ijuma force-pushed the remove-checksum branch from f3f284f to bfa4f6c Compare April 7, 2021 05:49
Member

@chia7712 chia7712 left a comment

LGTM

@@ -32,7 +31,6 @@
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
Member

Does this line need a deprecation cycle as well?

Member Author

Yes.

* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
*
* @deprecated use one of the constructors without a `checksum` parameter
Member

It is a pity that we have to keep carrying these constructors. Should we explicitly add the deprecation version to the message here and in the others? Thinking about the future, that would make it a little easier to decide when they can be removed.

Member Author

Yeah, I agree it's annoying. Also, it would be nice to have a constructor like:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          K key,
                          V value)

And deprecate:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value) {

But that requires a KIP. For now, I will remove some of the new constructors I added and add a single one:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value,
                          Headers headers,
                          Optional<Integer> leaderEpoch) {

I'll update the PR with these changes tomorrow.
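
To make the shape of the proposed convenience constructor concrete, here is a sketch in which it delegates to the full constructor and fills in defaults. `SketchConsumerRecord` and `SketchTimestampType` are hypothetical stand-ins, headers are left out for brevity, and the chosen defaults (`NULL_SIZE` for both sizes, an empty leader epoch) are assumptions rather than confirmed Kafka behavior.

```java
import java.util.Optional;

// Hypothetical stand-ins; not Kafka's TimestampType or ConsumerRecord.
enum SketchTimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

final class SketchConsumerRecord<K, V> {
    static final int NULL_SIZE = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final SketchTimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
    private final Optional<Integer> leaderEpoch;

    // Full constructor: takes every field (headers omitted in this sketch).
    SketchConsumerRecord(String topic, int partition, long offset, long timestamp,
                         SketchTimestampType timestampType,
                         int serializedKeySize, int serializedValueSize,
                         K key, V value, Optional<Integer> leaderEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
        this.serializedKeySize = serializedKeySize;
        this.serializedValueSize = serializedValueSize;
        this.key = key;
        this.value = value;
        this.leaderEpoch = leaderEpoch;
    }

    // Convenience constructor: timestamp-aware, with assumed defaults for the rest.
    SketchConsumerRecord(String topic, int partition, long offset, long timestamp,
                         SketchTimestampType timestampType, K key, V value) {
        this(topic, partition, offset, timestamp, timestampType,
             NULL_SIZE, NULL_SIZE, key, value, Optional.empty());
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
    SketchTimestampType timestampType() { return timestampType; }
    int serializedKeySize() { return serializedKeySize; }
    int serializedValueSize() { return serializedValueSize; }
    K key() { return key; }
    V value() { return value; }
    Optional<Integer> leaderEpoch() { return leaderEpoch; }
}
```

A test that only cares about the payload and timestamp could then write `new SketchConsumerRecord<>("orders", 0, 42L, 1000L, SketchTimestampType.CREATE_TIME, "k", "v")` instead of spelling out all ten arguments.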

@ijuma ijuma changed the title from "KAFKA-12612: Remove checksum from ConsumerRecord and RecordMetadata for 3.0" to "KAFKA-12612: Remove checksum from ConsumerRecord/RecordMetadata for 3.0" Apr 14, 2021
Member

@chia7712 chia7712 left a comment

+1

@@ -398,7 +398,7 @@ private int writeLegacyCompressedWrapperHeader() {
/**
* Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
Member

Could you update this doc comment?

Member Author

ijuma commented Apr 14, 2021

3 jobs passed, one (Build / JDK 15 and Scala 2.13) failed with unrelated flakes:

  • org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
  • Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
  • Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()

@ijuma ijuma merged commit 89933f2 into apache:trunk Apr 14, 2021
@ijuma ijuma deleted the remove-checksum branch April 14, 2021 21:38