Ich habe eine HTTP-Verbindung Pool, die nach ein paar Stunden Laufzeit hängt:Akka HTTP Connection Pool hängt nach einigen Stunden
private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
.via(pool).toMat(Sink.foreach {
case ((Success(res), p)) => p.success(res)
case ((Failure(e), p)) => p.failure(e)
})(Keep.left).run
}
I enqueue Artikel mit:
private def enqueue(uri: Uri): Future[HttpResponse] = {
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = uri) -> promise
queue.offer(request).flatMap {
case Enqueued => promise.future
case _ => Future.failed(ConnectionPoolDroppedRequest)
}
}
und Lösung der Antwort wie folgt:
private def request(uri: Uri): Future[HttpResponse] = {
def retry = {
Thread.sleep(config.dispatcherRetryInterval)
logger.info(s"retrying")
request(uri)
}
logger.info("req-start")
for {
response <- enqueue(uri)
_ = logger.info("req-end")
finalResponse <- response.status match {
case TooManyRequests => retry
case OK => Future.successful(response)
case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
}
} yield finalResponse
}
Das Ergebnis dieser Funktion wird dann immer transformiert, wenn die Zukunft erfolgreich ist:
def get(uri: Uri): Future[Try[JValue]] = {
for {
response <- request(uri)
json <- Unmarshal(response.entity).to[Try[JValue]]
} yield json
}
Alles funktioniert für eine Weile in Ordnung und dann alles, was ich in den Protokollen sehen ist req-Start- und kein req-Ende.
Meine akka Konfiguration ist wie folgt:
akka {
actor.deployment.default {
dispatcher = "my-dispatcher"
}
}
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 256
parallelism-factor = 128.0
parallelism-max = 1024
}
}
akka.http {
host-connection-pool {
max-connections = 512
max-retries = 5
max-open-requests = 16384
pipelining-limit = 1
}
}
Ich bin nicht sicher, ob dies ein Konfigurationsproblem oder ein Code Problem. Ich habe meine Parallelitäts- und Verbindungsnummern so hoch, weil ich ohne sie eine sehr schlechte req/s Rate bekomme (ich möchte so schnell wie möglich anfordern - ich habe einen anderen Ratenbegrenzungscode, um den Server zu schützen).
Ich konsumiere tatsächlich die Entität, nachdem ich die Antwort bekomme. Ich werde den Beitrag mit weiteren Informationen aktualisieren. – asuna
Habe meinen Code geändert, um akka.pattern.after zu verwenden, und wird ein Update veröffentlichen, wenn das Problem erneut auftritt. Ich habe den früheren Thread.sleep-Code jedoch profiliert, und der Profiler hat gezeigt, dass keiner der Threads schläft, wenn er nicht mehr funktioniert. Immer wenn ich einen 429 bekomme, zeigt jvisvisualvm, dass einer der Threads für etwa 500ms schläft und dann beginnt dieser Thread wieder zu laufen, also bin ich ein wenig skeptisch, wenn ich den Scheduler benutze, um ihn zu reparieren. Nichtsdestotrotz war die Verwendung von Thread.sleep wirklich schlecht - danke, dass du mir eine gute Lösung gegeben hast, um das zu beheben. – asuna
Ich laufe auf das gleiche Problem. Dies ist der Thread-Dump: https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna