Skip to content

Commit

Permalink
KAFKA-18136: Remove zk migration from code base (#18016)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
frankvicky authored Dec 12, 2024
1 parent 4c5ea05 commit 772aa24
Show file tree
Hide file tree
Showing 40 changed files with 71 additions and 1,465 deletions.
9 changes: 1 addition & 8 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,13 @@ object Kafka extends Logging {
props
}

// For Zk mode, the API forwarding is currently enabled only under migration flag. We can
// directly do a static IBP check to see API forwarding is enabled here because IBP check is
// static in Zk mode.
private def enableApiForwarding(config: KafkaConfig) =
config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = enableApiForwarding(config)
threadNamePrefix = None
)
} else {
new KafkaRaftServer(
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Meter, Timer}
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName}
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
Expand All @@ -42,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down Expand Up @@ -81,11 +80,9 @@ object KafkaController extends Logging {
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"
private val ZkMigrationStateMetricName = "ZkMigrationState"

// package private for testing
private[controller] val MetricNames = Set(
ZkMigrationStateMetricName,
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
Expand Down Expand Up @@ -174,7 +171,6 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")

metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK.value().intValue())
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ object KafkaRaftManager {
// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
if (config.processRoles.nonEmpty) {
throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
} else if (!config.migrationEnabled) {
throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
} else {
val metadataDir = new File(config.metadataLogDir)
val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3160,7 +3160,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new KafkaPrincipal(entry.principalType, entry.principalName))

// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.NOT_CONTROLLER, owner, requester))
Expand Down Expand Up @@ -3204,7 +3204,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
Expand Down Expand Up @@ -3250,7 +3250,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
Expand Down
43 changes: 4 additions & 39 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty

val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
Expand Down Expand Up @@ -804,9 +801,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
}
if (brokerIdGenerationEnable) {
if (migrationEnabled) {
require(brokerId >= 0, "broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
}
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
Expand All @@ -817,11 +811,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"If using `${KRaftConfigs.MIGRATION_ENABLED_CONFIG}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_CONFIG}` must also be set.")
}
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
Expand All @@ -846,15 +835,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
)
}
}
def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
|set of controllers.""".stripMargin.replace("\n", " ")
)
}
}

def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
Expand Down Expand Up @@ -922,25 +903,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
} else {
// ZK-based
if (migrationEnabled) {
require(brokerId >= 0,
"broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
validateQuorumVotersAndQuorumBootstrapServerForMigration()
require(controllerListenerNames.nonEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to 3.4 or higher")
if (logDirs.size > 1) {
require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
s"Cannot enable ZooKeeper migration with multiple log directories (aka JBOD) without setting " +
s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to ${MetadataVersion.IBP_3_7_IV2} or higher")
}
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}

val listenerNames = listeners.map(_.listenerName).toSet
Expand Down
Loading

0 comments on commit 772aa24

Please sign in to comment.