2017-06-22 2 views
2

Ich muss viele Dateien auf S3 hochladen, es würde Stunden dauern, diesen Job sequenziell abzuschließen. Das ist genau das, was Kotlins neue Coroutines auszeichnen, also wollte ich ihnen einen ersten Versuch geben, anstatt wieder mit einem Thread-basierten Ausführungsdienst herumzuspielen.Gleichzeitiger S3-Datei-Upload über Kotlin Coroutinen

Hier ist meine (vereinfacht) Code:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { 
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() 
    for ((x, ys) in superTiles) { 
     val jobs = mutableListOf<Deferred<Any>>() 
     for ((y, superTile) in ys) { 
      val job = async(CommonPool) { 
       uploadTile(s3, x, y, superTile) 
      } 
      jobs.add(job) 
     } 
     jobs.map { it.await() } 
    } 
} 

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { 
    val json: String = "{}" 
    val key = "$s3Prefix/x4/$z/$x/$y.json" 
    s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) 
} 

Das Problem: Der Code ist immer noch sehr langsam ist und die Protokollierung zeigt, dass Anfragen noch sequentiell ausgeführt werden: ein Auftrag abgeschlossen ist, bevor die nächste erstellt wird. Nur in sehr wenigen Fällen (1 von 10) werden Jobs gleichzeitig ausgeführt.

Warum läuft der Code nicht viel schneller/gleichzeitig? Was kann ich tun?

+0

Ungereimt raten: move 'val s3 = AmazonS3ClientBuilder ...' innerhalb der asynchronen Abschnitt, so dass Sie mehrere Clients haben? –

+0

das hat auch nicht funktioniert. Meine ungebildete Vermutung ist nun, dass 'putObject' die Anfrage blockiert, etwas, was sich nicht ändern kann. – linqu

+0

Genau. Es sieht so aus, als ob das S3 SDK nicht-blockierende IO (über NIO) nicht unterstützt, so dass Sie pro Upload einen Thread benötigen. Sie können immer noch mehrere parallel laufen lassen, aber es ist wahrscheinlich nicht ratsam, _all_ parallel zu machen. Irgendwann werden Sie auch nur durch Ihre Netzwerkbandbreite begrenzt sein. – diesieben07

Antwort

5

Kotlin Koroutinen treffen, wenn Sie mit asynchronen API arbeiten, während AmazonS3.putObject API, die Sie verwenden, ist ein Old-School-Blockierung, synchrones API, so erhalten Sie nur so viele gleichzeitige Uploads wie die Anzahl der Threads in der CommonPool, dass du benutzt. Es gibt keinen Wert beim Markieren Ihrer uploadTile-Funktion mit suspend geändert, weil es keine suspendierenden Funktionen in seinem Körper verwendet.

Der erste Schritt, um mehr Durchsatz in Ihrer Upload-Aufgabe zu erzielen, besteht darin, die asynchrone API dafür zu verwenden. Ich würde vorschlagen, für diesen Geldbeutel auf Amazon S3 TransferManager zu schauen. Sehen Sie, ob das Ihr Problem zuerst löst.

Die Kotlin-Coroutinen wurden entwickelt, um Ihnen zu helfen, Ihre asynchronen APIs zu leicht zu bedienenden logischen Workflows zu kombinieren. Zum Beispiel ist es einfach, indem er die folgende Erweiterungsfunktion asynchrone API von TransferManager für mit Koroutinen Gebrauch anzupassen:

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> 
    addProgressListener { 
     if (isDone) { 
      // we know it should not actually wait when done 
      try { cont.resume(waitForUploadResult()) } 
      catch (e: Throwable) { cont.resumeWithException(e) } 
     } 
    } 
    cont.invokeOnCompletion { abort() } 
} 

Diese Erweiterung ermöglicht es Ihnen, sehr fließend Code zu schreiben, mit TransferManager arbeitet und Sie können Ihre uploadTile Funktion umschreiben

suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { 
    val json: String = "{}" 
    val key = "$s3Prefix/x4/$z/$x/$y.json" 
    tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) 
     .await() 
} 

Hinweis, wie diese neue Version von uploadTile eine Suspensions Funktion verwendet await die abov definiert wurde: mit Sperr AmazonS3 Schnittstelle arbeitet mit TransferManager, anstatt zu arbeiten e.

Verwandte Themen