Skip to content

Commit

Permalink
Add kcl multi example
Browse files Browse the repository at this point in the history
  • Loading branch information
abendt committed Apr 25, 2024
1 parent d01de3b commit 9afa0ae
Show file tree
Hide file tree
Showing 15 changed files with 369 additions and 102 deletions.
6 changes: 0 additions & 6 deletions camel/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ configurations {
}
}

repositories {
maven {
url = uri("https://repository.apache.org/content/repositories/snapshots")
}
}

tasks.withType<KotlinCompile>().configureEach {
kotlinOptions {
freeCompilerArgs += "-Xcontext-receivers"
Expand Down
199 changes: 199 additions & 0 deletions kcl/src/main/kotlin/multi/KinesisConsumer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package multi

import java.nio.ByteBuffer
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.UUID
import mu.KotlinLogging
import software.amazon.awssdk.arns.Arn
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.kinesis.common.ConfigsBuilder
import software.amazon.kinesis.common.InitialPositionInStream
import software.amazon.kinesis.common.InitialPositionInStreamExtended
import software.amazon.kinesis.common.StreamConfig
import software.amazon.kinesis.common.StreamIdentifier
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.coordinator.WorkerStateChangeListener
import software.amazon.kinesis.lifecycle.events.InitializationInput
import software.amazon.kinesis.lifecycle.events.LeaseLostInput
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput
import software.amazon.kinesis.lifecycle.events.ShardEndedInput
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy
import software.amazon.kinesis.processor.MultiStreamTracker
import software.amazon.kinesis.processor.ShardRecordProcessor
import software.amazon.kinesis.processor.ShardRecordProcessorFactory
import software.amazon.kinesis.retrieval.polling.PollingConfig

interface KinesisConsumerConfiguration<T> {
fun processorInitialized()

fun convertPayload(buffer: ByteBuffer): T

fun processPayload(payload: T)
}

class KinesisConsumer<T>(
val applicationName: String,
val streamName: List<String>,
val kinesisClient: KinesisAsyncClient,
val dynamoDbClient: DynamoDbAsyncClient,
val cloudWatchClient: CloudWatchAsyncClient,
val config: KinesisConsumerConfiguration<T>,
) {
val logger = KotlinLogging.logger {}

@Volatile
private var scheduler: Scheduler? = null

@Volatile
private var thread: Thread? = null

fun start() {
val workerIdentifier = UUID.randomUUID().toString()

val streamConfigs =
streamName.map {
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(it).build())
}.map {
val response = it.get()

StreamConfig(
StreamIdentifier.multiStreamInstance(
Arn.fromString(response.streamDescription().streamARN()),
response.streamDescription().streamCreationTimestamp().epochSecond,
),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON),
)
}

val tracker: MultiStreamTracker =
object : MultiStreamTracker {
override fun streamConfigList(): List<StreamConfig> = streamConfigs

override fun formerStreamsLeasesDeletionStrategy(): FormerStreamsLeasesDeletionStrategy {
return object : FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
override fun waitPeriodToDeleteFormerStreams(): Duration {
return Duration.of(30, ChronoUnit.MINUTES)
}
}
}
}

val configsBuilder =
ConfigsBuilder(
tracker,
applicationName,
kinesisClient,
dynamoDbClient,
cloudWatchClient,
workerIdentifier,
MyShardRecordProcessorFactory(config),
)

var workerState: WorkerStateChangeListener.WorkerState? = null
val delegatingListener =
WorkerStateChangeListener {
if (workerState == null) {
logger.info { "worker state $it" }
} else {
logger.info { "worker state $workerState => $it" }
}

workerState = it
}

scheduler =
Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig().workerStateChangeListener(delegatingListener),
configsBuilder.leaseManagementConfig()
.shardSyncIntervalMillis(2000)
.failoverTimeMillis(2000)
.listShardsBackoffTimeInMillis(2000),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder
.retrievalConfig().retrievalSpecificConfig(PollingConfig(kinesisClient)),
)

logger.info { "starting scheduler" }
thread =
Thread(scheduler).apply {
isDaemon = true
start()
}
logger.info { "started scheduler" }
}

fun stop() {
scheduler?.let {
logger.info { "stopping scheduler" }
val stopped = it.startGracefulShutdown().get()
logger.info { "stopped gracefully $stopped" }
}

thread?.join()
}
}

class MyShardRecordProcessorFactory<T>(val config: KinesisConsumerConfiguration<T>) :
ShardRecordProcessorFactory {
val logger = KotlinLogging.logger {}

override fun shardRecordProcessor(): ShardRecordProcessor {
logger.info { "create record processor" }

return MyShardProcessor(config)
}
}

class MyShardProcessor<T>(val config: KinesisConsumerConfiguration<T>) : ShardRecordProcessor {
val logger = KotlinLogging.logger {}
var lastSequenceNumber: String? = null

override fun initialize(initialization: InitializationInput) {
logger.info { "initialize $initialization" }

config.processorInitialized()
}

override fun processRecords(processRecords: ProcessRecordsInput) {
logger.info { "processRecords $processRecords" }

processRecords.records().asSequence().map {
config.convertPayload(it.data())
}.forEach { config.processPayload(it) }

lastSequenceNumber = processRecords.records().last().sequenceNumber()

processRecords.checkpointer().checkpoint()
logger.info { "checkpointed ${processRecords.records().last().sequenceNumber()}" }
}

override fun leaseLost(leaseLost: LeaseLostInput) {
logger.info { "leaseLost" }
}

override fun shardEnded(shardEnded: ShardEndedInput) {
logger.info { "shardEnded" }

lastSequenceNumber?.let {
shardEnded.checkpointer().checkpoint(it)
logger.info { "Checkpointed previous sequence" }
}
}

override fun shutdownRequested(shutdownRequested: ShutdownRequestedInput) {
logger.info { "shutdownRequested" }

lastSequenceNumber?.let {
shutdownRequested.checkpointer().checkpoint(it)
logger.info { "Checkpointed previous sequence" }
}
}
}
50 changes: 50 additions & 0 deletions kcl/src/test/kotlin/config/KinesisTestBase.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package config

