diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7c4538a..b0a52b52 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,14 @@ jobs: os: [ubuntu-latest] scala: [2.12, 2.13, 3] java: [temurin@8] - project: [rootJS, rootJVM, rootNative] + project: [coreJVM, coreJS, coreNative, otel4sJVM, otel4sJS, otel4sNative] + exclude: + - project: otel4sJVM + scala: 2.12 + - project: otel4sJS + scala: 2.12 + - project: otel4sNative + scala: 2.12 runs-on: ${{ matrix.os }} timeout-minutes: 60 steps: @@ -80,11 +87,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p core/.native/target core/.js/target core/.jvm/target project/target + run: mkdir -p otel4s/.native/target core/.native/target otel4s/.js/target otel4s/.jvm/target core/.js/target core/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar core/.native/target core/.js/target core/.jvm/target project/target + run: tar cf targets.tar otel4s/.native/target core/.native/target otel4s/.js/target otel4s/.jvm/target core/.js/target core/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') @@ -121,92 +128,152 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update - - name: Download target directories (2.12, rootJS) + - name: Download target directories (2.12, coreJVM) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-rootJS + name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-coreJVM - - name: Inflate target directories (2.12, rootJS) + - name: Inflate target directories (2.12, coreJVM) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (2.12, rootJVM) + - name: Download target directories (2.12, coreJS) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-rootJVM + name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-coreJS - - name: Inflate target directories (2.12, rootJVM) + - name: Inflate target directories (2.12, coreJS) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (2.12, rootNative) + - name: Download target directories (2.12, coreNative) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-rootNative + name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-coreNative - - name: Inflate target directories (2.12, rootNative) + - name: Inflate target directories (2.12, coreNative) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (2.13, rootJS) + - name: Download target directories (2.13, coreJVM) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-rootJS + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-coreJVM - - name: Inflate target directories (2.13, rootJS) + - name: Inflate target directories (2.13, coreJVM) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (2.13, rootJVM) + - name: Download target directories (2.13, coreJS) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-rootJVM + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-coreJS - - name: Inflate target directories (2.13, rootJVM) + - name: Inflate target directories (2.13, coreJS) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (2.13, rootNative) + - name: Download target directories (2.13, coreNative) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-rootNative + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-coreNative - - name: Inflate target directories (2.13, rootNative) + - name: Inflate target directories (2.13, coreNative) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3, rootJS) + - name: Download target directories (2.13, otel4sJVM) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3-rootJS + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-otel4sJVM - - name: Inflate target directories (3, rootJS) + - name: Inflate target directories (2.13, otel4sJVM) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3, rootJVM) + - name: Download target directories (2.13, otel4sJS) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3-rootJVM + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-otel4sJS - - name: Inflate target directories (3, rootJVM) + - name: Inflate target directories (2.13, otel4sJS) run: | tar xf targets.tar rm targets.tar - - name: Download target directories (3, rootNative) + - name: Download target directories (2.13, otel4sNative) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3-rootNative + name: target-${{ matrix.os }}-${{ matrix.java }}-2.13-otel4sNative - - name: Inflate target directories (3, rootNative) + - name: Inflate target directories (2.13, otel4sNative) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, coreJVM) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-coreJVM + + - name: Inflate target directories (3, coreJVM) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, coreJS) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-coreJS + + - name: Inflate target directories (3, coreJS) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, coreNative) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-coreNative + + - name: Inflate target directories (3, coreNative) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, otel4sJVM) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-otel4sJVM + + - name: Inflate target directories (3, otel4sJVM) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, otel4sJS) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-otel4sJS + + - name: Inflate target directories (3, otel4sJS) + run: | + tar xf targets.tar + rm targets.tar + + - name: Download target directories (3, otel4sNative) + uses: actions/download-artifact@v4 + with: + name: target-${{ matrix.os }}-${{ matrix.java }}-3-otel4sNative + + - name: Inflate target directories (3, otel4sNative) run: | tar xf targets.tar rm targets.tar diff --git a/build.sbt b/build.sbt index bd28760c..e17ece0b 100644 --- a/build.sbt +++ b/build.sbt @@ -11,7 +11,18 @@ ThisBuild / startYear := Some(2019) ThisBuild / licenses := Seq(License.MIT) ThisBuild / tlSiteApiUrl := Some(url("https://www.javadoc.io/doc/org.typelevel/keypool_2.12")) -lazy val root = tlCrossRootProject.aggregate(core) +lazy val root = tlCrossRootProject.aggregate(core, otel4s) + +ThisBuild / githubWorkflowBuildMatrixAdditions := { + val projects = core.componentProjects ++ otel4s.componentProjects + + Map("project" -> projects.map(_.id).toList) +} + +ThisBuild / githubWorkflowBuildMatrixExclusions ++= { + val projects = otel4s.componentProjects.map(_.id) + projects.map(project => MatrixExclude(Map("project" -> project, "scala" -> "2.12"))) +} lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) .crossType(CrossType.Pure) @@ -43,19 +54,38 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ProblemFilters .exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPoolBuilder.this"), ProblemFilters - .exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool#Builder.this") + .exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool#Builder.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.Pool#Builder.this") ) ) +lazy val otel4s = crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .in(file("otel4s")) + .dependsOn(core) + .settings(commonSettings) + .settings( + name := "keypool-otel4s", + startYear := Some(2024), + crossScalaVersions := Seq(Scala213, Scala3), + libraryDependencies ++= Seq( + "org.typelevel" %%% "otel4s-core-metrics" % otel4sV, + "org.typelevel" %%% "otel4s-sdk-metrics-testkit" % otel4sV % Test + ), + mimaPreviousArtifacts ~= { _.filterNot(_.revision.startsWith("0.4")) } + ) + lazy val docs = project .in(file("site")) .settings(commonSettings) - .dependsOn(core.jvm) + .dependsOn(core.jvm, otel4s.jvm) .enablePlugins(TypelevelSitePlugin) val catsV = "2.10.0" val catsEffectV = "3.5.4" +val otel4sV = "0.7.0" + val munitV = "1.0.0-RC1" val munitCatsEffectV = "2.0.0-M5" diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala index d98245f2..ed26948b 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala @@ -68,7 +68,8 @@ object KeyPool { private[keypool] val kpMaxIdle: Int, private[keypool] val kpMaxTotal: Int, private[keypool] val kpMaxTotalSem: Semaphore[F], - private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] + private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]], + private[keypool] val kpMetrics: Metrics[F] ) extends KeyPool[F, A, B] { def take(k: A): Resource[F, Managed[F, B]] = @@ -137,6 +138,7 @@ object KeyPool { private[keypool] def reap[F[_], A, B]( idleTimeAllowedInPoolNanos: FiniteDuration, kpVar: Ref[F, PoolMap[A, (B, F[Unit])]], + metrics: Metrics[F], onReaperException: Throwable => F[Unit] )(implicit F: Temporal[F]): F[Unit] = { // We are going to do non-referentially transparent things as we may be waiting for our modification to go through @@ -199,7 +201,7 @@ object KeyPool { val (m_, toDestroy) = findStale(now, idleCount, m) ( m_, - toDestroy.traverse_(_._2._2).attempt.flatMap { + toDestroy.traverse_(r => metrics.idleDec >> r._2._2).attempt.flatMap { case Left(t) => onReaperException(t) // .handleErrorWith(t => F.delay(t.printStackTrace())) // CHEATING? case Right(()) => F.unit @@ -237,7 +239,8 @@ object KeyPool { kp: KeyPoolConcrete[F, A, B], k: A, r: B, - destroy: F[Unit] + destroy: F[Unit], + isFromPool: Boolean ): F[Unit] = { def addToList[Z]( now: FiniteDuration, @@ -254,22 +257,25 @@ object KeyPool { else (l, Some(x)) } } + + def decIdle = kp.kpMetrics.idleDec.whenA(isFromPool) + def go(now: FiniteDuration, pc: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], F[Unit]) = pc match { - case p @ PoolClosed() => (p, destroy) + case p @ PoolClosed() => (p, decIdle >> destroy) case p @ PoolOpen(idleCount, m) => - if (idleCount > kp.kpMaxIdle) (p, destroy) + if (kp.kpMaxIdle == 0 || idleCount > kp.kpMaxIdle) (p, decIdle >> destroy) else m.get(k) match { case None => val cnt_ = idleCount + 1 val m_ = PoolMap.open(cnt_, m + (k -> One((r, destroy), now))) - (m_, Applicative[F].pure(())) + (m_, kp.kpMetrics.idleInc) case Some(l) => val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), (r, destroy), l) val cnt_ = idleCount + mx.fold(1)(_ => 0) val m_ = PoolMap.open(cnt_, m + (k -> l_)) - (m_, mx.fold(Applicative[F].unit)(_ => destroy)) + (m_, mx.fold(kp.kpMetrics.idleInc)(_ => decIdle >> destroy)) } } @@ -295,21 +301,27 @@ object KeyPool { } } + def allocateNew: F[(B, F[Unit])] = + kp.kpMetrics.acquireRecordDuration.surround(kp.kpRes(k).allocated) + for { _ <- kp.kpMaxTotalSem.permit optR <- Resource.eval(kp.kpVar.modify(go)) releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState)) resource <- Resource.makeFull[F, (B, F[Unit])] { poll => - optR.fold(poll(kp.kpRes(k).allocated))(r => Applicative[F].pure(r)) + optR.fold(poll(allocateNew))(r => Applicative[F].pure(r)) } { resource => for { reusable <- releasedState.get out <- reusable match { - case Reusable.Reuse => put(kp, k, resource._1, resource._2).attempt.void + case Reusable.Reuse => put(kp, k, resource._1, resource._2, optR.nonEmpty).attempt.void case Reusable.DontReuse => resource._2.attempt.void } } yield out } + _ <- Resource.eval(kp.kpMetrics.acquiredTotalInc.whenA(optR.isEmpty)) + _ <- kp.kpMetrics.inUseCount + _ <- kp.kpMetrics.inUseRecordDuration } yield new Managed(resource._1, optR.isDefined, releasedState) } @@ -320,7 +332,8 @@ object KeyPool { val kpMaxPerKey: A => Int, val kpMaxIdle: Int, val kpMaxTotal: Int, - val onReaperException: Throwable => F[Unit] + val onReaperException: Throwable => F[Unit], + val metricsProvider: Metrics.Provider[F] ) { private def copy( kpRes: A => Resource[F, B] = this.kpRes, @@ -329,7 +342,8 @@ object KeyPool { kpMaxPerKey: A => Int = this.kpMaxPerKey, kpMaxIdle: Int = this.kpMaxIdle, kpMaxTotal: Int = this.kpMaxTotal, - onReaperException: Throwable => F[Unit] = this.onReaperException + onReaperException: Throwable => F[Unit] = this.onReaperException, + metricsProvider: Metrics.Provider[F] = this.metricsProvider ): Builder[F, A, B] = new Builder[F, A, B]( kpRes, kpDefaultReuseState, @@ -337,7 +351,8 @@ object KeyPool { kpMaxPerKey, kpMaxIdle, kpMaxTotal, - onReaperException + onReaperException, + metricsProvider ) def doOnCreate(f: B => F[Unit]): Builder[F, A, B] = @@ -366,6 +381,9 @@ object KeyPool { def withOnReaperException(f: Throwable => F[Unit]): Builder[F, A, B] = copy(onReaperException = f) + def withMetricsProvider(metricsProvider: Metrics.Provider[F]): Builder[F, A, B] = + copy(metricsProvider = metricsProvider) + def build: Resource[F, KeyPool[F, A, B]] = { def keepRunning[Z](fa: F[Z]): F[Z] = fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa) @@ -374,10 +392,11 @@ object KeyPool { Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) )(kpVar => KeyPool.destroy(kpVar)) kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong)) + kpMetrics <- Resource.eval(metricsProvider.get) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => val nanos = 0.seconds.max(fd) - keepRunning(KeyPool.reap(nanos, kpVar, onReaperException)).background.void + keepRunning(KeyPool.reap(nanos, kpVar, kpMetrics, onReaperException)).background.void case _ => Applicative[Resource[F, *]].unit } @@ -388,7 +407,8 @@ object KeyPool { kpMaxIdle, kpMaxTotal, kpMaxTotalSem, - kpVar + kpVar, + kpMetrics ) } @@ -404,7 +424,8 @@ object KeyPool { Defaults.maxPerKey, Defaults.maxIdle, Defaults.maxTotal, - Defaults.onReaperException[F] + Defaults.onReaperException[F], + Defaults.metricsProvider ) def apply[F[_]: Temporal, A, B]( @@ -422,6 +443,7 @@ object KeyPool { def onReaperException[F[_]: Applicative] = { (t: Throwable) => Function.const(Applicative[F].unit)(t) } + def metricsProvider[F[_]: Applicative]: Metrics.Provider[F] = Metrics.Provider.noop } } } diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala index 8a713890..681738fd 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala @@ -21,7 +21,7 @@ package org.typelevel.keypool -import internal.{PoolList, PoolMap} +import internal.{Metrics, PoolList, PoolMap} import cats._ import cats.syntax.all._ import cats.effect.kernel._ @@ -92,10 +92,11 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) )(kpVar => KeyPool.destroy(kpVar)) kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong)) + kpMetrics <- Resource.pure(Metrics.noop) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => val nanos = 0.seconds.max(fd) - keepRunning(KeyPool.reap(nanos, kpVar, onReaperException)).background.void + keepRunning(KeyPool.reap(nanos, kpVar, kpMetrics, onReaperException)).background.void case _ => Applicative[Resource[F, *]].unit } @@ -106,7 +107,8 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( kpMaxIdle, kpMaxTotal, kpMaxTotalSem, - kpVar + kpVar, + kpMetrics ) } diff --git a/core/src/main/scala/org/typelevel/keypool/Pool.scala b/core/src/main/scala/org/typelevel/keypool/Pool.scala index 5f4337ac..635f76a3 100644 --- a/core/src/main/scala/org/typelevel/keypool/Pool.scala +++ b/core/src/main/scala/org/typelevel/keypool/Pool.scala @@ -24,6 +24,7 @@ package org.typelevel.keypool import cats._ import cats.effect.kernel._ import cats.syntax.all._ +import org.typelevel.keypool.internal.Metrics import scala.concurrent.duration._ /** @@ -76,7 +77,8 @@ object Pool { val idleTimeAllowedInPool: Duration, val kpMaxIdle: Int, val kpMaxTotal: Int, - val onReaperException: Throwable => F[Unit] + val onReaperException: Throwable => F[Unit], + val metricsProvider: Metrics.Provider[F] ) { private def copy( kpRes: Resource[F, B] = this.kpRes, @@ -84,14 +86,16 @@ object Pool { idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool, kpMaxIdle: Int = this.kpMaxIdle, kpMaxTotal: Int = this.kpMaxTotal, - onReaperException: Throwable => F[Unit] = this.onReaperException + onReaperException: Throwable => F[Unit] = this.onReaperException, + metricsProvider: Metrics.Provider[F] = this.metricsProvider ): Builder[F, B] = new Builder[F, B]( kpRes, kpDefaultReuseState, idleTimeAllowedInPool, kpMaxIdle, kpMaxTotal, - onReaperException + onReaperException, + metricsProvider ) def doOnCreate(f: B => F[Unit]): Builder[F, B] = @@ -117,6 +121,9 @@ object Pool { def withOnReaperException(f: Throwable => F[Unit]): Builder[F, B] = copy(onReaperException = f) + def withMetricsProvider(metricsProvider: Metrics.Provider[F]): Builder[F, B] = + copy(metricsProvider = metricsProvider) + private def toKeyPoolBuilder: KeyPool.Builder[F, Unit, B] = new KeyPool.Builder( kpRes = _ => kpRes, @@ -125,7 +132,8 @@ object Pool { kpMaxPerKey = _ => kpMaxTotal, kpMaxIdle = kpMaxIdle, kpMaxTotal = kpMaxTotal, - onReaperException = onReaperException + onReaperException = onReaperException, + metricsProvider = metricsProvider ) def build: Resource[F, Pool[F, B]] = { @@ -147,7 +155,8 @@ object Pool { Defaults.idleTimeAllowedInPool, Defaults.maxIdle, Defaults.maxTotal, - Defaults.onReaperException[F] + Defaults.onReaperException[F], + Defaults.metricsProvider ) def apply[F[_]: Temporal, B]( @@ -164,6 +173,7 @@ object Pool { def onReaperException[F[_]: Applicative] = { (t: Throwable) => Function.const(Applicative[F].unit)(t) } + def metricsProvider[F[_]: Applicative]: Metrics.Provider[F] = Metrics.Provider.noop } } } diff --git a/core/src/main/scala/org/typelevel/keypool/internal/Metrics.scala b/core/src/main/scala/org/typelevel/keypool/internal/Metrics.scala new file mode 100644 index 00000000..a66df167 --- /dev/null +++ b/core/src/main/scala/org/typelevel/keypool/internal/Metrics.scala @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 Typelevel + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.typelevel.keypool.internal + +import cats.Applicative +import cats.effect.kernel.Resource + +private[keypool] trait Metrics[F[_]] { + + /** + * Increments the number of idle resources. + */ + def idleInc: F[Unit] + + /** + * Decrements the number of idle resources. + */ + def idleDec: F[Unit] + + /** + * Records the number of in-use resources. + */ + def inUseCount: Resource[F, Unit] + + /** + * Records for how long the resource has been in use. + */ + def inUseRecordDuration: Resource[F, Unit] + + /** + * Increments the number of acquired resources. + */ + def acquiredTotalInc: F[Unit] + + /** + * Records how long does it take to acquire a resource. + */ + def acquireRecordDuration: Resource[F, Unit] + +} + +private[keypool] object Metrics { + + trait Provider[F[_]] { + def get: F[Metrics[F]] + } + + object Provider { + def noop[F[_]: Applicative]: Provider[F] = + new Provider[F] { + def get: F[Metrics[F]] = Applicative[F].pure(Metrics.noop) + } + } + + def noop[F[_]: Applicative]: Metrics[F] = + new Metrics[F] { + def idleInc: F[Unit] = Applicative[F].unit + def idleDec: F[Unit] = Applicative[F].unit + def inUseCount: Resource[F, Unit] = Resource.unit + def inUseRecordDuration: Resource[F, Unit] = Resource.unit + def acquiredTotalInc: F[Unit] = Applicative[F].unit + def acquireRecordDuration: Resource[F, Unit] = Resource.unit + } + +} diff --git a/otel4s/src/main/scala/org/typelevel/keypool/Otel4sMetrics.scala b/otel4s/src/main/scala/org/typelevel/keypool/Otel4sMetrics.scala new file mode 100644 index 00000000..61beb883 --- /dev/null +++ b/otel4s/src/main/scala/org/typelevel/keypool/Otel4sMetrics.scala @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024 Typelevel + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.typelevel.keypool + +import java.util.concurrent.TimeUnit + +import cats.Monad +import cats.effect.kernel.Resource +import cats.syntax.functor._ +import cats.syntax.flatMap._ +import org.typelevel.keypool.internal.Metrics +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.{BucketBoundaries, Meter} + +object Otel4sMetrics { + + private val DefaultHistogramBuckets: BucketBoundaries = + BucketBoundaries(Vector(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10)) + + /** + * Creates metrics provider using otel4s `Meter` under the hood. + * + * Use `attributes` to customize the metrics. + * + * @example + * {{{ + * val attributes = Attributes(Attribute("pool.name", "db-pool")) + * + * Otel4sMetrics.provider[IO](attributes = attributes) + * }}} + * + * @param prefix + * the prefix to prepend to the metrics + * + * @param attributes + * the attributes to attach to the measurements + * + * @param inUseDurationSecondsHistogramBuckets + * the histogram buckets for the 'in_use.duration' histogram + * + * @param acquireDurationSecondsHistogramBuckets + * the histogram buckets for 'acquire.duration' histogram + */ + def provider[F[_]: Monad: Meter]( + prefix: String = "keypool", + attributes: Attributes = Attributes.empty, + inUseDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets, + acquireDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets + ): Metrics.Provider[F] = + new Metrics.Provider[F] { + def get: F[Metrics[F]] = + for { + idle <- Meter[F] + .upDownCounter[Long](s"$prefix.idle.current") + .withUnit("{resource}") + .withDescription("A current number of idle resources.") + .create + + inUse <- Meter[F] + .upDownCounter[Long](s"$prefix.in_use.current") + .withUnit("{resource}") + .withDescription("A current number of resources in use.") + .create + + inUseDuration <- Meter[F] + .histogram[Long](s"$prefix.in_use.duration") + .withUnit("s") + .withDescription("For how long a resource is in use.") + .withExplicitBucketBoundaries(inUseDurationSecondsHistogramBuckets) + .create + + acquiredTotal <- Meter[F] + .counter[Long](s"$prefix.acquired.total") + .withUnit("{resource}") + .withDescription("A total number of acquired resources.") + .create + + acquireDuration <- Meter[F] + .histogram[Long](s"$prefix.acquire.duration") + .withUnit("s") + .withDescription("How long does it take to acquire a resource.") + .withExplicitBucketBoundaries(acquireDurationSecondsHistogramBuckets) + .create + } yield new Metrics[F] { + def idleInc: F[Unit] = + idle.inc(attributes) + + def idleDec: F[Unit] = + idle.dec(attributes) + + def inUseCount: Resource[F, Unit] = + Resource.make(inUse.inc(attributes))(_ => inUse.dec(attributes)) + + def inUseRecordDuration: Resource[F, Unit] = + inUseDuration.recordDuration(TimeUnit.SECONDS, attributes) + + def acquiredTotalInc: F[Unit] = + acquiredTotal.inc(attributes) + + def acquireRecordDuration: Resource[F, Unit] = + acquireDuration.recordDuration(TimeUnit.SECONDS, attributes) + } + } +} diff --git a/otel4s/src/test/scala/org/typelevel/keypool/PoolMetricsSpec.scala b/otel4s/src/test/scala/org/typelevel/keypool/PoolMetricsSpec.scala new file mode 100644 index 00000000..e03bee84 --- /dev/null +++ b/otel4s/src/test/scala/org/typelevel/keypool/PoolMetricsSpec.scala @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2024 Typelevel + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.typelevel.keypool + +import cats.effect._ +import cats.effect.testkit._ +import munit.CatsEffectSuite +import org.typelevel.keypool.internal.Metrics +import org.typelevel.otel4s.{Attribute, Attributes} +import org.typelevel.otel4s.metrics.{BucketBoundaries, Meter, MeterProvider} +import org.typelevel.otel4s.sdk.metrics.data.{MetricPoints, PointData, TimeWindow} +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class PoolMetricsSpec extends CatsEffectSuite { + import PoolMetricsSpec._ + + test("Metrics should be empty for unused pool") { + val expectedSnapshot = + MetricsSnapshot(Vector.empty, Vector.empty, Vector.empty, Vector.empty, Vector.empty) + + createTestkit.use { testkit => + for { + snapshot <- mkPool(testkit.metrics.meterProvider).surround(testkit.snapshot) + } yield assertEquals(snapshot, expectedSnapshot) + } + } + + test("In use: increment on acquire and decrement on release") { + poolTest() { (sdk, pool) => + for { + inUse <- pool.take.surround(sdk.snapshot) + afterUse <- sdk.snapshot + } yield { + assertEquals(inUse.inUse, Vector(1L)) + assertEquals(afterUse.inUse, Vector(0L)) + } + } + } + + test("In use: increment on acquire and decrement on release (failure)") { + val exception = new RuntimeException("Something went wrong") with NoStackTrace + + poolTest() { (sdk, pool) => + for { + deferred <- IO.deferred[MetricsSnapshot] + _ <- pool.take + .surround(sdk.snapshot.flatMap(deferred.complete) >> IO.raiseError(exception)) + .attempt + inUse <- deferred.get + afterUse <- sdk.snapshot + } yield { + assertEquals(inUse.inUse, Vector(1L)) + assertEquals(afterUse.inUse, Vector(0L)) + } + } + } + + test("Idle: keep 0 when `maxIdle` is 0") { + poolTest(_.withMaxIdle(0)) { (sdk, pool) => + for { + inUse <- pool.take.surround(sdk.snapshot) + afterUse <- sdk.snapshot + } yield { + assertEquals(inUse.idle, Vector.empty) + assertEquals(afterUse.idle, Vector.empty) + } + } + } + + test("Idle: keep 1 when `maxIdle` is 1") { + poolTest(_.withMaxIdle(1)) { (sdk, pool) => + for { + inUse <- pool.take.surround(sdk.snapshot) + afterUse <- sdk.snapshot + } yield { + assertEquals(inUse.idle, Vector.empty) + assertEquals(afterUse.idle, Vector(1L)) + } + } + } + + test("Idle: decrement on reaper cleanup") { + poolTest(_.withMaxIdle(1).withIdleTimeAllowedInPool(1.second)) { (sdk, pool) => + for { + inUse <- pool.take.surround(sdk.snapshot) + afterUse <- sdk.snapshot + afterSleep <- sdk.snapshot.delayBy(6.seconds) + } yield { + assertEquals(inUse.idle, Vector.empty) + assertEquals(afterUse.idle, Vector(1L)) + assertEquals(afterSleep.idle, Vector(0L)) + } + } + + } + + test("Generate valid metric snapshots") { + poolTest() { (sdk, pool) => + pool.take + .surround(sdk.snapshot.delayBy(1.second)) + .product(sdk.snapshot) + .map { case (inUse, afterUse) => + val acquireDuration = Vector( + PointData.histogram( + TimeWindow(Duration.Zero, 1.second), + Attributes(Attribute("pool.name", "test")), + Vector.empty, + Some(PointData.Histogram.Stats(0.0, 0.0, 0.0, 1)), + HistogramBuckets, + Vector(1, 0, 0, 0, 0) + ) + ) + + val expectedInUse = MetricsSnapshot( + idle = Vector.empty, + inUse = Vector(1L), + inUseDuration = Vector.empty, + acquiredTotal = Vector(1L), + acquireDuration = acquireDuration + ) + + val expectedAfterUser = MetricsSnapshot( + idle = Vector(1L), + inUse = Vector(0L), + inUseDuration = Vector( + PointData.histogram( + TimeWindow(Duration.Zero, 1.second), + Attributes(Attribute("pool.name", "test")), + Vector.empty, + Some(PointData.Histogram.Stats(1.0, 1.0, 1.0, 1)), + HistogramBuckets, + Vector(0, 1, 0, 0, 0) + ) + ), + acquiredTotal = Vector(1L), + acquireDuration = acquireDuration + ) + + assertEquals(inUse, expectedInUse) + assertEquals(afterUse, expectedAfterUser) + } + } + } + + private def poolTest( + customize: Pool.Builder[IO, Ref[IO, Int]] => Pool.Builder[IO, Ref[IO, Int]] = identity + )(scenario: (OtelTestkit[IO], Pool[IO, Ref[IO, Int]]) => IO[Unit]): IO[Unit] = + TestControl.executeEmbed { + createTestkit.use { sdk => + sdk.metrics.meterProvider.get("org.typelevel.keypool").flatMap { implicit M: Meter[IO] => + val builder = Pool + .Builder(Ref.of[IO, Int](1), nothing) + .withMetricsProvider(metricsProvider) + + customize(builder).build.use(pool => scenario(sdk, pool)) + } + } + } + + private def mkPool(meterProvider: MeterProvider[IO]) = + Resource.eval(meterProvider.get("org.typelevel.keypool")).flatMap { implicit M: Meter[IO] => + Pool + .Builder( + Ref.of[IO, Int](1), + nothing + ) + .withMetricsProvider(metricsProvider) + .withMaxTotal(10) + .build + } + + private def metricsProvider(implicit M: Meter[IO]): Metrics.Provider[IO] = + Otel4sMetrics.provider[IO]( + "keypool", + Attributes(Attribute("pool.name", "test")), + HistogramBuckets, + HistogramBuckets + ) + + private def createTestkit: Resource[IO, OtelTestkit[IO]] = + MetricsTestkit.inMemory[IO]().map { testkit => + new OtelTestkit[IO] { + val metrics: MetricsTestkit[IO] = testkit + + def snapshot: IO[MetricsSnapshot] = + for { + metrics <- testkit.collectMetrics + } yield { + def counterValue(name: String): Vector[Long] = + metrics + .find(_.name == name) + .map(_.data) + .collectFirst { case sum: MetricPoints.Sum => + sum.points.toVector.collect { case long: PointData.LongNumber => + long.value + } + } + .getOrElse(Vector.empty) + + def histogramSnapshot(name: String): Vector[PointData.Histogram] = + metrics + .find(_.name == name) + .map(_.data) + .collectFirst { case histogram: MetricPoints.Histogram => + histogram.points.toVector + } + .getOrElse(Vector.empty) + + MetricsSnapshot( + counterValue("keypool.idle.current"), + counterValue("keypool.in_use.current"), + histogramSnapshot("keypool.in_use.duration"), + counterValue("keypool.acquired.total"), + histogramSnapshot("keypool.acquire.duration") + ) + } + } + } + + private val HistogramBuckets: BucketBoundaries = + BucketBoundaries(Vector(0.01, 1.0, 100.0, 1000.0)) + + private def nothing(ref: Ref[IO, Int]): IO[Unit] = + ref.get.void + +} + +object PoolMetricsSpec { + + trait OtelTestkit[F[_]] { + def metrics: MetricsTestkit[F] + def snapshot: F[MetricsSnapshot] + } + + final case class MetricsSnapshot( + idle: Vector[Long], + inUse: Vector[Long], + inUseDuration: Vector[PointData.Histogram], + acquiredTotal: Vector[Long], + acquireDuration: Vector[PointData.Histogram] + ) + +}