2016-03-21 5 views
0

Ich versuche Akka-http zu machen, um HTTP-Anfragen an einen einzelnen Host (beispielsweise „akka.io“) zu verwenden. Das Problem ist, dass der erzeugte Fluss (Http(). CachedHostConnectionPool) beginnt, Antworten erst dann zu senden, wenn N HTTP-Anfragen gemacht werden, wobei N gleich Max-Verbindungen ist.Warum akka http Antworten für erste N Anfragen nicht ausgibt?

import scala.util.Failure 
import scala.util.Success 
import com.typesafe.config.ConfigFactory 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.Uri.apply 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Source 

object ConnectionPoolExample extends App { 

    implicit val system = ActorSystem() 
    implicit val executor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 

    val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10) 
    lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings) 

    val fakeSource = Source.fromIterator[Unit] {() => Iterator.continually { Thread.sleep(1000);() } } 
    val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) } 

    val responses = requests.via(poolClientFlow) 

    responses.runForeach { 
    case (tryResponse, jsonData) => 
     tryResponse match { 
     case Success(httpResponse) => 
      httpResponse.entity.dataBytes.runWith(Sink.ignore) 
      println(s"status: ${httpResponse.status}") 
     case Failure(e) => { 
      println(e) 
     } 
     } 
    } 
} 

Die Ausgabe sieht wie folgt aus:

Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
... 

Ich andernfalls alle Konfigurationsparameter zu finden, die Antworten erlauben würde emittieren, sobald sie bereit sind, und nicht, wenn der Pool ist aus der freien Verbindungen.

Danke!

Antwort

0

Der Grund dafür ist, dass Sie den Client zu tun, andere Arbeit blockieren, indem Thread.sleep -Das Methode aufrufen einfach in reaktiven Programme verboten ist. Der richtige und einfachere Ansatz besteht darin, Source.tick zu verwenden.

+0

Danke Roland. Das spezielle Beispiel wird mit Source.tick gelöst. Es war schade, Thread.sleep (1000) in dieser fakeSource zu verwenden. Die wirkliche Quelle von Kafka zu lesen und es durch die Erweiterung GraphStag [SourceShape [A]] ... 'val stream = consumerMap.getOrElse (topicname, List()). Kopf SetHandler (out, neuen OutHandler implementiert { außer Kraft setzen def onPull(): Unit = {. val jsonData = JsonParser (stream.head.message()) convertTo [A] Push (out, jsonData) } }) ' ... Ist es auch blockiert der Kunde? – uladzimir

+0

Ja, Sie möchten '.async' zu dieser Quelle hinzufügen, um sie vom Rest des Streams zu entkoppeln. Wir arbeiten auch an der richtigen Kafka-Integration, siehe reactive-kafka. –