2015-10-29 6 views
9

Update von 2015-10-30Akka-Stream-Umsetzung langsamer als Single-Threaded-Implementierung


basierend auf Roland Kuhn awnser:

Akka Streams verwendet asynchrone Nachricht zwischen Akteuren vorbei zu implementieren Stream Verarbeitungsstufen. Das Übergeben von Daten über eine asynchrone Grenze hat einen Overhead, den Sie hier sehen: Ihre Berechnung scheint nur etwa 160 ns zu benötigen (abgeleitet von der single-threaded Messung), während die Streaming-Lösung etwa 1μs pro Element benötigt Die Nachricht wird weitergeleitet.

Ein weiteres Missverständnis ist, dass zu sagen „Strom“ Parallelität bedeutet: in Code alle Berechnung läuft nacheinander in einem einzigen Darsteller (die Karte Stufe), so kann kein Vorteil gegenüber der primitiven Single-Threaded-Lösung erwarten.

Um von der Parallelität von Akka Streams Sie gewährt profitieren Notwendigkeit, mehrere Verarbeitungsstufen haben, die jeweils

1 us pro Element Aufgaben von

ausführen, finden Sie auch die Dokumentation.

Ich habe einige Änderungen vorgenommen. Mein Code sieht jetzt aus wie:

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform) 

    val eventToFactorial = Flow[Event].map(SharedFunctions.transform2) 

    val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder => 
    import FlowGraph.Implicits._ 

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4)) 
    val mergeEvents = builder.add(Merge[Int](4)) 

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0) 
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1) 
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2) 
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3) 

    (dispatchTuple.in, mergeEvents.out) 
    } 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 
    } 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 
    actorSystem.shutdown 
    actorSystem.awaitTermination 
    System.exit(-1) 
    } 

    def main(args: Array[String]) { 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink) 
    // in.via(eventChef).runWith(sink) 
    } 

} 

ich nicht sicher, ob ich etwas total falsch, aber immer noch meine Implementierung mit akka-Streams ist viel langsamer (jetzt auch nach wie vor langsamer), aber was habe ich herausgefunden ist: Wenn ich Erhöhen Sie die Arbeit zum Beispiel, indem Sie eine Aufteilung vornehmen, die Implementierung mit akka-streams wird schneller. Wenn ich es richtig verstehe (korrigiere mich sonst), scheint es in meinem Beispiel zu viel Aufwand zu geben. Sie profitieren also nur dann von akka-streams, wenn der Code schwer arbeiten muss?




Ich bin in beide scala & akka-Strom relativ neu. Ich habe ein kleines Testprojekt geschrieben, das einige Ereignisse erzeugt, bis ein Zähler eine bestimmte Anzahl erreicht hat. Für jedes Ereignis wird die Fakultät für ein Feld des Ereignisses berechnet. Ich habe das zweimal umgesetzt. Einmal mit akka-stream und einmal ohne akka-stream (single threaded) und die Laufzeit verglichen.

Ich habe nicht erwartet, dass: Wenn ich ein einzelnes Ereignis erstellen, sind die Laufzeit beider Programme fast identisch. Aber wenn ich 70.000.000 Ereignisse erstelle, ist die Implementierung ohne akka-Streams viel schneller.Hier sind meine Ergebnisse (die folgenden Daten anhand von 24 Messungen):


  • Einzelereignis ohne Akka-Strom: 403 (+ - 2) ms
  • Einzelereignis mit akka-streams: 444 (+ -13) ms


  • 70Mio Ereignisse ohne akka Ströme: 11778 (+ -70) ms

  • 70Mio Ereignisse mit akka-dampft: 75424 (+ - 2959) ms

Also meine Frage ist: Was ist los? Warum ist meine Implementierung mit akka-stream langsamer?

hier mein Code:

Umsetzung mit Akka

object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 
    } 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 
    actorSystem.shutdown 
    actorSystem.awaitTermination 
    System.exit(-1) 
    } 

    def main(args: Array[String]) { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink).onComplete(_ => endAkka()) 
    } 

} 

Implementierung ohne Akka

Objekt SingleThread {

def main(args: Array[String]) { 
    println("SingleThread started at: " + SharedFunctions.startTime) 
    println("0%") 
    val i = createEvent(0) 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now()); 
    println("Time: " + duration.getMillis + " || Data: " + i) 
    } 

    def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = { 
    if (count == SharedFunctions.maxEventCount) count 
    else { 
     val e = SharedFunctions.transform((randDate, name, age, myFloat)) 
     SharedFunctions.transform2(e) 
     val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count, 
     DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
     createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f) 
    } 
    } 

    def createEvent(count: Int): Int = { 
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f) 
    } 
} 

SharedFunctions

object SharedFunctions { 
    val maxEventCount = 70000000 
    val startTime = DateTime.now 

    def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4) 
    def transform2(e : Event) : Int = factorial(e.getAgeYrs) 

    def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue) 
    def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = { 
    val cProgress = calculatePercentage(fileSize, currentSize) 
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms") 
    cProgress 
    } 

    private def factorialWorker(n1: Int, n2: Int): Int = { 
    if (n1 == 0) n2 
    else factorialWorker(n1 -1, n2*n1) 
    } 
    def factorial (n : Int): Int = { 
    factorialWorker(n, 1) 
    } 
} 

