Skip to content

Commit

Permalink
=sbt Do retry connection in Player.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Aug 8, 2023
1 parent 4f570ea commit 9897cb4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import scala.reflect.classTag
import scala.util.{ Failure, Success, Try }
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal

import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable

import org.apache.pekko
import pekko.actor._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
Expand Down Expand Up @@ -338,21 +340,36 @@ private[pekko] class PlayerHandler(

import ClientFSM._

val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection](reconnect())
val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection]()

var nextAttempt: Deadline = _

tryConnectToController()

@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
cause match {
case _: ConnectException if reconnects > 0 =>
reconnects -= 1
scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect()))
case e => fsm ! ConnectionFailure(e.getMessage)
}
}

private def tryConnectToController(): Unit = {
Try(reconnect()) match {
case Success(r) => connectionRef.set(r)
case Failure(ex) =>
log.error("Error when try to connect to remote addr:[{}] will retry, time left:[{}], cause:[{}].",
server, nextAttempt.timeLeft, ex.getMessage)
scheduleReconnect()
}
}

private def scheduleReconnect(): Unit = {
scheduler.scheduleOnce(nextAttempt.timeLeft)(tryConnectToController())
}

private def reconnect(): RemoteConnection = {
nextAttempt = Deadline.now + backoff
RemoteConnection(Client, server, poolSize, this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import io.netty.handler.codec.{
MessageToMessageDecoder,
MessageToMessageEncoder
}

import org.apache.pekko
import pekko.protobufv3.internal.Message
import pekko.util.Helpers
Expand Down Expand Up @@ -130,6 +131,8 @@ private[pekko] object RemoteConnection {
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
.connect(sockaddr)
.sync()

new RemoteConnection {
override def channelFuture: ChannelFuture = cf

Expand Down
32 changes: 0 additions & 32 deletions project/SbtMultiJvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import sbtassembly.AssemblyPlugin.assemblySettings
import sbtassembly.{ AssemblyKeys, MergeStrategy }
import AssemblyKeys._

import java.net.{ InetSocketAddress, Socket }
import java.util.concurrent.TimeUnit

object MultiJvmPlugin extends AutoPlugin {

case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String])
Expand Down Expand Up @@ -375,40 +372,11 @@ object MultiJvmPlugin extends AutoPlugin {
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
val testClass2Process = (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput))
if (index == 0) {
log.debug("%s for %s 's started as `Controller`, waiting before can be connected for clients.".format(jvmName,
testClass))
val controllerHost = hosts.head
val serverPort: Int = Integer.getInteger("multinode.server-port", 4711)
waitingBeforeConnectable(controllerHost, serverPort, TimeUnit.SECONDS.toMillis(20L))
}
testClass2Process
}
processExitCodes(name, processes, log)
}

private def waitingBeforeConnectable(host: String, port: Int, timeoutInMillis: Long): Unit = {
val inetSocketAddress = new InetSocketAddress(host, port)
def telnet(addr: InetSocketAddress, timeout: Int): Boolean = {
val socket: Socket = new Socket()
try {
socket.connect(inetSocketAddress, timeout)
socket.isConnected
} catch {
case _: Exception => false
} finally {
socket.close()
}
}

val startTime = System.currentTimeMillis()
var connectivity = false
while (!connectivity && (System.currentTimeMillis() - startTime < timeoutInMillis)) {
connectivity = telnet(inetSocketAddress, 1000)
TimeUnit.MILLISECONDS.sleep(100)
}
}

def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = {
val exitCodes = processes.map {
case (testClass, process) => (testClass, process.exitValue())
Expand Down

0 comments on commit 9897cb4

Please sign in to comment.