Skip to content

fix(shulker-proxy-agent): crash when LoadBalancer service does not have any ingress in status #627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<'a> ResourceBuilder<'a> for ProxyRoleBuilder {
PolicyRule {
api_groups: Some(vec!["".to_string()]),
resources: Some(vec!["services".to_string()]),
verbs: vec!["get".to_string()],
verbs: vec!["get".to_string(), "watch".to_string()],
..PolicyRule::default()
},
PolicyRule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rules:
- services
verbs:
- get
- watch
- apiGroups:
- ""
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package io.shulkermc.proxyagent.adapters.kubernetes

import io.fabric8.kubernetes.api.model.ObjectReference
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import io.fabric8.kubernetes.client.informers.SharedIndexInformer
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
import io.shulkermc.proxyagent.Configuration
import io.shulkermc.proxyagent.adapters.kubernetes.models.AgonesV1GameServer
Expand All @@ -16,6 +18,8 @@ class ImplKubernetesGatewayAdapter(proxyNamespace: String, proxyName: String) :
companion object {
private const val PROXY_INFORMER_SYNC_MS = 30L * 1000
private const val SERVER_INFORMER_SYNC_MS = 10L * 1000
private const val SERVICE_INFORMER_SYNC_MS = 30L * 1000
private const val MINECRAFT_DEFAULT_PORT = 25565
}

private val kubernetesClient: KubernetesClient =
Expand Down Expand Up @@ -48,27 +52,13 @@ class ImplKubernetesGatewayAdapter(proxyNamespace: String, proxyName: String) :
).list()
}

override fun getFleetServiceAddress(): Optional<InetSocketAddress> {
if (Configuration.PROXY_PREFERRED_RECONNECT_ADDRESS.isPresent) {
return Configuration.PROXY_PREFERRED_RECONNECT_ADDRESS
}

val service =
override fun getExternalAddress(): Optional<InetSocketAddress> {
return this.getExternalAddressFromService(
this.kubernetesClient.services()
.inNamespace(Configuration.PROXY_NAMESPACE)
.withName(Configuration.PROXY_FLEET_NAME)
.get()

if (service.spec.type != "LoadBalancer") {
return Optional.empty()
}

return try {
val ingress = service.status.loadBalancer.ingress[0]
Optional.of(InetSocketAddress(ingress.ip, ingress.ports[0].port))
} catch (_: NullPointerException) {
Optional.empty()
}
.get(),
)
}

override fun watchProxyEvents(
Expand All @@ -81,14 +71,7 @@ class ImplKubernetesGatewayAdapter(proxyNamespace: String, proxyName: String) :
.withLabel("app.kubernetes.io/component", "proxy")
.inform(eventHandler, PROXY_INFORMER_SYNC_MS)

return proxyInformer.start()
.thenApply {
object : KubernetesGatewayAdapter.EventWatcher {
override fun stop() {
proxyInformer.stop()
}
}
}
return this.createEventWatcher(proxyInformer)
}

override fun watchMinecraftServerEvents(
Expand All @@ -101,12 +84,32 @@ class ImplKubernetesGatewayAdapter(proxyNamespace: String, proxyName: String) :
.withLabel("app.kubernetes.io/component", "minecraft-server")
.inform(eventHandler, SERVER_INFORMER_SYNC_MS)

return minecraftServerInformer.start().thenApply {
object : KubernetesGatewayAdapter.EventWatcher {
override fun stop() {
minecraftServerInformer.stop()
}
return this.createEventWatcher(minecraftServerInformer)
}

override fun watchExternalAddressUpdates(
callback: (address: Optional<InetSocketAddress>) -> Unit,
): CompletionStage<KubernetesGatewayAdapter.EventWatcher> {
val eventHandler =
this.createEventHandler<Service> { _, service ->
callback(this.getExternalAddressFromService(service))
}

val serviceInformer =
this.kubernetesClient.services()
.inNamespace(Configuration.PROXY_NAMESPACE)
.withName(Configuration.PROXY_FLEET_NAME)
.inform(eventHandler, SERVICE_INFORMER_SYNC_MS)

return this.createEventWatcher(serviceInformer)
}

private fun getExternalAddressFromService(service: Service): Optional<InetSocketAddress> {
return if (service.spec.type == "LoadBalancer") {
Optional.ofNullable(service.status.loadBalancer?.ingress?.firstOrNull())
.map { ingress -> InetSocketAddress(ingress.ip, MINECRAFT_DEFAULT_PORT) }
} else {
Optional.empty()
}
}

Expand All @@ -131,4 +134,14 @@ class ImplKubernetesGatewayAdapter(proxyNamespace: String, proxyName: String) :
}
}
}

