Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasbru committed Dec 17, 2024
1 parent 337fb8c commit d163f6e
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsInvalidTopologyEpochException extends ApiException {
public StreamsInvalidTopologyEpochException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsInvalidTopologyException extends ApiException {
public StreamsInvalidTopologyException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsTopologyFencedException extends ApiException {
public StreamsTopologyFencedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public enum ApiKeys {
READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true);
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);


private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -413,7 +416,10 @@ public enum Errors {
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new),
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new),
INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new),
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new),
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DeleteShareGroupStateRequest.parse(buffer, apiVersion);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return DeleteShareGroupStateResponse.parse(responseBuffer, version);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class StreamsGroupHeartbeatRequest extends AbstractRequest {

/**
* A member epoch of <code>-1</code> means that the member wants to leave the group.
*/
public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

/**
* A member epoch of <code>0</code> means that the member wants to join the group.
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;

public static class Builder extends AbstractRequest.Builder<StreamsGroupHeartbeatRequest> {
private final StreamsGroupHeartbeatRequestData data;

public Builder(StreamsGroupHeartbeatRequestData data) {
this(data, false);
}

public Builder(StreamsGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion);
this.data = data;
}

@Override
public StreamsGroupHeartbeatRequest build(short version) {
return new StreamsGroupHeartbeatRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final StreamsGroupHeartbeatRequestData data;

public StreamsGroupHeartbeatRequest(StreamsGroupHeartbeatRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new StreamsGroupHeartbeatResponse(
new StreamsGroupHeartbeatResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
);
}

@Override
public StreamsGroupHeartbeatRequestData data() {
return data;
}

public static StreamsGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
return new StreamsGroupHeartbeatRequest(new StreamsGroupHeartbeatRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#FENCED_MEMBER_EPOCH}
* - {@link Errors#UNSUPPORTED_ASSIGNOR}
* - {@link Errors#UNRELEASED_INSTANCE_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
* - {@link Errors#STREAMS_TOPOLOGY_FENCED}
*/
public class StreamsGroupHeartbeatResponse extends AbstractResponse {
private final StreamsGroupHeartbeatResponseData data;

public StreamsGroupHeartbeatResponse(StreamsGroupHeartbeatResponseData data) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT);
this.data = data;
}

@Override
public StreamsGroupHeartbeatResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static StreamsGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData(
new ByteBufferAccessor(buffer), version));
}

public enum Status {
STALE_TOPOLOGY((byte) 0, "The topology epoch supplied is inconsistent with the topology for this streams group."),
MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing or a source topic regex resolves to zero topics."),
INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected to be copartitioned are not copartitioned."),
MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are missing."),
SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application.");

private final byte code;
private final String message;

Status(final byte code, final String message) {
this.code = code;
this.message = message;
}

public byte code() {
return code;
}

public String message() {
return message;
}
}
}
Loading

0 comments on commit d163f6e

Please sign in to comment.