Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions akka-actor/src/main/scala/akka/actor/Stash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,27 @@ private[akka] trait StashSupport {
theStash = theStash.tail
}

private[akka] def unstash(filterPredicate: Any => Boolean): Unit = {
if (theStash.nonEmpty) {
val buf = Vector.newBuilder[Envelope]
try {
val i = theStash.reverseIterator
var found = false
while (i.hasNext) {
val envelope = i.next()
if (!found && filterPredicate(envelope.message)) {
enqueueFirst(envelope)
found = true
} else {
buf += envelope
}
}
} finally {
theStash = buf.result().reverse
}
}
}

/**
* Prepends all messages in the stash to the mailbox, and then clears the stash.
*
Expand Down
37 changes: 26 additions & 11 deletions akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor }

lazy val tcpDnsClient: ActorRef = createTcpClient()

override def preStart() = {
override def preStart(): Unit = {
udp ! Udp.Bind(self, new InetSocketAddress(InetAddress.getByAddress(Array.ofDim(4)), 0))
}

Expand Down Expand Up @@ -78,27 +78,22 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor }
case DropRequest(id) =>
log.debug("Dropping request [{}]", id)
inflightRequests -= id
unstashQuestion(id)

case Question4(id, name) =>
log.debug("Resolving [{}] (A)", name)
val msg = message(name, id, RecordType.A)
inflightRequests += (id -> (sender() -> msg))
log.debug("Message [{}] to [{}]: [{}]", id, ns, msg)
socket ! Udp.Send(msg.write(), ns)
sendingQueryOrStash(socket, msg)

case Question6(id, name) =>
log.debug("Resolving [{}] (AAAA)", name)
val msg = message(name, id, RecordType.AAAA)
inflightRequests += (id -> (sender() -> msg))
log.debug("Message to [{}]: [{}]", ns, msg)
socket ! Udp.Send(msg.write(), ns)
sendingQueryOrStash(socket, msg)

case SrvQuestion(id, name) =>
log.debug("Resolving [{}] (SRV)", name)
val msg = message(name, id, RecordType.SRV)
inflightRequests += (id -> (sender() -> msg))
log.debug("Message to [{}]: [{}]", ns, msg)
socket ! Udp.Send(msg.write(), ns)
sendingQueryOrStash(socket, msg)

case Udp.CommandFailed(cmd) =>
log.debug("Command failed [{}]", cmd)
Expand All @@ -111,6 +106,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor }
case (s, _) =>
s ! Failure(new RuntimeException("Send failed to nameserver"))
inflightRequests -= msg.id
unstashQuestion(msg.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just be unstashAll... the questions that still collide will still collide and get re-stashed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true

}
}
case _ =>
Expand All @@ -133,6 +129,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor }
val (recs, additionalRecs) =
if (msg.flags.responseCode == ResponseCode.SUCCESS) (msg.answerRecs, msg.additionalRecs) else (Nil, Nil)
self ! Answer(msg.id, recs, additionalRecs)
unstashQuestion(msg.id)
}
case response: Answer =>
inflightRequests.get(response.id) match {
Expand All @@ -146,7 +143,25 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor }
case Udp.Unbound => context.stop(self)
}

def createTcpClient() = {
private def unstashQuestion(id: Short): Unit = unstash {
case question: DnsQuestion => question.id == id
case _ => false
}

private def sendingQueryOrStash(socket: ActorRef, message: Message): Unit = {
val id = message.id
inflightRequests.get(id) match {
case None =>
inflightRequests += (id -> (sender() -> message))
log.debug("Message [{}] to [{}]: [{}]", id, ns, message)
socket ! Udp.Send(message.write(), ns)
case Some((_, msg)) =>
stash()
log.debug("In flight query with same id [{}] previous query: [{}]", id, msg)
}
}

def createTcpClient(): ActorRef = {
context.actorOf(
BackoffSupervisor.props(
BackoffOpts.onFailure(
Expand Down