From d6d7f4048bc3409bc75c056b6f3daff1ae3a3dfd Mon Sep 17 00:00:00 2001 From: kerr Date: Sat, 15 Apr 2023 19:44:13 +0800 Subject: [PATCH] =act Fix dns query overriding in dnsClient. --- .../src/main/scala/akka/actor/Stash.scala | 21 +++++++++++ .../akka/io/dns/internal/DnsClient.scala | 37 +++++++++++++------ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 6711b8723c7..f45b3fb2e97 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -204,6 +204,27 @@ private[akka] trait StashSupport { theStash = theStash.tail } + private[akka] def unstash(filterPredicate: Any => Boolean): Unit = { + if (theStash.nonEmpty) { + val buf = Vector.newBuilder[Envelope] + try { + val i = theStash.reverseIterator + var found = false + while (i.hasNext) { + val envelope = i.next() + if (!found && filterPredicate(envelope.message)) { + enqueueFirst(envelope) + found = true + } else { + buf += envelope + } + } + } finally { + theStash = buf.result().reverse + } + } + } + /** * Prepends all messages in the stash to the mailbox, and then clears the stash. * diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala b/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala index 68c97f7189b..108ae8078cf 100644 --- a/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala @@ -49,7 +49,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor } lazy val tcpDnsClient: ActorRef = createTcpClient() - override def preStart() = { + override def preStart(): Unit = { udp ! Udp.Bind(self, new InetSocketAddress(InetAddress.getByAddress(Array.ofDim(4)), 0)) } @@ -78,27 +78,22 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor } case DropRequest(id) => log.debug("Dropping request [{}]", id) inflightRequests -= id + unstashQuestion(id) case Question4(id, name) => log.debug("Resolving [{}] (A)", name) val msg = message(name, id, RecordType.A) - inflightRequests += (id -> (sender() -> msg)) - log.debug("Message [{}] to [{}]: [{}]", id, ns, msg) - socket ! Udp.Send(msg.write(), ns) + sendingQueryOrStash(socket, msg) case Question6(id, name) => log.debug("Resolving [{}] (AAAA)", name) val msg = message(name, id, RecordType.AAAA) - inflightRequests += (id -> (sender() -> msg)) - log.debug("Message to [{}]: [{}]", ns, msg) - socket ! Udp.Send(msg.write(), ns) + sendingQueryOrStash(socket, msg) case SrvQuestion(id, name) => log.debug("Resolving [{}] (SRV)", name) val msg = message(name, id, RecordType.SRV) - inflightRequests += (id -> (sender() -> msg)) - log.debug("Message to [{}]: [{}]", ns, msg) - socket ! Udp.Send(msg.write(), ns) + sendingQueryOrStash(socket, msg) case Udp.CommandFailed(cmd) => log.debug("Command failed [{}]", cmd) @@ -111,6 +106,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor } case (s, _) => s ! Failure(new RuntimeException("Send failed to nameserver")) inflightRequests -= msg.id + unstashQuestion(msg.id) } } case _ => @@ -133,6 +129,7 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor } val (recs, additionalRecs) = if (msg.flags.responseCode == ResponseCode.SUCCESS) (msg.answerRecs, msg.additionalRecs) else (Nil, Nil) self ! Answer(msg.id, recs, additionalRecs) + unstashQuestion(msg.id) } case response: Answer => inflightRequests.get(response.id) match { @@ -146,7 +143,25 @@ import akka.pattern.{ BackoffOpts, BackoffSupervisor } case Udp.Unbound => context.stop(self) } - def createTcpClient() = { + private def unstashQuestion(id: Short): Unit = unstash { + case question: DnsQuestion => question.id == id + case _ => false + } + + private def sendingQueryOrStash(socket: ActorRef, message: Message): Unit = { + val id = message.id + inflightRequests.get(id) match { + case None => + inflightRequests += (id -> (sender() -> message)) + log.debug("Message [{}] to [{}]: [{}]", id, ns, message) + socket ! Udp.Send(message.write(), ns) + case Some((_, msg)) => + stash() + log.debug("In flight query with same id [{}] previous query: [{}]", id, msg) + } + } + + def createTcpClient(): ActorRef = { context.actorOf( BackoffSupervisor.props( BackoffOpts.onFailure(