import io.kotest.core.extensions.install
import io.kotest.core.spec.style.StringSpec
import io.kotest.extensions.testcontainers.ContainerExtension
import mu.KotlinLogging
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import utils.KinesisFixture
import utils.KinesisStreamTestScope

abstract class KinesisTestBase(block: KinesisTestBase.() -> Unit = {}) : StringSpec() {
val logger = KotlinLogging.logger {}

val localstack =
install(ContainerExtension(LocalStackContainer(DockerImageName.parse("localstack/localstack")))) {
}

val kinesisClient =
KinesisAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

val dynamoClient =
DynamoDbAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

val cloudWatchClient =
CloudWatchAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

init {
block()
}

val kinesisFixture = KinesisFixture(KinesisClient.builder().configureForLocalstack(localstack).build())

suspend fun withKinesisStream(
withShards: Int = 1,
block: suspend KinesisStreamTestScope.() -> Unit,
) {
kinesisFixture.withKinesisStream(withShards = withShards, block = block)
}
}
2 changes: 1 addition & 1 deletion kcl/src/test/kotlin/kcl/ConsumeKinesisEventSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContain
import kotlin.time.Duration.Companion.seconds

class ConsumeKinesisEventSpec : KinesisConsumerBase({
class ConsumeKinesisEventSpec : KclTestBase({
"can consume kinesis events" {
withKinesisStream {
sendEvent("First")
Expand Down
2 changes: 1 addition & 1 deletion kcl/src/test/kotlin/kcl/ConsumeMultipleShardsSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldHaveSize
import kotlin.time.Duration.Companion.seconds

class ConsumeMultipleShardsSpec : KinesisConsumerBase({
class ConsumeMultipleShardsSpec : KclTestBase({
"can consume from multiple shards" {
withKinesisStream(withShards = 2) {
withKinesisConsumer {
Expand Down
4 changes: 2 additions & 2 deletions kcl/src/test/kotlin/kcl/ErrorHandlingSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import io.kotest.matchers.collections.shouldNotContain
import io.kotest.matchers.shouldBe
import kotlin.time.Duration.Companion.seconds

class ErrorHandlingSpec : KinesisConsumerBase({
"erroneous events are not retried" {
class ErrorHandlingSpec : KclTestBase({
"failing events are not retried" {
withKinesisStream {
withKinesisConsumer(shouldFailPermanentlyOn = setOf("First")) {
sendEvent("First")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,18 @@
package kcl

import config.configureForLocalstack
import io.kotest.core.extensions.install
import io.kotest.core.spec.style.StringSpec
import io.kotest.extensions.testcontainers.ContainerExtension
import config.KinesisTestBase
import java.nio.ByteBuffer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import mu.KotlinLogging
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import utils.KinesisFixture
import utils.KinesisStreamTestScope

abstract class KinesisConsumerBase(block: KinesisConsumerBase.() -> Unit) : StringSpec() {
val logger = KotlinLogging.logger {}

val localstack =
install(ContainerExtension(LocalStackContainer(DockerImageName.parse("localstack/localstack")))) {
}

val kinesisClient =
KinesisAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

val dynamoClient =
DynamoDbAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

val cloudWatchClient =
CloudWatchAsyncClient.builder()
.configureForLocalstack(localstack)
.build()

abstract class KclTestBase(block: KclTestBase.() -> Unit) : KinesisTestBase() {
init {
block()
}

val kinesisFixture = KinesisFixture(KinesisClient.builder().configureForLocalstack(localstack).build())

suspend fun withKinesisStream(
withShards: Int = 1,
block: suspend KinesisStreamTestScope.() -> Unit,
) {
kinesisFixture.withKinesisStream(withShards = withShards, block = block)
}

interface KinesisConsumerTestScope {
val processorInvoked: Int
val eventsReceived: List<String>
Expand Down
2 changes: 1 addition & 1 deletion kcl/src/test/kotlin/kcl/ReconsumeAfterRestartSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.kotest.matchers.collections.shouldNotContain
import io.kotest.matchers.equals.shouldBeEqual
import kotlin.time.Duration.Companion.seconds

class ReconsumeAfterRestartSpec : KinesisConsumerBase({
class ReconsumeAfterRestartSpec : KclTestBase({
"not committed is re-delivered after restart" {
withKinesisStream {
withKinesisConsumer(shouldFailPermanently = true) {
Expand Down
2 changes: 1 addition & 1 deletion kcl/src/test/kotlin/kcl2/CanConsumeEventsSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainAll
import kotlin.time.Duration.Companion.seconds

class CanConsumeEventsSpec : KinesisConsumerBase({
class CanConsumeEventsSpec : Kcl2TestBase({

"can consume events" {
withKinesisStream {
Expand Down
Loading

0 comments on commit 9afa0ae

Please sign in to comment.