2016-04-05 4 views
7

Ich habe dieses Problem in meinem realen Projekt angetroffen und durch meinen Testcode und Profiler bewiesen. Anstatt den Code "tl; dr" einzufügen, zeige ich Ihnen ein Bild und beschreibe es dann. enter image description hereÜber Future.firstCompletedOf und Garbage Collect-Mechanismus

Einfach gesagt, ich bin mit Future.firstCompletedOf ein Ergebnis von 2 Future s zu erhalten, von denen beide haben keine gemeinsamen Dinge und tun umeinander nicht. Auch wenn, was die Frage ist, die ich ansprechen möchte, kann der Garbage Collector das erste Result Objekt nicht wiederverwenden, bis beide Future s fertig sind.

Also bin ich wirklich neugierig auf den Mechanismus dahinter. Könnte jemand es von einer niedrigeren Ebene erklären, oder einen Hinweis für mich geben, um mich zu untersuchen.

Danke!

PS: ist es, weil sie das gleiche ExecutionContext teilen?

** Update ** Paste Testcode als

angefordert
object Main extends App{ 
    println("Test start") 

    val timeout = 30000 

    trait Result { 
    val id: Int 
    val str = "I'm short" 
    } 
    class BigObject(val id: Int) extends Result{ 
    override val str = "really big str" 
    } 

    def guardian = Future({ 
    Thread.sleep(timeout) 
    new Result { val id = 99999 } 
    }) 

    def worker(i: Int) = Future({ 
    Thread.sleep(100) 
    new BigObject(i) 
    }) 

    for (i <- Range(1, 1000)){ 
    println("round " + i) 
    Thread.sleep(20) 
    Future.firstCompletedOf(Seq(
     guardian, 
     worker(i) 
    )).map(r => println("result" + r.id)) 
    } 

    while (true){ 
    Thread.sleep(2000) 
    } 
} 
+0

Ich bin neugierig darüber, wie Sie zu beweisen, dass "Ergebnis" kann nicht Müll gesammelt werden, weil ich das Gegenteil sagen würde, könnte es interessant sein.Vielleicht fügen Sie weitere Details hinzu, wie Sie dies verifiziert haben? –

+0

Zeigen Sie den Code. Es ist ziemlich unmöglich zu sagen, was ohne es passieren könnte. –

+0

Eigentlich ist das Problem ein allgemeiner und hängt nicht von einem bestimmten Anwendungsfall ab, daher ist es sehr möglich, ohne weitere Details zu antworten. –

Antwort

9

Mal sehen, wie firstCompletedOf implementiert:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val completeFirst: Try[T] => Unit = p tryComplete _ 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

Wenn { futures foreach { _ onComplete completeFirst } tun, die Funktion { _ onComplete completeFirst } irgendwo über ExecutionContext.execute gespeichert. Wo genau diese Funktion gespeichert ist, ist irrelevant, wir wissen nur, dass sie irgendwo gespeichert werden muss, damit sie später wieder aufgenommen und in einem Thread-Pool ausgeführt werden kann, wenn ein Thread verfügbar wird.

Diese Funktion schließt über completeFirst, die über schließt. Solange es noch eine Zukunft (von) gibt, die darauf wartet, beendet zu werden, gibt es einen Verweis auf , der verhindert, dass es Müll gesammelt wird (obwohl zu diesem Zeitpunkt die Wahrscheinlichkeit ist, dass firstCompletedOf bereits zurückgegeben wurde, entfernen p aus der Stapel).

Wenn die erste Zukunft abgeschlossen ist, wird das Ergebnis in das Versprechen gespeichert (durch Aufruf von p.tryComplete). Weil das Versprechen das Ergebnis enthält, ist das Ergebnis mindestens so lange erreichbar, wie erreichbar ist, und wie wir gesehen haben, ist erreichbar, solange mindestens einmal von nicht abgeschlossen worden ist. Dies ist der Grund, warum das Ergebnis nicht gesammelt werden kann, bevor alle Futures abgeschlossen sind.

UPDATE: Jetzt ist die Frage: könnte es behoben werden? Ich denke es könnte. Alles, was wir tun müssten, ist sicherzustellen, dass die erste Zukunft, die abgeschlossen wird, den Verweis auf p auf threadsichere Weise "ausnullt", was beispielsweise mit einer AtomicReference geschehen kann. Etwas wie folgt aus:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val pref = new java.util.concurrent.atomic.AtomicReference(p) 
    val completeFirst: Try[T] => Unit = { result: Try[T] => 
    val promise = pref.getAndSet(null) 
    if (promise != null) { 
     promise.tryComplete(result) 
    } 
    } 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

ich es getestet haben und wie erwartet es erlaubt das Ergebnis Müll so schnell gesammelt, wie die erste Zukunft abgeschlossen ist. Es sollte sich in allen anderen Aspekten gleich verhalten.

+0

Danke, dass du es für mich erledigt hast, ich starrte ziemlich lange auf "firstCompletedOf" und konnte es nicht herausfinden. Und dennoch, die Schlussfolgerung ist ziemlich gegen die Intuition, weiß nicht, ob sich jemand jemals darüber beschwert hat ... – noru

+0

Ich habe eine alternative Implementierung hinzugefügt, die diese Situation beheben sollte. Lassen Sie es mich wissen, wenn es für Sie funktioniert (dies könnte eine Pull-Anforderung an die Standardbibliothek rechtfertigen). –

+0

es funktioniert gut, wie ich beobachtet habe. Die Fäden sind immer noch besetzt, aber das ist eine völlig andere Geschichte. Danke für Ihre Hilfe! – noru