2017-03-28 1 views
1
def fixture = 
    new { 

     val xyz = new XYZ(spark) 
    }  
val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] = scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]() //mutable List of future means List[Future] 

    test("test case") {  
     val tasks = for (i <- 1 to 10) { 
      fList ++ scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]](Future { 
      println("Executing task " + i) 
      val ds = read(fixture.etlSparkLayer,i)   
      ds 
      }) 
     } 

     Thread.sleep(1000*4200) 
     val futureOfList = Future.sequence(fList)//list of Future job in Future sequence  
     println(Await.ready(futureOfList, Duration.Inf)) 


     val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf) 
     println("Squares: " + await_result) 

     futureOfList.onComplete { 
      case Success(x) => println("Success!!! " + x) 
      case Failure(ex) => println("Failed !!! " + ex) 
     }    
     } 

Ich führe einen Testfall mit Sequenz von Future List und List haben Sammlung von Future.I versuchen, die gleiche Funktion mehrere Male parallel mithilfe von Future in scala auszuführen. In meinem System starten nur 4 Jobs in einer Zeit nach der Fertigstellung von 4 Jobs, die nächsten 4 Job werden so beginnen, vervollständigen alle Jobs. Also, wie man mehr als 4 Jobs gleichzeitig startet und wie der Haupt-Thread auf den gesamten Future Thread wartet? Ich probierte Await.result und Wartebereit, aber nicht in der Lage, Hauptthread zu steuern, für die Hauptthreadsteuerung verwende ich Thread.sleep concept.this Programm zum Lesen von RDBMS-Tabelle und Schreiben in Elasticsearch. Also, wie Haupt Hauptproblem zu steuern?Control-Thread mit mehreren Future-Jobs in der Skala

+0

Haben Sie Futures verwenden? Schauspieler scheinen besser zu passen. Sie können die Nebenläufigkeit viel einfacher mit Schauspielern steuern. –

+0

Überprüfen Sie diese http://stackoverflow.com/questions/29344430/scala-waiting-for-sequence-of-futures. Wie auch immer, ich hatte viele Probleme mit Scala Futures, die versuchten, ihre Ausführung zu kontrollieren. Ich musste Scalaz Task verwenden und alles funktionierte viel besser. – EmiCareOfCell44

+0

@ JonAnderson Ich weiß das, aber dafür können wir 2 Abhängigkeit hinzufügen. –

Antwort

1

Unter der Annahme, dass Sie die Verwendung scala.concurrent.ExecutionContext.Implicits.globalExecutionContext Sie stimmen die Anzahl der Threads, wie hier beschrieben:

https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L100

Insbesondere die folgenden Systemeigenschaften: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads. scala.concurrent.context.maxThreads und scala.concurrent.context.maxExtraThreads

Andernfalls können Sie den Code in etwa so umschreiben:

import scala.collection.immutable 
import scala.concurrent.duration._ 
import scala.concurrent._ 
import java.util.concurrent.Executors 

test("test case") { 
    implicit val ec = ExecutionContext.fromExecutorService(ExecutorService.newFixedThreadPool(NUMBEROFTHREADSYOUWANT)) 
    val aFuture = Future.traverse(1 to 10) { 
    i => Future { 
     println("Executing task " + i) 
     read(fixture.etlSparkLayer,i) // If this is a blocking operation you may want to consider wrapping it in a `blocking {}`-block.   
    } 
    } 
    aFuture.onComplete(_ => ec.shutdownNow()) // Only for this test, and to make sure the pool gets cleaned up 
    val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // Or other timeout 
    println("Squares: " + await_result) 
} 
+0

Ich bin Anfänger in Scala so können Sie mir bitte ein Link oder Material oder Tipps zu meiner E-Mail [email protected] –

+1

@xyz_scala https://github.com/viktorklang/blog –

+0

von diesem ich kann Anzahl von dort steuern Thread-Pool, aber wie Spark-Kontext-Operation zu steuern? Mittel ich versuche, in ES mit Hilfe von Funken zu schreiben, Aber Funken fängt an, nur 4 auf einmal zu schreiben. –