Implementation Ereignis

/** 
* Autogenerated by Avro 
* 
* DO NOT EDIT DIRECTLY 
*/ 

@SuppressWarnings("all") 
@org.apache.avro.specific.AvroGenerated 
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { 
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}"); 
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } 
    @Deprecated public long timestampMS; 
    @Deprecated public CharSequence name; 
    @Deprecated public int ageYrs; 
    @Deprecated public float sizeCm; 

    /** 
    * Default constructor. Note that this does not initialize fields 
    * to their default values from the schema. If that is desired then 
    * one should use <code>newBuilder()</code>. 
    */ 
    public Event() {} 

    /** 
    * All-args constructor. 
    */ 
    public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) { 
    this.timestampMS = timestampMS; 
    this.name = name; 
    this.ageYrs = ageYrs; 
    this.sizeCm = sizeCm; 
    } 

    public org.apache.avro.Schema getSchema() { return SCHEMA$; } 
    // Used by DatumWriter. Applications should not call. 
    public Object get(int field$) { 
    switch (field$) { 
    case 0: return timestampMS; 
    case 1: return name; 
    case 2: return ageYrs; 
    case 3: return sizeCm; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 
    // Used by DatumReader. Applications should not call. 
    @SuppressWarnings(value="unchecked") 
    public void put(int field$, Object value$) { 
    switch (field$) { 
    case 0: timestampMS = (Long)value$; break; 
    case 1: name = (CharSequence)value$; break; 
    case 2: ageYrs = (Integer)value$; break; 
    case 3: sizeCm = (Float)value$; break; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 

    /** 
    * Gets the value of the 'timestampMS' field. 
    */ 
    public Long getTimestampMS() { 
    return timestampMS; 
    } 

    /** 
    * Sets the value of the 'timestampMS' field. 
    * @param value the value to set. 
    */ 
    public void setTimestampMS(Long value) { 
    this.timestampMS = value; 
    } 

    /** 
    * Gets the value of the 'name' field. 
    */ 
    public CharSequence getName() { 
    return name; 
    } 

    /** 
    * Sets the value of the 'name' field. 
    * @param value the value to set. 
    */ 
    public void setName(CharSequence value) { 
    this.name = value; 
    } 

    /** 
    * Gets the value of the 'ageYrs' field. 
    */ 
    public Integer getAgeYrs() { 
    return ageYrs; 
    } 

    /** 
    * Sets the value of the 'ageYrs' field. 
    * @param value the value to set. 
    */ 
    public void setAgeYrs(Integer value) { 
    this.ageYrs = value; 
    } 

    /** 
    * Gets the value of the 'sizeCm' field. 
    */ 
    public Float getSizeCm() { 
    return sizeCm; 
    } 

    /** 
    * Sets the value of the 'sizeCm' field. 
    * @param value the value to set. 
    */ 
    public void setSizeCm(Float value) { 
    this.sizeCm = value; 
    } 

    /** Creates a new Event RecordBuilder */ 
    public static Event.Builder newBuilder() { 
    return new Event.Builder(); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Builder */ 
    public static Event.Builder newBuilder(Event.Builder other) { 
    return new Event.Builder(other); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Event instance */ 
    public static Event.Builder newBuilder(Event other) { 
    return new Event.Builder(other); 
    } 

    /** 
    * RecordBuilder for Event instances. 
    */ 
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event> 
    implements org.apache.avro.data.RecordBuilder<Event> { 

    private long timestampMS; 
    private CharSequence name; 
    private int ageYrs; 
    private float sizeCm; 

    /** Creates a new Builder */ 
    private Builder() { 
     super(Event.SCHEMA$); 
    } 

    /** Creates a Builder by copying an existing Builder */ 
    private Builder(Event.Builder other) { 
     super(other); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Creates a Builder by copying an existing Event instance */ 
    private Builder(Event other) { 
      super(Event.SCHEMA$); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Gets the value of the 'timestampMS' field */ 
    public Long getTimestampMS() { 
     return timestampMS; 
    } 

    /** Sets the value of the 'timestampMS' field */ 
    public Event.Builder setTimestampMS(long value) { 
     validate(fields()[0], value); 
     this.timestampMS = value; 
     fieldSetFlags()[0] = true; 
     return this; 
    } 

    /** Checks whether the 'timestampMS' field has been set */ 
    public boolean hasTimestampMS() { 
     return fieldSetFlags()[0]; 
    } 

    /** Clears the value of the 'timestampMS' field */ 
    public Event.Builder clearTimestampMS() { 
     fieldSetFlags()[0] = false; 
     return this; 
    } 

    /** Gets the value of the 'name' field */ 
    public CharSequence getName() { 
     return name; 
    } 

    /** Sets the value of the 'name' field */ 
    public Event.Builder setName(CharSequence value) { 
     validate(fields()[1], value); 
     this.name = value; 
     fieldSetFlags()[1] = true; 
     return this; 
    } 

    /** Checks whether the 'name' field has been set */ 
    public boolean hasName() { 
     return fieldSetFlags()[1]; 
    } 

    /** Clears the value of the 'name' field */ 
    public Event.Builder clearName() { 
     name = null; 
     fieldSetFlags()[1] = false; 
     return this; 
    } 

    /** Gets the value of the 'ageYrs' field */ 
    public Integer getAgeYrs() { 
     return ageYrs; 
    } 

    /** Sets the value of the 'ageYrs' field */ 
    public Event.Builder setAgeYrs(int value) { 
     validate(fields()[2], value); 
     this.ageYrs = value; 
     fieldSetFlags()[2] = true; 
     return this; 
    } 

    /** Checks whether the 'ageYrs' field has been set */ 
    public boolean hasAgeYrs() { 
     return fieldSetFlags()[2]; 
    } 

    /** Clears the value of the 'ageYrs' field */ 
    public Event.Builder clearAgeYrs() { 
     fieldSetFlags()[2] = false; 
     return this; 
    } 

    /** Gets the value of the 'sizeCm' field */ 
    public Float getSizeCm() { 
     return sizeCm; 
    } 

    /** Sets the value of the 'sizeCm' field */ 
    public Event.Builder setSizeCm(float value) { 
     validate(fields()[3], value); 
     this.sizeCm = value; 
     fieldSetFlags()[3] = true; 
     return this; 
    } 

    /** Checks whether the 'sizeCm' field has been set */ 
    public boolean hasSizeCm() { 
     return fieldSetFlags()[3]; 
    } 

    /** Clears the value of the 'sizeCm' field */ 
    public Event.Builder clearSizeCm() { 
     fieldSetFlags()[3] = false; 
     return this; 
    } 

    @Override 
    public Event build() { 
     try { 
     Event record = new Event(); 
     record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]); 
     record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]); 
     record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]); 
     record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]); 
     return record; 
     } catch (Exception e) { 
     throw new org.apache.avro.AvroRuntimeException(e); 
     } 
    } 
    } 
} 
+0

Nur der Vollständigkeit halber, können Sie bitte die Ereignisdefinition angeben. Ich möchte versuchen, Ihren Multithread-Code zu optimieren ... –

+0

sicher, ich habe es hinzugefügt –

Antwort

11

Zusätzlich zu Rolands Erklärung, der ich voll und ganz zustimme, sollte klar sein, dass akka Streams nicht nur ein simultanes Programmierframework sind.Streams bieten auch einen Gegendruck, was bedeutet, dass Ereignisse nur von der Source erzeugt werden, wenn es erforderlich ist, sie in der Sink zu verarbeiten. Diese Mitteilung der Nachfrage fügt bei jedem Verarbeitungsschritt einen zusätzlichen Aufwand hinzu.

Daher ist Ihr Single-Thread- und Multithread-Vergleich nicht "Äpfel-zu-Äpfel".

Wenn Sie eine rohe Multithread-Ausführungsleistung wünschen, dann sind Futures/Actors ein besserer Weg.

+0

Wahr. Gute Anwendungsfälle für Streams sind Echtzeit-Streaming oder das Streaming großer Datenmengen. Das Analysieren riesiger Videodateien, ohne alles in den Speicher zu schreiben, das Sammeln von Millionen von Datensätzen über eine Internetverbindung oder das Verwenden von Dateiwächtern, um darauf zu warten, dass eine Datei vor dem Parsen abgelegt wird, sind gute Beispiele. Sie begrenzen entweder die Komplexität in Umgebungen, in denen die ultimative Geschwindigkeit nicht erforderlich ist, und sind möglicherweise unklug (Daten werden über einen längeren Zeitraum übertragen oder erfordern Millionen von Netzwerkaufrufen) oder helfen, die Komplexität dort zu reduzieren, wo eine enorme Datenmenge vorhanden wäre. –

27

Akka Streams unter Verwendung von zwischen den Akteuren Passieren asynchronen Nachrichtenstrom Verarbeitungsstufen zu implementieren. Das Übergeben von Daten über eine asynchrone Grenze hat einen Overhead, den Sie hier sehen: Ihre Berechnung scheint nur etwa 160 ns (abgeleitet von der single-threaded Messung) zu benötigen, während die Streaming-Lösung ungefähr 1 μs pro Element benötigt, die von der Nachrichtenübergabe dominiert wird. Ein weiteres Missverständnis ist, dass "stream" bedeutet Parallelität bedeutet: in Ihrem Code wird die gesamte Berechnung nacheinander in einem einzigen Actor ausgeführt (die map Bühne), so dass kein Vorteil gegenüber der primitiven Singlethread-Lösung erwartet werden kann.

Um von der Parallelität von Akka Streams profitieren zu können, benötigen Sie mehrere Verarbeitungsstufen, die Aufgaben von> 1μs pro Element ausführen, siehe auch the docs.

+5

nicht sicher, warum dies nicht die akzeptierte Antwort war - es älter als die akzeptierte Antwort, die tatsächlich damit übereinstimmt und nur ein kleiner Punkt dazu sekundär Antworten. – doug