akka-http: How do I use MergeHub to throttle requests from the client side?

I am using Source.queue to queue HttpRequests and throttle them on the client side in order to download files from a remote server. My understanding is that Source.queue is not thread-safe and that we need to use MergeHub to make it thread-safe. Below is the section of code that uses Source.queue together with cachedHostConnectionPool.

import java.io.File 

import akka.actor.Actor 
import akka.event.Logging 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.client.RequestBuilding 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest, Uri} 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

import scala.concurrent.{Promise, Future} 
import scala.concurrent.duration._ 
import scala.util.{Failure, Success} 

class HttpClient extends Actor with RequestBuilding { 

    implicit val system = context.system 
    val logger = Logging(system, this) 
    implicit lazy val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 
    val remoteHost = config.getString("pool.connection.host") 
    val remoteHostPort = config.getInt("pool.connection.port") 
    val queueSize = config.getInt("pool.queueSize") 
    val throttleSize = config.getInt("pool.throttle.numberOfRequests") 
    val throttleDuration = config.getInt("pool.throttle.duration") 

    import scala.concurrent.ExecutionContext.Implicits.global 

    val connectionPool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = remoteHost, port = remoteHostPort) 

    // Construct a Queue 
    val requestQueue = 
     Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure) 
     .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
     .via(connectionPool) 
     .toMat(Sink.foreach({ 
      case ((Success(resp), p)) => p.success(resp) 
      case ((Failure(error), p)) => p.failure(error) 
     }))(Keep.left) 
     .run() 

    // Offer the request to the queue and turn the Promise[HttpResponse] into a Future[HttpResponse]
    def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
     val responsePromise = Promise[HttpResponse]() 
     requestQueue.offer(request -> responsePromise).flatMap { 
      case QueueOfferResult.Enqueued => responsePromise.future 
      case QueueOfferResult.Dropped  => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
      case QueueOfferResult.Failure(ex) => Future.failed(ex) 
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
     } 
    } 

    def receive = { 
     case "download" => 
     val uri = Uri(s"http://localhost:8080/file_csv.csv") 
     downloadFile(uri, new File("/tmp/compass_audience.csv")) 
    } 

    def downloadFile(uri: Uri, destinationFilePath: File) = { 

     def fileSink: Sink[ByteString, Future[IOResult]] = 
      Flow[ByteString].buffer(512, OverflowStrategy.backpressure) 
      .toMat(FileIO.toPath(destinationFilePath.toPath)) (Keep.right) 

     // Submit to queue and execute HttpRequest and write HttpResponse to file 
     Source.fromFuture(queueRequest(Get(uri))) 
      .flatMapConcat(_.entity.dataBytes) 
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) 
      .map(_.utf8String) 
      .map(d => s"$d\n") 
      .map(ByteString(_)) 
      .runWith(fileSink) 

    } 
} 
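For context, the actor is driven roughly like this (a minimal sketch; the system and actor names below are placeholders, not part of the original code):

import akka.actor.{ActorSystem, Props}

object DownloadApp extends App {
 // Placeholder names for illustration only.
 val system = ActorSystem("downloader")
 val client = system.actorOf(Props[HttpClient], "http-client")

 // Sending "download" makes the actor queue a GET for the CSV and stream it to a file.
 client ! "download"
}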

However, when I use MergeHub, it materializes a Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]. I need to extract response.entity.dataBytes and write the response to a file with a file sink. I am not able to figure out how to use MergeHub to achieve this. Any help is appreciated.

val hub: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = queueSize) 
    .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
    .via(connectionPool) 
    .toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(error), p)) => p.failure(error) 
    }))(Keep.left) 
    .run() 
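I assume the submission side would then look roughly like the sketch below (the method name is mine, not working code); what I cannot work out is how to get from the completed Promise to writing entity.dataBytes into a file sink per request:

// Rough sketch: feed one (request, promise) pair into the materialized hub sink
// and hand the Promise's future back to the caller, mirroring queueRequest above.
def queueRequestViaHub(request: HttpRequest): Future[HttpResponse] = {
 val responsePromise = Promise[HttpResponse]()
 Source.single(request -> responsePromise).runWith(hub)
 responsePromise.future
}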

Answer


Source.queue is actually thread-safe now. If you want to use MergeHub:

private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = 
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, connectionPoolSettings) 


    val ServerSink = 
    poolFlow.toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 

    // Attach a MergeHub Source to the consumer. This will materialize to a 
    // corresponding Sink. 
    val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink) 


    val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run() 



    protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = { 
     val responsePromise = Promise[HttpResponse]() 
     Source.single(httpRequest -> responsePromise).runWith(toConsumer) 
     responsePromise.future.flatMap(handleHttpResponse(_, unmarshal)) 
    } 

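To cover the file-download part of the question, here is a rough sketch of how this can be combined with the question's FileIO pipeline (downloadViaHub is an illustrative name; Uri, FileIO and IOResult are assumed to be imported as in the question, with a materializer in scope):

// Sketch: push one (request, promise) pair into the hub-backed sink, then
// stream the response entity to disk once the promise is completed by poolFlow.
def downloadViaHub(uri: Uri, destination: java.io.File): Future[IOResult] = {
 val responsePromise = Promise[HttpResponse]()
 Source.single(HttpRequest(uri = uri) -> responsePromise).runWith(toConsumer)

 Source.fromFuture(responsePromise.future)
  .flatMapConcat(_.entity.dataBytes)
  .runWith(FileIO.toPath(destination.toPath))
}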