2017-08-04 2 views
0

Ich lerne Akka Actor vor kurzem. Ich habe das Dokument der Dispatcher in Actor gelesen. Ich bin neugierig auf die blockierende Operation in einem Schauspieler. Die letzte topic in dem Dokument beschreibt, wie das Problem zu lösen ist. Und ich versuche, das Beispielexperiment in dem Dokument zu reproduzieren.Blocking-Operation in Actor NOT besetzt alle Standard-Dispatcher

Hier ist mein Code:

package dispatcher 

import akka.actor.{ActorSystem, Props} 
import com.typesafe.config.ConfigFactory 

object Main extends App{ 

    var config = ConfigFactory.parseString(
    """ 
     |my-dispatcher{ 
     |type = Dispatcher 
     | 
     |executor = "fork-join-executor" 
     | 
     |fork-join-executor{ 
     |fixed-pool-size = 32 
     |} 
     |throughput = 1 
     |} 
    """.stripMargin) 

// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf")) 


    val system = ActorSystem("block") 


    val actor1 = system.actorOf(Props(new BlockingFutureActor())) 
    val actor2 = system.actorOf(Props(new PrintActor())) 

    for(i <- 1 to 1000){ 
    actor1 ! i 
    actor2 ! i 
    } 

} 

package dispatcher 

import akka.actor.Actor 

import scala.concurrent.{ExecutionContext, Future} 

class BlockingFutureActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     Thread.sleep(5000) 
     implicit val excutionContext: ExecutionContext = context.dispatcher 
     Future { 
     Thread.sleep(5000) 
     println(s"Blocking future finished ${i}") 
     } 
    } 
} 
package dispatcher 

import akka.actor.Actor 

class PrintActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     println(s"PrintActor: ${i}") 
    } 
} 

ich einfach eine ActorSystem mit den Standard-Disponenten erstellen und alle Akteure hängen von denen. Die BlockingFutureActor hat eine Blockierung, die in einem Future gekapselt ist. Die PrintActor druckt nur eine Nummer sofort.

In der Erläuterung des Dokuments werden die Standard-Dispatcher von Future s in BlockingFutureActor besetzt, was zur Blockierung der Nachricht von führt. Die Anwendung bleibt irgendwo stecken wie:

> PrintActor: 44 
> PrintActor: 45 

Leider ist mein Code nicht blockiert. Alle Ausgaben von PrintActor zeigen sich reibungslos. Aber Ausgaben von BlockingFutureActor zeigen sich wie quetschen Zahnpasta. Ich versuche, meinen Thread info von IntelliJ der Debug-Monitor, ich habe: thread monitoring

Sie finden nur zwei Disponenten schlafen (BlockingFutureActor macht dies geschehen). Andere warten, was bedeutet, dass sie für die Zustellung neuer Nachrichten verfügbar sind.

Ich habe eine Antwort über blockierende Operation in Actor (page) gelesen. Es wird zitiert, dass "Dispatcher effektiv thread-pools sind. Das Trennen der beiden garantiert, dass die langsamen, blockierenden Operationen die anderen nicht verhungern. Dieser Ansatz wird im Allgemeinen als Massen-Überschrift bezeichnet, weil die Idee ist, dass Wenn ein Teil der App fehlschlägt, bleibt der Rest reaktionsfähig. "

Behindern Standard-Dispatcher einige Dispatcher für blockierende Operationen? Dadurch kann das System Nachrichten verarbeiten, selbst wenn so viele blockierende Operationen nach Dispatcher fragen.

Kann das Experiment im Akka-Dokument reproduziert werden? Stimmt etwas nicht mit meiner Konfiguration?

Vielen Dank für Ihre Vorschläge. Die besten Wünsche.

+0

Sagen Sie, dass Sie alle 1000 'println' Anweisungen von' PrintActor' sehen? – chunjef

+0

Ja, genau. 1000 'println' erscheinen in dem Moment, in dem die Anwendung gestartet wird. – jiexray

Antwort

2

Der Grund, warum Sie alle 1000 print-Anweisungen aus den PrintActor sehen vor irgendwelchen Druckanweisungen vom BlockingFutureActor ist aufgrund des ersten Thread.sleep Anrufs in den BlockingFutureActor ‚s receive Block. Diese Thread.sleep ist der entscheidende Unterschied zwischen Ihrem Code und dem Beispiel in der offiziellen Dokumentation:

override def receive: Receive = { 
    case i: Int => 
    Thread.sleep(5000) // <----- this call is not in the example in the official docs 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

Denken Sie daran, dass die Akteure eine Nachricht zu einer Zeit verarbeiten. Die Thread.sleep(5000) simuliert im Grunde eine Nachricht, die mindestens fünf Sekunden zur Verarbeitung benötigt. Die BlockingFutureActor wird eine andere Nachricht nicht verarbeiten, bis die aktuelle Nachricht verarbeitet wurde, auch wenn sie Hunderte von Nachrichten in ihrer Mailbox hat. Während die BlockingFutureActor die erste Int Nachricht des Werts 1 verarbeitet, hat die PrintActor bereits die Verarbeitung aller 1000 Nachrichten abgeschlossen, die an sie gesendet wurden.Um diese klarer zu machen, fügen wir eine println Aussage:

override def receive: Receive = { 
    case i: Int => 
    println(s"Entering BlockingFutureActor's receive: $i") // <----- 
    Thread.sleep(5000) 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

Eine Probe ausgegeben werden, wenn wir das Programm ausführen:

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
PrintActor: 3 
... 
PrintActor: 1000 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Blocking future finished 1 
... 

Wie Sie sehen können, durch die Zeit, die BlockingFutureActor beginnt tatsächlich zu verarbeiten die Nachricht 2, die PrintActor hat bereits durch alle 1000 Nachrichten aufgewühlt.

Wenn Sie entfernen Sie das erste Thread.sleep, dann werden Sie Nachrichten sehen von der BlockingFutureActor ‚Mailbox schneller aus der Warteschlange entfernt, weil die Arbeit an einem Future‚delegiert‘wird. Sobald der Future erstellt ist, greift der Schauspieler die nächste Nachricht aus seinem Postfach, ohne auf die Beendigung der Future zu warten. Im Folgenden finden Sie eine Beispielausgabe, ohne dass zuerst Thread.sleep (es ist nicht genau jedes Mal die gleiche sein wird, Sie es ausführen): „Alle Ausgänge von` PrintActor` glatt erscheinen“

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
... 
PrintActor: 84 
PrintActor: 85 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Entering BlockingFutureActor's receive: 4 
Entering BlockingFutureActor's receive: 5 
PrintActor: 86 
PrintActor: 87 
... 
+0

Es tut mir sehr leid, dass ich wegen meiner Unachtsamkeit einen dummen Käfer gemacht habe. Es ist wahr, dass 'Thread.sleep (5000)' in einer 'Zukunft' eingekapselt sein sollte. Und mein erstes 'Thread.sleep (5000)' ist ein dummer Bug. Ich bin Ihnen sehr dankbar für Ihre Geduld und Hilfe. Die besten Wünsche. – jiexray