2016-10-21 8 views
-1

Folgendes ist meine Klasse, wo ich Aufgaben gleichzeitig ausführen. Mein Problem ist, dass meine Anwendung niemals endet, auch nicht nach dem Ergebnis für alle Funktionen. Ich vermute, dass Thread-Pool nicht heruntergefahren wird, was meine Anwendung auch nach meinen Aufgaben am Leben hält. Glaubt mir, ich habe viel gegoogelt, um es herauszufinden, aber kein Glück. Was ich hier vermisse?Scala Future Nebenläufigkeit Problem

import scala.concurrent.ExecutionContext.Implicits.global 
    import scala.concurrent.Future 
    import scala.collection.mutable.ListBuffer 
    import scala.util.Failure 
    import scala.util.Success 

    object AppLauncher{ 

     def launchAll(): ListBuffer[Future[String]] = { 
     // My code logic where I launch all my threads say 50 
     null 
     } 

def main(args:Array[String]):Unit= { 
register(launchAll()) 
} 



     def register(futureList: ListBuffer[Future[String]]): Unit = 
     { 
      futureList.foreach { future => 
      { 
       future.onComplete { 
       case Success(successResult) => { 
        println(successResult) 
       } 
       case Failure(failureResult) => { println(failureResult) } 
       } 
      } 
      } 
     } 
    } 
+0

Wie erstellen Sie diese Futures? Ich habe versucht, das Beispiel zu führen. Ich habe Dummy-Futures mit ListBuffer.fill (50) (Future ("asd")) erstellt. Der Hauptteil wurde fertiggestellt, bevor etwas auf die Konsole gedruckt wurde. Ich habe Thread.sleep (1000) nach dem Aufruf der Registermethode hinzugefügt. Das Programm druckte zwanzigmal und schloss nach 1000 Sekunden. Vielleicht gibt es ein Problem mit deiner Zukunft oder etwas? Können Sie weitere Details zu Ihren Threads angeben? – NieMaszNic

Antwort

-1

Endlich war ich in der Lage, das Problem herauszufinden. Das Problem ist wegen Thread Pool wurde nicht beendet, auch nachdem meine Futures erfolgreich abgeschlossen wurden. Ich habe versucht, das Problem zu isolieren, indem ich meine Implementierung leicht wie folgt änderte.

// import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future 
    import scala.collection.mutable.ListBuffer 
    import scala.util.Failure 
    import scala.util.Success 

    //Added ExecutionContex explicitly 
    import java.util.concurrent.Executors 
    import concurrent.ExecutionContext 

    object AppLauncher { 

    //Implemented EC explicitly 
    private val pool = Executors.newFixedThreadPool(1000) 
    private implicit val executionContext = ExecutionContext.fromExecutorService(pool) 

    def launchAll(): ListBuffer[Future[String]] = { 
     // My code logic where I launch all my threads say 50 
     null 
    } 

    def main(args: Array[String]): Unit = { 
     register(launchAll()) 
    } 

    def register(futureList: ListBuffer[Future[String]]): Unit = 
     { 
     futureList.foreach { future => 
      { 

      println("Waiting...") 
      val result = Await.result(future, scala.concurrent.duration.Duration.Inf) 

      println(result) 

      } 

     } 
 pool.shutdownNow() 
     executionContext.shutdownNow() 
 println(pool.isTerminated() + " Pool terminated") 
     println(pool.isShutdown() + " Pool shutdown") 

     println(executionContext.isTerminated() + " executionContext terminated") 
     println(executionContext.isShutdown() + " executionContext shutdown") 
     } 

    } 

Ergebnis vor Code zum Herunterfahren Pool

hervorgehoben Zugabe

falsch Pool beendet

wahr Pool Abschaltung

falsch ExecutionContext

beendet

wahr ExecutionContext Abschaltung

Nach der Zugabe von markierten Code mein Problem gelöst. Ich habe sichergestellt, dass in meinem Code kein Ressourcenleck auftritt. Mein Szenario erlaubt mir, den Pool zu töten, wenn alle Futures abgeschlossen sind. Ich bin mir der Tatsache bewusst, dass ich die elegante Callback-Implementierung in die blockierende Implementierung geändert habe, aber trotzdem hat es mein Problem gelöst.

0

Normalerweise, wenn Sie auf einem iterable von Future s arbeiten, sollten Sie Future.sequence verwenden, die sagen ändert, ein Seq[Future[T]] zu einem Future[Seq[T]].

So verwenden Sie so etwas wie:

def register(futureList: Seq[Future[String]]) = Future.sequence(futureList) foreach { results => 
    println("received result") 
} 

, wenn Sie jeweils Zukunft und Druckausgaben abbilden möchten, wie es abgeschlossen ist, können Sie auch etwas auf den Linien tun;

def register(futureList: Seq[Future[String]]) = Future.sequence (
    futureList.map(f => f.map { v => 
    println(s"$v is complete") 
    v 
    })) map { vs => 
    println("all values done $vs") 
    vs 
} 
+0

Vielen Dank für die Antwort. Ich habe versucht mit Future.sequence (futureList) aber das Ergebnis ist das gleiche – BDR

+0

können Sie mir sagen, wie Ihre launchAll() -Methode startet alle Futures? – Ashesh

+0

Future [String] { // Meine Busslogiken gehen hier } Lass uns sagen, dass ich mehr als einen haben werde Futures werden dynamisch basierend auf den Umständen aufgerufen werden – BDR