Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pekko migration #17

Merged
merged 32 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b9734a4
first migration
Seetaramayya Feb 1, 2023
60af2ba
temp fix in build.sbt
Seetaramayya Feb 1, 2023
2145a27
migrated pekko kafka
Seetaramayya Feb 1, 2023
7ac492e
package folders adjusted for core module (main)
Seetaramayya Feb 1, 2023
896e10b
package folders adjusted for tests/test
Seetaramayya Feb 1, 2023
362a76d
package folders adjusted for tests/it
Seetaramayya Feb 1, 2023
19d04ff
package folders adjusted for tests/main
Seetaramayya Feb 1, 2023
967910c
package folders adjusted for testkit (main)
Seetaramayya Feb 1, 2023
9cb099a
package folders adjusted for benchmarks (it)
Seetaramayya Feb 1, 2023
cf331f4
package folders adjusted for benchmarks (main)
Seetaramayya Feb 1, 2023
bb70254
package folders adjusted for cluster-sharding (main)
Seetaramayya Feb 1, 2023
4886eca
package folders adjusted for testkit java classes
Seetaramayya Feb 1, 2023
0e504bb
Some more akka import adjustments in scala classes
Seetaramayya Feb 1, 2023
18e4355
Some more akka import adjustments in java classes
Seetaramayya Feb 1, 2023
a6d98ad
- Finally, replaced akka word with org.apache.pekko everywhere in .sc…
Seetaramayya Feb 1, 2023
1c01fcb
Finally, replaced akka word with org.apache.pekko everywhere in .java…
Seetaramayya Feb 1, 2023
06c30e0
adjusted logback as well
Seetaramayya Feb 1, 2023
f865a74
PR comments
Seetaramayya Feb 1, 2023
e6d23a8
SCM and contributor data adjusted
Seetaramayya Feb 2, 2023
9d71592
formatted files
Seetaramayya Feb 2, 2023
f834557
fixed the documentation build
Seetaramayya Feb 4, 2023
a5a58e3
fixed pekko discovery check
Seetaramayya Feb 4, 2023
30d9337
Fixed another unit test
Seetaramayya Feb 4, 2023
db626e7
one more test fix
Seetaramayya Feb 4, 2023
e641032
Renamed package names in logback
Seetaramayya Feb 4, 2023
f2780a8
Fixed all unit tests (build should be green)
Seetaramayya Feb 4, 2023
8345031
Renamed Alpakka... -> PekkoConnectors...
Seetaramayya Feb 4, 2023
82d4b75
Renamed Alpakka... -> PekkoConnectors...
Seetaramayya Feb 4, 2023
4bbd401
Removed unnecessary TODO
Seetaramayya Feb 4, 2023
107a927
Removed dependency on lightbend stream-alpakka-csv
Seetaramayya Feb 4, 2023
4bfdac4
- Using package configuration mapping instead of default scaladoc.bas…
Seetaramayya Feb 5, 2023
68bdfe4
All doc links in build.sbt are adjusted
Seetaramayya Feb 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ optIn.configStyleArguments = false
danglingParentheses.preset = false
spaces.inImportCurlyBraces = true
rewrite.neverInfix.excludeFilters = [
at
and
min
max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_1000_100, topic_1000_5000, topic_1000_5000_8 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with small messages" in {
Expand Down Expand Up @@ -36,7 +36,7 @@ class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaBatchedConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {

it should "bench with small messages" in {
val cmd = RunTestCommand("alpakka-kafka-batched-consumer", bootstrapServers, topic_1000_100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase._
import akka.kafka.benchmarks.InflightMetrics._
import akka.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import akka.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import akka.kafka.benchmarks.app.RunTestCommand
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike
package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.kafka.benchmarks.BenchmarksBase._
import org.apache.pekko.kafka.benchmarks.InflightMetrics._
import org.apache.pekko.kafka.benchmarks.PerfFixtureHelpers.FilledTopic
import org.apache.pekko.kafka.benchmarks.Timed.{ runPerfTest, runPerfTestInflightMetrics }
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
import org.apache.pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import com.typesafe.config.Config

object BenchmarksBase {
Expand Down Expand Up @@ -61,7 +61,7 @@ class ApacheKafkaConsumerNokafka extends BenchmarksBase() {
}
}

class AlpakkaKafkaConsumerNokafka extends BenchmarksBase() {
class PekkoConnectorsKafkaConsumerNokafka extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
runPerfTest(cmd,
Expand All @@ -83,7 +83,7 @@ class ApacheKafkaPlainConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaPlainConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaPlainConsumer extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-plain-consumer", bootstrapServers, topic_2000_100)
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
Expand Down Expand Up @@ -127,7 +127,7 @@ class ApacheKafkaAtMostOnceConsumer extends BenchmarksBase() {
}
}

class AlpakkaKafkaAtMostOnceConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaAtMostOnceConsumer extends BenchmarksBase() {
it should "bench" in {
val cmd = RunTestCommand("alpakka-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
runPerfTest(cmd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

import BenchmarksBase._

Expand Down Expand Up @@ -38,7 +38,7 @@ class RawKafkaCommitEveryPollConsumer extends BenchmarksBase() {
// }
}

class AlpakkaCommitAndForgetConsumer extends BenchmarksBase() {
class PekkoConnectorsCommitAndForgetConsumer extends BenchmarksBase() {
val prefix = "alpakka-kafka-commit-and-forget-"

it should "bench with small messages" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,45 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

/**
* Compares the `CommittingProducerSinkStage` with the composed implementation of `Producer.flexiFlow` and `Committer.sink`.
*/
class AlpakkaCommittableProducer extends BenchmarksBase() {
class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
it should "bench composed sink with 100b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-composed", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.composedSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench composed sink with 5000b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.composedSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench `Producer.committableSink` with 100b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.producerSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}

it should "bench `Producer.committableSink` with 5000b messages" in {
val cmd = RunTestCommand("alpakka-committable-producer-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
AlpakkaCommittableSinkFixtures.producerSink(cmd),
AlpakkaCommittableSinkBenchmarks.run)
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
PekkoConnectorsCommittableSinkBenchmarks.run)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_2000_100, topic_2000_500, topic_2000_5000, topic_2000_5000_8 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{
topic_2000_100,
topic_2000_500,
topic_2000_5000,
topic_2000_5000_8
}
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

class ApacheKafkaPlainProducer extends BenchmarksBase() {
private val prefix = "apache-kafka-plain-producer"
Expand All @@ -34,7 +39,7 @@ class ApacheKafkaPlainProducer extends BenchmarksBase() {
}
}

class AlpakkaKafkaPlainProducer extends BenchmarksBase() {
class PekkoConnectorsKafkaPlainProducer extends BenchmarksBase() {
private val prefix = "alpakka-kafka-plain-producer"

it should "bench with small messages" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.apache.pekko.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import akka.kafka.benchmarks.Timed.runPerfTest
import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.BenchmarksBase.{ topic_100_100, topic_100_5000 }
import org.apache.pekko.kafka.benchmarks.Timed.runPerfTest
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
import scala.concurrent.duration._

class ApacheKafkaTransactions extends BenchmarksBase() {
Expand All @@ -26,7 +26,7 @@ class ApacheKafkaTransactions extends BenchmarksBase() {
}
}

class AlpakkaKafkaTransactions extends BenchmarksBase() {
class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
it should "bench with small messages" in {
val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, topic_100_100)
runPerfTest(
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
</appender>

<logger name="org.apache" level="WARN"/>
<logger name="akka" level="WARN"/>
<logger name="akka.kafka.benchmarks" level="INFO"/>
<logger name="org.apache.pekko" level="WARN"/>
<logger name="org.apache.pekko.kafka.benchmarks" level="INFO"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.kafka.benchmarks

import org.apache.pekko.util.ByteString

import java.nio.charset.{ Charset, StandardCharsets }
import scala.collection.immutable

private[benchmarks] sealed trait CsvQuotingStyle

object CsvQuotingStyle {

/** Quote only fields requiring quotes */
case object Required extends CsvQuotingStyle

/** Quote all fields */
case object Always extends CsvQuotingStyle
}

// TODO: This needs to be deleted after migrating alpakka to pekko.
pjfanning marked this conversation as resolved.
Show resolved Hide resolved
// This is just temporary base to see everything compiles and tests will pass without any issue
private[benchmarks] class CsvFormatter(delimiter: Char,
quoteChar: Char,
escapeChar: Char,
endOfLine: String,
quotingStyle: CsvQuotingStyle,
charset: Charset = StandardCharsets.UTF_8) {

private[this] val charsetName = charset.name()

private[this] val delimiterBs = ByteString(String.valueOf(delimiter), charsetName)
private[this] val quoteBs = ByteString(String.valueOf(quoteChar), charsetName)
private[this] val duplicatedQuote = ByteString(String.valueOf(Array(quoteChar, quoteChar)), charsetName)
private[this] val duplicatedEscape = ByteString(String.valueOf(Array(escapeChar, escapeChar)), charsetName)
private[this] val endOfLineBs = ByteString(endOfLine, charsetName)

def toCsv(fields: immutable.Iterable[Any]): ByteString =
if (fields.nonEmpty) nonEmptyToCsv(fields)
else endOfLineBs

private def nonEmptyToCsv(fields: immutable.Iterable[Any]) = {
val builder = ByteString.createBuilder

def splitAndDuplicateQuotesAndEscapes(field: String, splitAt: Int) = {

@inline def indexOfQuoteOrEscape(lastIndex: Int) = {
var index = lastIndex
var found = -1
while (index < field.length && found == -1) {
val char = field(index)
if (char == quoteChar || char == escapeChar) found = index
index += 1
}
found
}

var lastIndex = 0
var index = splitAt
while (index > -1) {
builder ++= ByteString.apply(field.substring(lastIndex, index), charsetName)
val char = field.charAt(index)
if (char == quoteChar) {
builder ++= duplicatedQuote
} else {
builder ++= duplicatedEscape
}
lastIndex = index + 1
index = indexOfQuoteOrEscape(lastIndex)
}
if (lastIndex < field.length) {
builder ++= ByteString(field.substring(lastIndex), charsetName)
}
}

def append(field: String) = {
val (quoteIt, splitAt) = requiresQuotesOrSplit(field)
if (quoteIt || quotingStyle == CsvQuotingStyle.Always) {
builder ++= quoteBs
if (splitAt != -1) {
splitAndDuplicateQuotesAndEscapes(field, splitAt)
} else {
builder ++= ByteString(field, charsetName)
}
builder ++= quoteBs
} else {
builder ++= ByteString(field, charsetName)
}
}

val iterator = fields.iterator
var hasNext = iterator.hasNext
while (hasNext) {
val next = iterator.next()
if (next != null) {
append(next.toString)
}
hasNext = iterator.hasNext
if (hasNext) {
builder ++= delimiterBs
}
}
builder ++= endOfLineBs
builder.result()
}

private def requiresQuotesOrSplit(field: String): (Boolean, Int) = {
var quotes = CsvQuotingStyle.Always == quotingStyle
var split = -1
var index = 0
while (index < field.length && !(quotes && split != -1)) {
val char = field(index)
if (char == `quoteChar` || char == `escapeChar`) {
quotes = true
split = index
} else if (char == '\r' || char == '\n' || char == `delimiter`) {
quotes = true
}
index += 1
}
(quotes, split)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.benchmarks
package org.apache.pekko.kafka.benchmarks

import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.pekko.kafka.benchmarks.app.RunTestCommand

case class FixtureGen[F](command: RunTestCommand, generate: Int => F)
Loading