From d163f6e887f5792fa37142fdc2567d24f30f2a66 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Tue, 17 Dec 2024 14:29:59 +0100 Subject: [PATCH] impl --- .../StreamsInvalidTopologyEpochException.java | 23 +++ .../StreamsInvalidTopologyException.java | 23 +++ .../StreamsTopologyFencedException.java | 23 +++ .../apache/kafka/common/protocol/ApiKeys.java | 4 +- .../apache/kafka/common/protocol/Errors.java | 8 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../StreamsGroupHeartbeatRequest.java | 88 ++++++++++++ .../StreamsGroupHeartbeatResponse.java | 101 ++++++++++++++ .../message/StreamsGroupHeartbeatRequest.json | 132 ++++++++++++++++++ .../StreamsGroupHeartbeatResponse.json | 117 ++++++++++++++++ .../common/requests/RequestResponseTest.java | 12 ++ .../kafka/network/RequestConvertToJson.java | 8 ++ 13 files changed, 541 insertions(+), 2 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyEpochException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyFencedException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java create mode 100644 clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json create mode 100644 clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyEpochException.java new file mode 100644 index 0000000000000..2c14a2d37cb87 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyEpochException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInvalidTopologyEpochException extends ApiException { + public StreamsInvalidTopologyEpochException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java new file mode 100644 index 0000000000000..28a5c8ab77de8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInvalidTopologyException extends ApiException { + public StreamsInvalidTopologyException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyFencedException.java new file mode 100644 index 0000000000000..8e4120221a0f1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyFencedException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsTopologyFencedException extends ApiException { + public StreamsTopologyFencedException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e95882be69927..b96a4db382f4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -130,7 +130,9 @@ public enum ApiKeys { READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true), WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true), DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true), - READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true); + READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), + STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT); + private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 309ae7bc86a36..2dbf3abde9791 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -119,6 +119,9 @@ import org.apache.kafka.common.errors.SnapshotNotFoundException; import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsTopologyFencedException; import org.apache.kafka.common.errors.TelemetryTooLargeException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.errors.TimeoutException; @@ -413,7 +416,10 @@ public enum Errors { DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new), VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new), INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new), - REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new); + REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new), + STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new), + STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new), + STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 87a7a82686951..9c4b6863b8572 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -350,6 +350,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return DeleteShareGroupStateRequest.parse(buffer, apiVersion); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion); + case STREAMS_GROUP_HEARTBEAT: + return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 83f29471ba3d4..5c3af7918f342 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -287,6 +287,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return DeleteShareGroupStateResponse.parse(responseBuffer, version); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version); + case STREAMS_GROUP_HEARTBEAT: + return StreamsGroupHeartbeatResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java new file mode 100644 index 0000000000000..51ef4069089e6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class StreamsGroupHeartbeatRequest extends AbstractRequest { + + /** + * A member epoch of -1 means that the member wants to leave the group. + */ + public static final int LEAVE_GROUP_MEMBER_EPOCH = -1; + public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; + + /** + * A member epoch of 0 means that the member wants to join the group. + */ + public static final int JOIN_GROUP_MEMBER_EPOCH = 0; + + public static class Builder extends AbstractRequest.Builder { + private final StreamsGroupHeartbeatRequestData data; + + public Builder(StreamsGroupHeartbeatRequestData data) { + this(data, false); + } + + public Builder(StreamsGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsGroupHeartbeatRequest build(short version) { + return new StreamsGroupHeartbeatRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsGroupHeartbeatRequestData data; + + public StreamsGroupHeartbeatRequest(StreamsGroupHeartbeatRequestData data, short version) { + super(ApiKeys.STREAMS_GROUP_HEARTBEAT, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsGroupHeartbeatResponse( + new StreamsGroupHeartbeatResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsGroupHeartbeatRequestData data() { + return data; + } + + public static StreamsGroupHeartbeatRequest parse(ByteBuffer buffer, short version) { + return new StreamsGroupHeartbeatRequest(new StreamsGroupHeartbeatRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java new file mode 100644 index 0000000000000..b4f6f31bc5b72 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#FENCED_MEMBER_EPOCH} + * - {@link Errors#UNSUPPORTED_ASSIGNOR} + * - {@link Errors#UNRELEASED_INSTANCE_ID} + * - {@link Errors#GROUP_MAX_SIZE_REACHED} + * - {@link Errors#STREAMS_INVALID_TOPOLOGY} + * - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH} + * - {@link Errors#STREAMS_TOPOLOGY_FENCED} + */ +public class StreamsGroupHeartbeatResponse extends AbstractResponse { + private final StreamsGroupHeartbeatResponseData data; + + public StreamsGroupHeartbeatResponse(StreamsGroupHeartbeatResponseData data) { + super(ApiKeys.STREAMS_GROUP_HEARTBEAT); + this.data = data; + } + + @Override + public StreamsGroupHeartbeatResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsGroupHeartbeatResponse parse(ByteBuffer buffer, short version) { + return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData( + new ByteBufferAccessor(buffer), version)); + } + + public enum Status { + STALE_TOPOLOGY((byte) 0, "The topology epoch supplied is inconsistent with the topology for this streams group."), + MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing or a source topic regex resolves to zero topics."), + INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected to be copartitioned are not copartitioned."), + MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are missing."), + SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application."); + + private final byte code; + private final String message; + + Status(final byte code, final String message) { + this.code = code; + this.message = message; + } + + public byte code() { + return code; + } + + public String message() { + return message; + } + } +} diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json new file mode 100644 index 0000000000000..85b6020f31b25 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 88, + "type": "request", + "listeners": ["broker", "zkBroker"], + "name": "StreamsGroupHeartbeatRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of the member otherwise." }, + { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, + "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its tasks otherwise." }, + + { "name": "Topology", "type": "Topology", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The topology metadata of the streams application. Used to initialize the topology of the group and to check if the topology corresponds to the topology initialized for the group. Only sent when memberEpoch = 0, must be non-empty. Null otherwise.", + "fields": [ + { "name": "Epoch", "type": "int32", "versions": "0+", + "about": "The epoch of the topology. Used to check if the topology corresponds to the topology initialized on the brokers." }, + { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+", + "about": "The sub-topologies of the streams application.", + "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "String to uniquely identify the subtopology. Deterministically generated from the topology" }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", + "about": "The topics the topology reads from." }, + { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", + "about": "The regular expressions identifying topics the subtopology reads from." }, + { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, + { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", + "about": "The repartition topics the subtopology writes to." }, + { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Created automatically." }, + { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+", + "about": "A subset of source topics that must be copartitioned.", + "fields": [ + { "name": "SourceTopics", "type": "[]int16", "versions": "0+", + "about": "The topics the topology reads from. Index into the array on the subtopology level." }, + { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+", + "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." }, + { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." } + ]} + ]} + ] + }, + + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Currently owned active tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Currently owned standby tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Currently owned warm-up tasks for this client. Null if unchanged since last heartbeat." }, + + { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." }, + { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "User-defined endpoint for Interactive Queries. Null if unchanged since last heartbeat, or if not defined on the client." }, + { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." }, + + { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Cumulative changelog offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." }, + { "name": "TaskEndOffsets", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Cumulative changelog end-offsets for tasks. Only updated when a warm-up task has caught up, and according to the task offset interval. Null if unchanged since last heartbeat." }, + { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false, + "about": "Whether all Streams clients in the group should shut down." } + ], + + "commonStructs": [ + { "name": "KeyValue", "versions": "0+", "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ]}, + { "name": "TopicInfo", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of the topic." }, + { "name": "Partitions", "type": "int32", "versions": "0+", + "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, + { "name": "ReplicationFactor", "type": "int16", "versions": "0+", + "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, + { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", + "about": "Topic-level configurations as key-value pairs." + } + ]}, + { "name": "Endpoint", "versions": "0+", "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "host of the endpoint" }, + { "name": "Port", "type": "uint16", "versions": "0+", + "about": "port of the endpoint" } + ]}, + { "name": "TaskOffset", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition." }, + { "name": "Offset", "type": "int64", "versions": "0+", + "about": "The offset." } + ]}, + { "name": "TaskIds", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions of the input topics processed by this member." } + ]} + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json new file mode 100644 index 0000000000000..43b5268e20562 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 88, + "type": "response", + "name": "StreamsGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - FENCED_MEMBER_EPOCH (version 0+) + // - UNRELEASED_INSTANCE_ID (version 0+) + // - GROUP_MAX_SIZE_REACHED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) + // - CLUSTER_AUTHORIZATION_FAILED (version 0+) + // - STREAMS_INVALID_TOPOLOGY (version 0+) + // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+) + // - STREAMS_TOPOLOGY_FENCED (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member id is always generated by the streams consumer."}, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, + { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", + "about": "The heartbeat interval in milliseconds." }, + { "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+", + "about": "The maximal lag a warm-up task can have to be considered caught-up." }, + { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+", + "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." }, + + { "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Indicate zero or more status for the group. Null if unchanged since last heartbeat." }, + + // The streams app knows which partitions to fetch from given this information + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Assigned active tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Assigned standby tasks for this client. Null if unchanged since last heartbeat." }, + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, + + // IQ-related information + { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." , + "fields": [ + { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", + "about": "User-defined endpoint to connect to the node" }, + { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+", + "about": "All partitions available on the node" } + ] + } + ], + "commonStructs": [ + { "name": "Status", "versions": "0+", "fields": [ + // Possible status codes + // 0 - STALE_TOPOLOGY - The topology epoch supplied is lower than the topology epoch for this streams group. + // 1 - MISSING_SOURCE_TOPICS - One or more source topics are missing or a source topic regex resolves to zero topics. + // Missing topics are indicated in the StatusDetail. + // 2 - INCORRECTLY_PARTITIONED_TOPICS - One or more topics are incorrectly partitioned, that is, they are not copartitioned despite being + // part of a copartition group, or the number of partitions in a changelog topic does not correspond + // to the maximal number of source topic partition for that subtopology. + // Incorrectly partitioned topics are indicated in the StatusDetail. + // 3 - MISSING_INTERNAL_TOPICS - One or more internal topics are missing. + // Missing topics are indicated in the StatusDetail. + // The group coordinator will attempt to create all missing internal topics, if any errors occur during + // topic creation, this will be indicated in StatusDetail. + // 4 - SHUTDOWN_APPLICATION - A client requested the shutdown of the whole application. + { "name": "StatusCode", "type": "int8", "versions": "0+", + "about": "A code to indicate that a particular status is active for the group membership" }, + { "name": "StatusDetail", "type": "string", "versions": "0+", + "about": "A string representation of the status." } + ]}, + { "name": "TopicPartition", "versions": "0+", "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "topic name" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]}, + { "name": "TaskIds", "versions": "0+", "fields": [ + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions of the input topics processed by this member." } + ]}, + { "name": "Endpoint", "versions": "0+", "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "host of the endpoint" }, + { "name": "Port", "type": "uint16", "versions": "0+", + "about": "port of the endpoint" } + ]} + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index a838dafacd5a7..bc0a304a09ff6 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -234,6 +234,8 @@ import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState; import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState; import org.apache.kafka.common.message.StopReplicaResponseData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -1124,6 +1126,7 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1218,6 +1221,7 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse(); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse(); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -4035,6 +4039,14 @@ private ReadShareGroupStateSummaryResponse createReadShareGroupStateSummaryRespo return new ReadShareGroupStateSummaryResponse(data); } + private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) { + return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version); + } + + private AbstractResponse createStreamsGroupHeartbeatResponse() { + return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData()); + } + @Test public void testInvalidSaslHandShakeRequest() { AbstractRequest request = new SaslHandshakeRequest.Builder( diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index ac744ef7bac2d..66fd9b52e20a5 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -175,6 +175,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter; import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter; import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter; import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter; import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter; import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter; @@ -356,6 +358,8 @@ import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse; import org.apache.kafka.common.requests.StopReplicaRequest; import org.apache.kafka.common.requests.StopReplicaResponse; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; @@ -541,6 +545,8 @@ public static JsonNode request(AbstractRequest request) { return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), request.version()); case SHARE_GROUP_HEARTBEAT: return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), request.version()); + case STREAMS_GROUP_HEARTBEAT: + return StreamsGroupHeartbeatRequestDataJsonConverter.write(((StreamsGroupHeartbeatRequest) request).data(), request.version()); case STOP_REPLICA: return StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) request).data(), request.version()); case SYNC_GROUP: @@ -725,6 +731,8 @@ public static JsonNode response(AbstractResponse response, short version) { return ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) response).data(), version); case SHARE_GROUP_HEARTBEAT: return ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse) response).data(), version); + case STREAMS_GROUP_HEARTBEAT: + return StreamsGroupHeartbeatResponseDataJsonConverter.write(((StreamsGroupHeartbeatResponse) response).data(), version); case STOP_REPLICA: return StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse) response).data(), version); case SYNC_GROUP: