2017-02-27 4 views
3

Ich habe eine HTTP-Verbindung Pool, die nach ein paar Stunden Laufzeit hängt:Akka HTTP Connection Pool hängt nach einigen Stunden

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = { 
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host) 
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew) 
     .via(pool).toMat(Sink.foreach { 
     case ((Success(res), p)) => p.success(res) 
     case ((Failure(e), p)) => p.failure(e) 
     })(Keep.left).run 
    } 

I enqueue Artikel mit:

private def enqueue(uri: Uri): Future[HttpResponse] = { 
    val promise = Promise[HttpResponse] 
    val request = HttpRequest(uri = uri) -> promise 

    queue.offer(request).flatMap { 
     case Enqueued => promise.future 
     case _ => Future.failed(ConnectionPoolDroppedRequest) 
    } 
} 

und Lösung der Antwort wie folgt:

private def request(uri: Uri): Future[HttpResponse] = { 
    def retry = { 
     Thread.sleep(config.dispatcherRetryInterval) 
     logger.info(s"retrying") 
     request(uri) 
    } 

    logger.info("req-start") 
    for { 
     response <- enqueue(uri) 

     _ = logger.info("req-end") 

     finalResponse <- response.status match { 
     case TooManyRequests => retry 
     case OK => Future.successful(response) 
     case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString)) 
     } 
    } yield finalResponse 
} 

Das Ergebnis dieser Funktion wird dann immer transformiert, wenn die Zukunft erfolgreich ist:

def get(uri: Uri): Future[Try[JValue]] = { 
    for { 
    response <- request(uri) 
    json <- Unmarshal(response.entity).to[Try[JValue]] 
    } yield json 
} 

Alles funktioniert für eine Weile in Ordnung und dann alles, was ich in den Protokollen sehen ist req-Start- und kein req-Ende.

Meine akka Konfiguration ist wie folgt:

akka { 
    actor.deployment.default { 
    dispatcher = "my-dispatcher" 
    } 
} 

my-dispatcher { 
    type = Dispatcher 
    executor = "fork-join-executor" 

    fork-join-executor { 
    parallelism-min = 256 
    parallelism-factor = 128.0 
    parallelism-max = 1024 
    } 
} 

akka.http { 
    host-connection-pool { 
    max-connections = 512 
    max-retries = 5 
    max-open-requests = 16384 
    pipelining-limit = 1 
    } 
} 

Ich bin nicht sicher, ob dies ein Konfigurationsproblem oder ein Code Problem. Ich habe meine Parallelitäts- und Verbindungsnummern so hoch, weil ich ohne sie eine sehr schlechte req/s Rate bekomme (ich möchte so schnell wie möglich anfordern - ich habe einen anderen Ratenbegrenzungscode, um den Server zu schützen).

Antwort

3

Sie verbrauchen nicht die Entität der Antworten, die Sie vom Server erhalten. Nennen Sie die folgenden Dokumente:

Konsumieren (oder Verwerfen) der Entität einer Anfrage ist obligatorisch! Wenn versehentlich links weder verbraucht oder verworfen werden Akka HTTP die eingehenden Daten übernehmen sollte Rück unter Druck bleiben und blockieren die eingehenden Daten über TCP Gegendruckmechanismen. Ein Client sollte die Entität konsumieren, unabhängig vom Status der HttpResponse.

Die Entität kommt in Form einer Source[ByteString, _], die ausgeführt werden muss, um Ressourcenmangel zu vermeiden.

Wenn Sie nicht über die Einheit lesen müssen, ist die einfachste Art und Weise die Einheit Bytes zu konsumieren ist, sie zu verwerfen, durch

mit
res.discardEntityBytes() 

(Sie können einen Rückruf an, indem Sie das Hinzufügen - zB - .future().map(...)) .

This page in the docs beschreibt alle Alternativen dazu, wie es die Bytes zu lesen, wenn nötig.

--- EDIT

Nach mehr Code/Infos zur Verfügung gestellt wurden, ist es klar, dass der Ressourcenverbrauch nicht das Problem ist. Es gibt eine andere große rote Flagge in dieser Implementierung, nämlich die Thread.sleep in der Wiederholungsmethode. Dies ist ein blockierender Aufruf, bei dem die Threading-Infrastruktur des zugrunde liegenden Aktorsystems sehr wahrscheinlich verhungern wird.

Eine vollständige geblasene Erklärung, warum dies ist gefährlich in den docs zur Verfügung gestellt.

Versuchen Sie das mal und mit akka.pattern.after (docs) zu verändern.Beispiel unten:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri)) 
+0

Ich konsumiere tatsächlich die Entität, nachdem ich die Antwort bekomme. Ich werde den Beitrag mit weiteren Informationen aktualisieren. – asuna

+0

Habe meinen Code geändert, um akka.pattern.after zu verwenden, und wird ein Update veröffentlichen, wenn das Problem erneut auftritt. Ich habe den früheren Thread.sleep-Code jedoch profiliert, und der Profiler hat gezeigt, dass keiner der Threads schläft, wenn er nicht mehr funktioniert. Immer wenn ich einen 429 bekomme, zeigt jvisvisualvm, dass einer der Threads für etwa 500ms schläft und dann beginnt dieser Thread wieder zu laufen, also bin ich ein wenig skeptisch, wenn ich den Scheduler benutze, um ihn zu reparieren. Nichtsdestotrotz war die Verwendung von Thread.sleep wirklich schlecht - danke, dass du mir eine gute Lösung gegeben hast, um das zu beheben. – asuna

+0

Ich laufe auf das gleiche Problem. Dies ist der Thread-Dump: https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna