Skip to content

Commit

Permalink
KAFKA-17367: Introduce share coordinator [2/N] (#17011)
Browse files Browse the repository at this point in the history
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It 
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster. 

Reviewers: David Arthur <[email protected]>, Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>
  • Loading branch information
smjn authored Sep 10, 2024
1 parent 92672d1 commit 821c101
Show file tree
Hide file tree
Showing 41 changed files with 4,238 additions and 424 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ project(':core') {
implementation project(':server')
implementation project(':coordinator-common')
implementation project(':share')
implementation project(':share-coordinator')

implementation libs.argparse4j
implementation libs.commonsValidator
Expand Down Expand Up @@ -1054,6 +1055,7 @@ project(':core') {

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':group-coordinator').sourceSets.test.output
testImplementation project(':share-coordinator').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
Expand Down Expand Up @@ -1586,6 +1588,7 @@ project(':coordinator-common') {
implementation project(':storage')
implementation libs.slf4jApi
implementation libs.metrics
implementation libs.hdrHistogram

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
Expand Down Expand Up @@ -1632,7 +1635,10 @@ project(':share-coordinator') {
implementation project(':clients')
implementation project(':coordinator-common')
implementation project(':metadata')
implementation project(':server')
implementation project(':server-common')
implementation libs.slf4jApi
implementation libs.metrics

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control-coordinator-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
Expand All @@ -62,6 +63,7 @@
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.HdrHistogram" />
</subpackage>
</subpackage>
</subpackage>
Expand Down
19 changes: 19 additions & 0 deletions checkstyle/import-control-share-coordinator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
Expand All @@ -43,15 +46,31 @@
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.coordinator.share.generated" />
<allow pkg="org.apache.kafka.coordinator.share.metrics" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.group.share" />
<allow pkg="org.apache.kafka.server.record" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.server.util.timer" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.junit.jupiter.api" />
<allow pkg="org.mockito" />
<allow pkg="org.slf4j" />
<subpackage name="generated">
<allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="metrics">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
</subpackage>
</subpackage>
</import-control>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ public static WriteShareGroupStateResponse parse(ByteBuffer buffer, short versio
);
}

/**
 * Builds response data holding exactly one result for the given topic, which in turn
 * holds exactly one partition result for the given partition.
 *
 * @param topicId       The id of the topic the result refers to.
 * @param partitionId   The partition the result refers to.
 * @return The response data wrapping the single topic/partition result.
 */
public static WriteShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
    // Innermost element first: the per-partition result.
    WriteShareGroupStateResponseData.PartitionResult partitionResult =
        new WriteShareGroupStateResponseData.PartitionResult();
    partitionResult.setPartition(partitionId);

    // Wrap the partition result into the per-topic result.
    WriteShareGroupStateResponseData.WriteStateResult writeStateResult =
        new WriteShareGroupStateResponseData.WriteStateResult();
    writeStateResult.setTopicId(topicId);
    writeStateResult.setPartitions(Collections.singletonList(partitionResult));

    // Finally wrap the per-topic result into the response payload.
    WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
    responseData.setResults(Collections.singletonList(writeStateResult));
    return responseData;
}

public static WriteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
responseData.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
Expand All @@ -92,4 +102,9 @@ public static WriteShareGroupStateResponseData.WriteStateResult toResponseWriteS
.setTopicId(topicId)
.setPartitions(partitionResults);
}

/**
 * Builds a partition result carrying only the given partition id.
 *
 * @param partitionId   The partition the result refers to.
 * @return A new partition result with its partition set to {@code partitionId}.
 */
public static WriteShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
    WriteShareGroupStateResponseData.PartitionResult partitionResult =
        new WriteShareGroupStateResponseData.PartitionResult();
    partitionResult.setPartition(partitionId);
    return partitionResult;
}
}
4 changes: 4 additions & 0 deletions config/kraft/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Share state topic settings
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
Expand Down
4 changes: 4 additions & 0 deletions config/kraft/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Share state topic settings
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
Expand Down
4 changes: 4 additions & 0 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Share state topic settings
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.common.runtime;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

import org.slf4j.Logger;

import java.util.function.BiFunction;

public class CoordinatorOperationExceptionHelper {
    /**
     * Common handler for operations that need to convert an exception into a coordinator
     * error. The exception is first mapped to an {@link ApiError}; the resulting error is
     * then translated as needed and passed to the supplied handler, which builds the
     * operation's output. Unexpected errors are also logged here.
     *
     * @param operationName     The name of the operation.
     * @param operationInput    The operation's input for logging purposes.
     * @param exception         The exception to handle.
     * @param handler           A function which takes an Errors and a String and builds the expected
     *                          output. The String can be null. Note that the function could further
     *                          transform the error depending on the context.
     * @param log               The logger used to report unexpected exceptions.
     * @param <IN>              The type of the operation input. It must be a toString'able object.
     * @param <OUT>             The type of the value returned by handler.
     * @return The output built by the handler.
     */
    public static <IN, OUT> OUT handleOperationException(
        String operationName,
        IN operationInput,
        Throwable exception,
        BiFunction<Errors, String, OUT> handler,
        Logger log
    ) {
        final ApiError apiError = ApiError.fromThrowable(exception);

        // Each branch selects the error (and optional message) handed to the handler;
        // the handler itself is invoked exactly once, after the switch.
        final Errors translatedError;
        String translatedMessage = null;

        switch (apiError.error()) {
            case UNKNOWN_SERVER_ERROR:
                // Unexpected failure: log it (with stack trace) before reporting it.
                log.error("Operation {} with {} hit an unexpected exception: {}.",
                    operationName, operationInput, exception.getMessage(), exception);
                translatedError = Errors.UNKNOWN_SERVER_ERROR;
                break;

            case NETWORK_EXCEPTION:
                // When committing offsets transactionally, we now verify the transaction with the
                // transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a
                // retriable error which older clients may not expect and retry correctly. We
                // translate the error to `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients
                // to retry the request without an unnecessary coordinator lookup.
                translatedError = Errors.COORDINATOR_LOAD_IN_PROGRESS;
                break;

            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case REQUEST_TIMED_OUT:
                translatedError = Errors.COORDINATOR_NOT_AVAILABLE;
                break;

            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                translatedError = Errors.NOT_COORDINATOR;
                break;

            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                translatedError = Errors.UNKNOWN_SERVER_ERROR;
                break;

            default:
                // Any other error is passed through unchanged, together with its message.
                translatedError = apiError.error();
                translatedMessage = apiError.message();
                break;
        }

        return handler.apply(translatedError, translatedMessage);
    }
}
Loading

0 comments on commit 821c101

Please sign in to comment.