Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Pluggable caches in FinalFlatMap #396

Merged
merged 11 commits into from
Dec 6, 2013
Merged

Pluggable caches in FinalFlatMap #396

merged 11 commits into from
Dec 6, 2013

Conversation

ianoc
Copy link
Collaborator

@ianoc ianoc commented Dec 5, 2013

Added an synchronous cache, pulled cache logic out thats used by the FFM. Allow ticks to trigger flushing.

private lazy val runtime = Runtime.getRuntime
private def memoryWaterMark = {
val used = (runtime.totalMemory - runtime.freeMemory).toDouble / runtime.maxMemory
used > 0.8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a magic number. Config? val?

private def keySpaceTooBig = keyMap.size > cacheSize

override def cleanup = {
Future {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this use the pool?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how the pool could use the pool to shutdown? it would be recursive right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not safe to call shutdown from a thread that is created by the pool? If not, how do we ensure that?

Not a big deal, but if you don't want to tackle, add an issue, a link to it here and a comment explaining the concern about correctness and using the pool to shut itself down.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could call shutdown, but this currently waits to ensure its shutdown before returning -- if we drop that part (awaitShutdown). Then we could do it inside the pool?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added an issue

#397

@@ -49,4 +49,9 @@ object Constants {
val DEFAULT_MAX_WAITING_FUTURES = MaxWaitingFutures(10)
val DEFAULT_MAX_FUTURE_WAIT_TIME = MaxFutureWaitTime(Duration.fromSeconds(60))
val DEFAULT_FM_PREFER_LOCAL_DEPENDENCY = PreferLocalDependency(false)
val DEFAULT_FLUSH_FREQUENCY = FlushFrequency(Duration.fromSeconds(10))
val DEFAULT_USE_ASYNC_CACHE = UseAsyncCache(false)
val DEFAULT_ASYNCPOOLSIZE = AsyncPoolSize(10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about: Runtime.getRuntime().availableProcessors() as the default pool size?

http://stackoverflow.com/questions/4759570/finding-number-of-cores-in-java

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@ianoc
Copy link
Collaborator Author

ianoc commented Dec 5, 2013

@johnynek poke

merge(key, List(monoid.sumOption(dataCP).get))
}
} else {
if(keyMap.putIfAbsent(key, oldQueue) != null) { // Not there
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more comments here. So this branch happens when there IS a value for this key, right? Not there is confusing to me.

import scala.collection.mutable.SynchronizedQueue
import com.twitter.summingbird.online.option.{ValueCombinerCacheSize, AsyncPoolSize, FlushFrequency, SoftMemoryFlushPercent}
import java.util.concurrent.{Executors, ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConversions._
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like these. Can we replace with JavaConverters._ and use the .asJava .asScala syntax. Implicit conversions of this kind bite us.

lastDump = System.currentTimeMillis
startKeyset.flatMap{case k =>
Option(keyMap.remove(k)).map(_.toSeq).flatMap(monoid.sumOption(_)).map((k, _))
}.toMap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(breakOut) magic is supposedly faster here:

import scala.collection.breakOut
flatMap { k => ... }(breakOut)

http://stackoverflow.com/questions/1715681/scala-2-8-breakout

johnynek added a commit that referenced this pull request Dec 6, 2013
Pluggable caches in FinalFlatMap
@johnynek johnynek merged commit 30c03b6 into develop Dec 6, 2013
@johnynek johnynek deleted the feature/branchF branch December 6, 2013 01:10
@johnynek
Copy link
Collaborator

johnynek commented Dec 6, 2013

Thanks for pushing this through. Next up, see if we can get it upping Ashu's throughput?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants