2016-07-07 4 views
1

Ich habe ein Skript, das eine Menge Web-Anfragen (~ 300000) macht. Es sieht so etwas wie diesesBatching-Anfragen mit play.api.libs.ws

// Setup a new wsClient 
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build 
val builder = new AsyncHttpClientConfig.Builder(config) 
val wsClient = new NingWSClient(builder.build) 

// Each of these use the wsClient 
def getAs: Future[Seq[A]] = { ... } 
def getBs: Future[Seq[B]] = { ... } 
def getCs: Future[Seq[C]] = { ... } 
def getDs: Future[Seq[D]] = { ... } 

(for { 
    as <- getAs 
    bs <- getBs 
    cs <- getCs 
    ds <- getDs 
} yield (as, bs, cs, ds)).map(tuple => println("done")) 

Das Problem ist, dass ich in eine Too many open files Ausnahme ausgeführt werden, da jede Funktion asynchron Tausende von Anfragen zu machen, von denen jede eine Dateibeschreibung verwendet.

Ich versuchte reorganisieren meine Funktionen, so dass jeder einzelne Chargen mit ihren eigenen Client machen würde:

def getAs: Future[Seq[A]] = { 
    someCollection.group(1000).map(batch => { 
     val client = new NingWSClient(builder.build) // Make a new client for every batch 
     Future.sequence(batch.map(thing => { 
      wsClient.url(...).map(...) 
     })).map(things => { 
      wsClient.close // Close the client 
      things 
     }) 
    }) 
} 

Aber dies bewirkt, dass die für Verständnis vorzeitig zu beenden (ohne Fehlermeldungen oder Ausnahmen):

(for { 
    as <- getAs 
    bs <- getBs // This doesn't happen 
    cs <- getCs // Or any of the following ones 
    ds <- getDs 
} yield (as, bs, cs, ds)).map(tuple => println("done")) 

ich für den richtigen Weg suchen nur eine große Anzahl von hTTP-Anfragen zu machen, ohne zu viele Dateideskriptoren zu öffnen.

Antwort

2

Ich hatte ein ähnliches Problem, zu viele Anfragen für einen Web-Service (~ 500 +). Ihr Codebeispiel mit Gruppierung ist fast korrekt, jedoch erhalten Sie Iterator[Future[List[Int]]] oder wenn Sie Future.sequence -d es Future[Iterator[List[Int]]]. Aber ich denke, dass sie alle asynchron ausführen werden. Sie müssen die erste Charge und dann flatMap es (warten, bis es fertig ist) und dann nächste Charge feuern. Das ist, was ich schreiben verwaltet folgende this answer:

val futureIterator = list.grouped(50).foldLeft(Future.successful[List[Int]](Nil)) { 
    (fItems, items) => 
    fItems flatMap { processed => 
     println("PROCESSED: " + processed); println("SPAWNED: " + items); 
     Future.traverse(items)(getFuture) map (res => processed ::: res) 
    } 
} 
println(Await.result(futureIterator, Duration.Inf)) 

hoffe, das hilft!

0

können Sie verwenden Octoparts:

https://m3dev.github.io/octoparts/

aber es klingt wirklich wie das Muster umkehren wollen so die WSClient Anrufe auf der Außenseite macht, und dann flatmap Sie die Zukunft [WSResponse] kommt zurück aus. Dadurch wird die Anzahl der Futures auf den internen Netty-Thread-Pool, der von AsyncHttpClient verwendet wird, reduziert, und Sie können die Konfigurationseinstellungen ändern, um die Anzahl der Threads im Netty-Channel-Pool zu erhöhen oder zu verringern.