Skip to content

Commit

Permalink
Add kubernetes service discovery (#32)
Browse files Browse the repository at this point in the history
* Add service discovery

* Fixed bug

* Add order

* Make ktlint happy

* Remove invalid code

* Modify ApplicationContextAware to SPI

* Optimize code

* Optimize code

* Make ktlint happy

* Optimize code

* Optimize code

Co-authored-by: wz <[email protected]>
  • Loading branch information
GuoDuanLZ and GuoDuanLZ authored Jun 18, 2020
1 parent 8db2a71 commit 0954725
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 88 deletions.
4 changes: 2 additions & 2 deletions buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ plugins {
`java-library`
`kotlin-dsl`
id("idea")
id("com.bybutter.sisyphus.project") version "0.0.3"
id("com.bybutter.sisyphus.project") version "0.0.4-M0"
}

dependencies {
implementation(platform("com.bybutter.sisyphus:sisyphus-dependencies:0.0.3"))
implementation(platform("com.bybutter.sisyphus:sisyphus-dependencies:0.0.4-M0"))
implementation("com.bybutter.sisyphus.tools:sisyphus-protobuf-gradle-plugin")
implementation("com.bybutter.sisyphus.tools:sisyphus-project-gradle-plugin")
implementation("org.jetbrains.kotlin:kotlin-gradle-plugin")
Expand Down
2 changes: 2 additions & 0 deletions buildSrc/src/main/kotlin/dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ object Dependencies {

const val nettyTcnative = "io.netty:netty-tcnative-boringssl-static"

const val kubeJavaClient = "io.kubernetes:client-java"

const val retrofit = "com.squareup.retrofit2:retrofit"

const val okhttp = "com.squareup.okhttp3:okhttp"
Expand Down
1 change: 1 addition & 0 deletions dependencies/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
api("org.reflections:reflections:0.9.12")
api("com.github.os72:protoc-jar:3.11.4")
api("io.netty:netty-tcnative-boringssl-static:2.0.30.Final")
api("io.kubernetes:client-java:8.0.2")
api("org.apache.maven.wagon:wagon-http:3.4.0")
api("org.junit.jupiter:junit-jupiter:5.6.2")
api("org.reflections:reflections:0.9.12")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ object ProtoTypes {

fun ensureClassByProtoName(name: String): Class<*> {
return getClassByProtoName(name)
?: throw UnsupportedOperationException("Message '$name' not defined in current context.")
?: throw UnsupportedOperationException("Message '$name' not defined in current context.")
}

fun getClassByTypeUrl(url: String): Class<*>? {
Expand All @@ -134,7 +134,7 @@ object ProtoTypes {

fun ensureClassByTypeUrl(url: String): Class<*> {
return getClassByTypeUrl(url)
?: throw UnsupportedOperationException("Message '$url' not defined in current context.")
?: throw UnsupportedOperationException("Message '$url' not defined in current context.")
}

fun getSupportByProtoName(name: String): ProtoSupport<*, *>? {
Expand All @@ -143,7 +143,7 @@ object ProtoTypes {

fun ensureSupportByProtoName(name: String): ProtoSupport<*, *> {
return getSupportByProtoName(name)
?: throw UnsupportedOperationException("Message '$name' not defined in current context.")
?: throw UnsupportedOperationException("Message '$name' not defined in current context.")
}

fun getSupportByTypeUrl(url: String): ProtoSupport<*, *>? {
Expand All @@ -152,7 +152,7 @@ object ProtoTypes {

fun ensureSupportByTypeUrl(url: String): ProtoSupport<*, *> {
return getSupportByTypeUrl(url)
?: throw UnsupportedOperationException("Message '$url' not defined in current context.")
?: throw UnsupportedOperationException("Message '$url' not defined in current context.")
}

fun getProtoNameByClass(clazz: Class<*>): String? {
Expand Down Expand Up @@ -197,6 +197,14 @@ object ProtoTypes {
val extensions = extensionMap[name.trim('.')] ?: mutableMapOf()
return extensions.keys
}

fun getRegisteredServiceNames(): Set<String> {
return protoToServiceMap.keys
}

fun getRegisterService(name: String): Class<*>? {
return protoToServiceMap[name]
}
}

private data class DescriptorInfo(val file: FileDescriptorProto, val descriptor: Any)
12 changes: 12 additions & 0 deletions middleware/sisyphus-grpc-client-kubernetes/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
middleware

plugins {
`java-library`
}

description = "Middleware for grpc service discovery of kubernetes in Sisyphus Project"

dependencies {
api(project(":middleware:sisyphus-grpc-client"))
implementation(Dependencies.kubeJavaClient)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.bybutter.sisyphus.middleware.grpc.client.kubernetes

import com.bybutter.sisyphus.middleware.grpc.ClientRepository
import com.bybutter.sisyphus.protobuf.ProtoTypes
import io.kubernetes.client.openapi.apis.CoreV1Api
import io.kubernetes.client.util.Config
import java.nio.charset.Charset
import java.nio.file.Files
import java.nio.file.Paths
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.beans.factory.support.AbstractBeanDefinition
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.core.env.Environment

class KubernetesClientRepository : ClientRepository {

override var order: Int = Int.MIN_VALUE + 2000

override fun listClientBeanDefinition(beanFactory: ConfigurableListableBeanFactory, environment: Environment): List<AbstractBeanDefinition> {
val api = try {
CoreV1Api(Config.fromCluster())
} catch (e: IllegalStateException) {
throw IllegalStateException("Get config fail: $e, maybe not in k8s.")
} catch (e: NullPointerException) {
throw e
}
val path = Paths.get(Config.SERVICEACCOUNT_ROOT, "namespace")
if (!Files.exists(path)) {
throw IllegalStateException("can not find ${path.fileName}.")
}
val namespace = String(Files.readAllBytes(path), Charset.defaultCharset())
val beanDefinitionList = arrayListOf<AbstractBeanDefinition>()
val registerServiceNames = ProtoTypes.getRegisteredServiceNames()
for (serviceName in registerServiceNames) {
val list = api.listNamespacedService(namespace, null, null, null, null, serviceName, null, null, null, null)
val channel = list.items[0].spec?.ports?.get(0)?.port?.let {
createGrpcChannel(serviceName, it)
} ?: continue
val service = ProtoTypes.getRegisterService(serviceName) ?: throw IllegalStateException("Grpc service not be found.")
val client = getClientFromService(service)
val stub = getStubFromService(service)
val clientBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(client as Class<Any>) {
processStub(createGrpcClient(stub, channel), beanFactory)
}
beanDefinitionList.add(clientBeanDefinition.beanDefinition)
}
return beanDefinitionList
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.bybutter.sisyphus.middleware.grpc.client.kubernetes.KubernetesClientRepository
2 changes: 1 addition & 1 deletion middleware/sisyphus-grpc-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ description = "Middleware for using gRPC client in Sisyphus Project"
dependencies {
api(project(":lib:sisyphus-grpc"))
implementation(Dependencies.Grpc.stub)
runtime(Dependencies.nettyTcnative)
runtimeOnly(Dependencies.nettyTcnative)
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
package com.bybutter.sisyphus.middleware.grpc

import com.bybutter.sisyphus.rpc.GrpcServerConstants
import com.bybutter.sisyphus.rpc.RpcService
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientInterceptor
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.AbstractStub
import kotlin.reflect.full.companionObject
import com.bybutter.sisyphus.spi.ServiceLoader
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.beans.factory.getBeansOfType
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.support.BeanDefinitionRegistry
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor
import org.springframework.context.EnvironmentAware
import org.springframework.core.annotation.AnnotationUtils
import org.springframework.core.env.Environment
import org.springframework.stereotype.Component

Expand All @@ -32,84 +22,22 @@ class ClientRegistrar : BeanDefinitionRegistryPostProcessor, EnvironmentAware {
}

override fun postProcessBeanFactory(beanFactory: ConfigurableListableBeanFactory) {
val registry = beanFactory as BeanDefinitionRegistry
val localPort = environment.getProperty(GrpcServerConstants.GRPC_PORT_PROPERTY, Int::class.java, GrpcServerConstants.DEFAULT_GRPC_PORT)
val localChannel = ManagedChannelBuilder.forTarget("localhost:$localPort").usePlaintext().userAgent("Generated by Sisyphus").build()

for (serviceName in beanFactory.getBeanNamesForAnnotation(RpcServiceImpl::class.java)) {
val serviceBeanDefinition = registry.getBeanDefinition(serviceName)

val serviceClass = Class.forName(serviceBeanDefinition.beanClassName)
val rpcService = AnnotationUtils.findAnnotation(serviceClass, RpcService::class.java) ?: continue
val service = rpcService.client.java.declaringClass
val stub = service.kotlin.companionObject?.java?.classes?.firstOrNull {
it.simpleName == "Stub"
} ?: throw IllegalStateException("Grpc service must have stub class in companion.")

val clientBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(rpcService.client.java as Class<Any>) {
processStub(createGrpcClient(stub, localChannel), beanFactory)
}

registry.registerBeanDefinition(rpcService.value, clientBeanDefinition.beanDefinition)
logger.info("Register '${rpcService.value}Clinet' via local service implement '$serviceName'")
}

val properties = beanFactory.getBeansOfType<GrpcChannelProperty>()
if (properties.isEmpty()) return

for (property in properties.values) {
val channel = createGrpcChannel(property)
beanFactory.registerSingleton(property.name, channel)

for (service in property.services) {
val rpcService = service.getAnnotation(RpcService::class.java)
?: throw IllegalStateException("Grpc service must be annotated with 'RpcService'.")
val client = service.declaredClasses.firstOrNull { it.simpleName == "Client" }
?: throw IllegalStateException("Grpc service must have nested class named 'Client'.")
val stub = service.kotlin.companionObject?.java?.classes?.firstOrNull {
it.simpleName == "Stub"
} ?: throw IllegalStateException("Grpc service must have stub class in companion.")
val clientRepositories = ServiceLoader.load(ClientRepository::class.java).sortedBy { it.order }

if (registry.containsBeanDefinition(rpcService.value)) {
continue
}
val registry = beanFactory as BeanDefinitionRegistry

val clientBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(client as Class<Any>) {
processStub(createGrpcClient(stub, channel, property), beanFactory)
for (clientRepository in clientRepositories) {
val clientBeanList = clientRepository.listClientBeanDefinition(beanFactory, environment)
for (clientBean in clientBeanList) {
val beanName = clientBean.beanClass.name
if (!registry.containsBeanDefinition(beanName)) {
registry.registerBeanDefinition(beanName, clientBean)
}

registry.registerBeanDefinition(rpcService.value, clientBeanDefinition.beanDefinition)
logger.info("Register '${rpcService.value}($service)' via remote service '${property.target}'")
}
}
}

override fun postProcessBeanDefinitionRegistry(registry: BeanDefinitionRegistry) {
}

private fun processStub(stub: AbstractStub<*>, beanFactory: ConfigurableListableBeanFactory): AbstractStub<*> {
var result = stub

val builderInterceptors = beanFactory.getBeansOfType(ClientBuilderInterceptor::class.java)
for ((_, builderInterceptor) in builderInterceptors) {
result = builderInterceptor.intercept(result)
}

val interceptors = beanFactory.getBeansOfType(ClientInterceptor::class.java)
return result.withInterceptors(*interceptors.values.toTypedArray())
}

private fun createGrpcChannel(property: GrpcChannelProperty): Channel {
return ManagedChannelBuilder.forTarget(property.target).usePlaintext().userAgent("Generated by Sisyphus").build()
}

private fun createGrpcClient(target: Class<*>, channel: Channel, property: GrpcChannelProperty): AbstractStub<*> {
return target.getDeclaredConstructor(Channel::class.java, CallOptions::class.java)
.newInstance(channel, property.options) as AbstractStub<*>
}

private fun createGrpcClient(target: Class<*>, channel: Channel): AbstractStub<*> {
return target.getDeclaredConstructor(Channel::class.java, CallOptions::class.java)
.newInstance(channel, CallOptions.DEFAULT) as AbstractStub<*>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.bybutter.sisyphus.middleware.grpc

import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientInterceptor
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.AbstractStub
import kotlin.reflect.full.companionObject
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.beans.factory.support.AbstractBeanDefinition
import org.springframework.core.env.Environment

interface ClientRepository {

var order: Int

fun listClientBeanDefinition(beanFactory: ConfigurableListableBeanFactory, environment: Environment): List<AbstractBeanDefinition>

fun getStubFromService(service: Class<*>): Class<*> {
return service.kotlin.companionObject?.java?.classes?.firstOrNull {
it.simpleName == "Stub"
} ?: throw IllegalStateException("Grpc service must have stub class in companion.")
}

fun getClientFromService(service: Class<*>): Class<*> {
return service.declaredClasses.firstOrNull { it.simpleName == "Client" }
?: throw IllegalStateException("Grpc service must have nested class named 'Client'.")
}

fun processStub(stub: AbstractStub<*>, beanFactory: ConfigurableListableBeanFactory): AbstractStub<*> {
var result = stub

val builderInterceptors = beanFactory.getBeansOfType(ClientBuilderInterceptor::class.java)
for ((_, builderInterceptor) in builderInterceptors) {
result = builderInterceptor.intercept(result)
}

val interceptors = beanFactory.getBeansOfType(ClientInterceptor::class.java)
return result.withInterceptors(*interceptors.values.toTypedArray())
}

fun createGrpcChannel(property: GrpcChannelProperty): Channel {
return ManagedChannelBuilder.forTarget(property.target).usePlaintext().userAgent("Generated by Sisyphus").build()
}

fun createGrpcChannel(name: String, port: Int): Channel {
return ManagedChannelBuilder.forAddress(name, port).usePlaintext().userAgent("Generated by Sisyphus").build()
}

fun createGrpcClient(target: Class<*>, channel: Channel, property: GrpcChannelProperty): AbstractStub<*> {
return target.getDeclaredConstructor(Channel::class.java, CallOptions::class.java)
.newInstance(channel, property.options) as AbstractStub<*>
}

fun createGrpcClient(target: Class<*>, channel: Channel): AbstractStub<*> {
return target.getDeclaredConstructor(Channel::class.java, CallOptions::class.java)
.newInstance(channel, CallOptions.DEFAULT) as AbstractStub<*>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.bybutter.sisyphus.middleware.grpc

import com.bybutter.sisyphus.rpc.GrpcServerConstants
import com.bybutter.sisyphus.rpc.RpcService
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.beans.factory.support.AbstractBeanDefinition
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.core.annotation.AnnotationUtils
import org.springframework.core.env.Environment

class LocalClientRepository : ClientRepository {

override var order: Int = Int.MIN_VALUE + 1000

override fun listClientBeanDefinition(beanFactory: ConfigurableListableBeanFactory, environment: Environment): List<AbstractBeanDefinition> {
val localPort = environment.getProperty(GrpcServerConstants.GRPC_PORT_PROPERTY, Int::class.java, GrpcServerConstants.DEFAULT_GRPC_PORT)
val localChannel = createGrpcChannel("localhost", localPort)
val beanDefinitionList = arrayListOf<AbstractBeanDefinition>()
for (serviceName in beanFactory.getBeanNamesForAnnotation(RpcServiceImpl::class.java)) {
val serviceBeanDefinition = beanFactory.getBeanDefinition(serviceName)
val serviceClass = Class.forName(serviceBeanDefinition.beanClassName)
val rpcService = AnnotationUtils.findAnnotation(serviceClass, RpcService::class.java) ?: continue
val service = rpcService.client.java.declaringClass
val stub = getStubFromService(service)
val clientBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(rpcService.client.java as Class<Any>) {
processStub(createGrpcClient(stub, localChannel), beanFactory)
}
beanDefinitionList.add(clientBeanDefinition.beanDefinition)
}
return beanDefinitionList
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.bybutter.sisyphus.middleware.grpc

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.beans.factory.getBeansOfType
import org.springframework.beans.factory.support.AbstractBeanDefinition
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.core.env.Environment

class RemoteClientRepository : ClientRepository {

override var order: Int = Int.MAX_VALUE - 1000

override fun listClientBeanDefinition(beanFactory: ConfigurableListableBeanFactory, environment: Environment): List<AbstractBeanDefinition> {
val properties = beanFactory.getBeansOfType<GrpcChannelProperty>()
if (properties.isEmpty()) return arrayListOf()
val beanDefinitionList = arrayListOf<AbstractBeanDefinition>()
for (property in properties.values) {
val channel = createGrpcChannel(property)
beanFactory.registerSingleton(property.name, channel)
for (service in property.services) {
val client = getClientFromService(service)
val stub = getStubFromService(service)
val clientBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(client as Class<Any>) {
processStub(createGrpcClient(stub, channel, property), beanFactory)
}
beanDefinitionList.add(clientBeanDefinition.beanDefinition)
}
}
return beanDefinitionList
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
com.bybutter.sisyphus.middleware.grpc.LocalClientRepository
com.bybutter.sisyphus.middleware.grpc.RemoteClientRepository
Loading

0 comments on commit 0954725

Please sign in to comment.