Skip to content

Banno/kafka4s

Folders and files

NameName
Last commit message
Last commit date
Jan 2, 2025
Mar 12, 2024
Dec 30, 2024
Mar 12, 2024
Mar 19, 2025
Jan 3, 2024
Jan 2, 2024
Jan 14, 2025
Jun 2, 2022
Jan 4, 2022
Jan 11, 2024
Mar 11, 2025
Jun 24, 2019
Nov 3, 2021
Dec 13, 2019
Jun 24, 2019
Jun 24, 2019
Feb 6, 2024
Mar 13, 2025
Jul 31, 2024

Repository files navigation

kafka4s - Functional programming with Kafka and Scala

CI Maven Central Javadocs License Code of Conduct

kafka4s provides pure, referentially transparent functions for working with Kafka, and integrates with FP libraries such as cats-effect and fs2.

Quick Start

To use kafka4s in an existing SBT project with Scala 2.13 or a later version, add the following dependencies to your build.sbt depending on your needs:

libraryDependencies ++= Seq(
  "com.banno" %% "kafka4s" % "<version>"
)

Note: If your project uses fs2 1.x, you'll want releases from the 2.x series. For fs2 2.x projects, you'll want 3.x series releases.

Sending records to Kafka is an effect. If we wanted to periodically write random integers to a Kafka topic, we could do:

Stream
  .resource(ProducerApi.resource[F, Int, Int](BootstrapServers(kafkaBootstrapServers)))
  .flatMap { producer =>
    Stream
      .awakeDelay[F](1.second)
      .evalMap { _ =>
        Sync[F].delay(Random.nextInt()).flatMap { i =>
          producer.sendAndForget(new ProducerRecord(topic.name, i, i))
        }
      }
  }

Polling Kafka for records is also an effect, and we can obtain a stream of records from a topic. We can print the even random integers from the above topic using:

Stream.resource(
   ConsumerApi
      .resource[F, Int, Int](
        BootstrapServers(kafkaBootstrapServers),
        GroupId("example3"),
        AutoOffsetReset.earliest,
        EnableAutoCommit(true)
      )
  )
  .evalTap(_.subscribe(topic.name))
  .flatMap(
    _.recordStream(1.second)
      .map(_.value)
      .filter(_ % 2 == 0)
      .evalMap(i => Sync[F].delay(println(i)))
  )

Learning more

To learn more about kafka4s, start with our Getting Started Guide, play with some example apps, and check out the kafka4s Scaladoc for more info.

Running the examples

To run the examples, setup the following:

  • Pull down the docker image in the project directory:
docker-compose up -d
  • Add local host alias kafka.local to your machines /etc/hosts file. You will need to use sudo access to edit this file.
  • Run example in sbt, for example:
examples/runMain example3.CatsEffectApp