private fun <T> createEventWatcher(informer: SharedIndexInformer<T>): CompletionStage<KubernetesGatewayAdapter.EventWatcher> {
return informer.start().thenApply {
object : KubernetesGatewayAdapter.EventWatcher {
override fun stop() {
informer.stop()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface KubernetesGatewayAdapter {

fun listMinecraftServers(): AgonesV1GameServer.List

fun getFleetServiceAddress(): Optional<InetSocketAddress>
fun getExternalAddress(): Optional<InetSocketAddress>

fun watchProxyEvents(
callback: (action: WatchAction, proxy: AgonesV1GameServer) -> Unit,
Expand All @@ -26,6 +26,10 @@ interface KubernetesGatewayAdapter {
callback: (action: WatchAction, minecraftServer: AgonesV1GameServer) -> Unit,
): CompletionStage<EventWatcher>

fun watchExternalAddressUpdates(
callback: (address: Optional<InetSocketAddress>) -> Unit,
): CompletionStage<EventWatcher>

interface EventWatcher {
fun stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.shulkermc.proxyagent.platform.ProxyPingHookResult
import io.shulkermc.proxyagent.platform.ServerPreConnectHookResult
import io.shulkermc.proxyagent.utils.createDisconnectMessage
import net.kyori.adventure.text.format.NamedTextColor
import java.net.InetSocketAddress
import java.util.Optional
import java.util.UUID

Expand Down Expand Up @@ -51,26 +52,20 @@ class PlayerMovementService(private val agent: ShulkerProxyAgentCommon) {
java.util.concurrent.TimeUnit.SECONDS,
)

private val externalClusterAddress = this.agent.kubernetesGateway.getFleetServiceAddress()

private var externalClusterAddress: Optional<InetSocketAddress> = Optional.empty()
private var isAllocatedByAgones = false
private var acceptingPlayers = true

init {
this.externalClusterAddress.ifPresentOrElse({ addr ->
this.agent.logger.info("Found fleet's external address: ${addr.hostName}")
}, {
this.agent.logger.info(
"Fleet external address was not found, transfer capabilities will be disabled",
)
})

this.agent.proxyInterface.addProxyPingHook(this::onProxyPing, HookPostOrder.FIRST)
this.agent.proxyInterface.addPlayerPreLoginHook(this::onPlayerPreLogin, HookPostOrder.FIRST)
this.agent.proxyInterface.addPlayerLoginHook(this::onPlayerLogin, HookPostOrder.EARLY)
this.agent.proxyInterface.addPlayerDisconnectHook(this::onPlayerDisconnect, HookPostOrder.LATE)
this.agent.proxyInterface.addServerPreConnectHook(this::onServerPreConnect, HookPostOrder.EARLY)
this.agent.proxyInterface.addServerPostConnectHook(this::onServerPostConnect, HookPostOrder.LATE)

this.onExternalAddressUpdate(this.agent.kubernetesGateway.getExternalAddress())
this.agent.kubernetesGateway.watchExternalAddressUpdates(this::onExternalAddressUpdate)
}

fun setAcceptingPlayers(acceptingPlayers: Boolean) {
Expand Down Expand Up @@ -174,6 +169,18 @@ class PlayerMovementService(private val agent: ShulkerProxyAgentCommon) {
this.agent.cache.setPlayerPosition(player.uniqueId, Configuration.PROXY_NAME, serverName)
}

private fun onExternalAddressUpdate(address: Optional<InetSocketAddress>) {
this.externalClusterAddress = address

if (address.isPresent) {
this.agent.logger.info("Updated fleet's external address: ${address.get()}")
} else {
this.agent.logger.warning(
"Fleet external address was not found or removed, transfer capabilities are disabled",
)
}
}

private fun isProxyConsideredFull(): Boolean {
return this.agent.proxyInterface.getPlayerCount() >= this.maxPlayersWithExclusionDelta
}
Expand Down
Loading