Update von 2015-10-30Akka-Stream-Umsetzung langsamer als Single-Threaded-Implementierung
basierend auf Roland Kuhn awnser:
Akka Streams verwendet asynchrone Nachricht zwischen Akteuren vorbei zu implementieren Stream Verarbeitungsstufen. Das Übergeben von Daten über eine asynchrone Grenze hat einen Overhead, den Sie hier sehen: Ihre Berechnung scheint nur etwa 160 ns zu benötigen (abgeleitet von der single-threaded Messung), während die Streaming-Lösung etwa 1μs pro Element benötigt Die Nachricht wird weitergeleitet.
Ein weiteres Missverständnis ist, dass zu sagen „Strom“ Parallelität bedeutet: in Code alle Berechnung läuft nacheinander in einem einzigen Darsteller (die Karte Stufe), so kann kein Vorteil gegenüber der primitiven Single-Threaded-Lösung erwarten.
Um von der Parallelität von Akka Streams Sie gewährt profitieren Notwendigkeit, mehrere Verarbeitungsstufen haben, die jeweils
1 us pro Element Aufgaben von
ausführen, finden Sie auch die Dokumentation.
Ich habe einige Änderungen vorgenommen. Mein Code sieht jetzt aus wie:
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)
val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)
val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
import FlowGraph.Implicits._
val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
val mergeEvents = builder.add(Merge[Int](4))
dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)
(dispatchTuple.in, mergeEvents.out)
}
val sink = Sink.foreach[Int]{
v => counter += 1
oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
if(counter == SharedFunctions.maxEventCount) endAkka()
}
def endAkka() = {
val duration = new Duration(SharedFunctions.startTime, DateTime.now)
println("Time: " + duration.getMillis + " || Data: " + counter)
actorSystem.shutdown
actorSystem.awaitTermination
System.exit(-1)
}
def main(args: Array[String]) {
println("MultiThread started: " + SharedFunctions.startTime)
in.via(flow).runWith(sink)
// in.via(eventChef).runWith(sink)
}
}
ich nicht sicher, ob ich etwas total falsch, aber immer noch meine Implementierung mit akka-Streams ist viel langsamer (jetzt auch nach wie vor langsamer), aber was habe ich herausgefunden ist: Wenn ich Erhöhen Sie die Arbeit zum Beispiel, indem Sie eine Aufteilung vornehmen, die Implementierung mit akka-streams wird schneller. Wenn ich es richtig verstehe (korrigiere mich sonst), scheint es in meinem Beispiel zu viel Aufwand zu geben. Sie profitieren also nur dann von akka-streams, wenn der Code schwer arbeiten muss?
Ich bin in beide scala & akka-Strom relativ neu. Ich habe ein kleines Testprojekt geschrieben, das einige Ereignisse erzeugt, bis ein Zähler eine bestimmte Anzahl erreicht hat. Für jedes Ereignis wird die Fakultät für ein Feld des Ereignisses berechnet. Ich habe das zweimal umgesetzt. Einmal mit akka-stream und einmal ohne akka-stream (single threaded) und die Laufzeit verglichen.
Ich habe nicht erwartet, dass: Wenn ich ein einzelnes Ereignis erstellen, sind die Laufzeit beider Programme fast identisch. Aber wenn ich 70.000.000 Ereignisse erstelle, ist die Implementierung ohne akka-Streams viel schneller.Hier sind meine Ergebnisse (die folgenden Daten anhand von 24 Messungen):
- Einzelereignis ohne Akka-Strom: 403 (+ - 2) ms
Einzelereignis mit akka-streams: 444 (+ -13) ms
70Mio Ereignisse ohne akka Ströme: 11778 (+ -70) ms
- 70Mio Ereignisse mit akka-dampft: 75424 (+ - 2959) ms
Also meine Frage ist: Was ist los? Warum ist meine Implementierung mit akka-stream langsamer?
hier mein Code:
Umsetzung mit Akka
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val sink = Sink.foreach[Int]{
v => counter += 1
oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
if(counter == SharedFunctions.maxEventCount) endAkka()
}
def endAkka() = {
val duration = new Duration(SharedFunctions.startTime, DateTime.now)
println("Time: " + duration.getMillis + " || Data: " + counter)
actorSystem.shutdown
actorSystem.awaitTermination
System.exit(-1)
}
def main(args: Array[String]) {
import scala.concurrent.ExecutionContext.Implicits.global
println("MultiThread started: " + SharedFunctions.startTime)
in.via(flow).runWith(sink).onComplete(_ => endAkka())
}
}
Implementierung ohne Akka
Objekt SingleThread {
def main(args: Array[String]) {
println("SingleThread started at: " + SharedFunctions.startTime)
println("0%")
val i = createEvent(0)
val duration = new Duration(SharedFunctions.startTime, DateTime.now());
println("Time: " + duration.getMillis + " || Data: " + i)
}
def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
if (count == SharedFunctions.maxEventCount) count
else {
val e = SharedFunctions.transform((randDate, name, age, myFloat))
SharedFunctions.transform2(e)
val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)
}
}
def createEvent(count: Int): Int = {
createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)
}
}
SharedFunctions
object SharedFunctions {
val maxEventCount = 70000000
val startTime = DateTime.now
def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
def transform2(e : Event) : Int = factorial(e.getAgeYrs)
def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue)
def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
val cProgress = calculatePercentage(fileSize, currentSize)
if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
cProgress
}
private def factorialWorker(n1: Int, n2: Int): Int = {
if (n1 == 0) n2
else factorialWorker(n1 -1, n2*n1)
}
def factorial (n : Int): Int = {
factorialWorker(n, 1)
}
}
Implementation Ereignis
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public long timestampMS;
@Deprecated public CharSequence name;
@Deprecated public int ageYrs;
@Deprecated public float sizeCm;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Event() {}
/**
* All-args constructor.
*/
public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
this.timestampMS = timestampMS;
this.name = name;
this.ageYrs = ageYrs;
this.sizeCm = sizeCm;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public Object get(int field$) {
switch (field$) {
case 0: return timestampMS;
case 1: return name;
case 2: return ageYrs;
case 3: return sizeCm;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, Object value$) {
switch (field$) {
case 0: timestampMS = (Long)value$; break;
case 1: name = (CharSequence)value$; break;
case 2: ageYrs = (Integer)value$; break;
case 3: sizeCm = (Float)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'timestampMS' field.
*/
public Long getTimestampMS() {
return timestampMS;
}
/**
* Sets the value of the 'timestampMS' field.
* @param value the value to set.
*/
public void setTimestampMS(Long value) {
this.timestampMS = value;
}
/**
* Gets the value of the 'name' field.
*/
public CharSequence getName() {
return name;
}
/**
* Sets the value of the 'name' field.
* @param value the value to set.
*/
public void setName(CharSequence value) {
this.name = value;
}
/**
* Gets the value of the 'ageYrs' field.
*/
public Integer getAgeYrs() {
return ageYrs;
}
/**
* Sets the value of the 'ageYrs' field.
* @param value the value to set.
*/
public void setAgeYrs(Integer value) {
this.ageYrs = value;
}
/**
* Gets the value of the 'sizeCm' field.
*/
public Float getSizeCm() {
return sizeCm;
}
/**
* Sets the value of the 'sizeCm' field.
* @param value the value to set.
*/
public void setSizeCm(Float value) {
this.sizeCm = value;
}
/** Creates a new Event RecordBuilder */
public static Event.Builder newBuilder() {
return new Event.Builder();
}
/** Creates a new Event RecordBuilder by copying an existing Builder */
public static Event.Builder newBuilder(Event.Builder other) {
return new Event.Builder(other);
}
/** Creates a new Event RecordBuilder by copying an existing Event instance */
public static Event.Builder newBuilder(Event other) {
return new Event.Builder(other);
}
/**
* RecordBuilder for Event instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
implements org.apache.avro.data.RecordBuilder<Event> {
private long timestampMS;
private CharSequence name;
private int ageYrs;
private float sizeCm;
/** Creates a new Builder */
private Builder() {
super(Event.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(Event.Builder other) {
super(other);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Creates a Builder by copying an existing Event instance */
private Builder(Event other) {
super(Event.SCHEMA$);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Gets the value of the 'timestampMS' field */
public Long getTimestampMS() {
return timestampMS;
}
/** Sets the value of the 'timestampMS' field */
public Event.Builder setTimestampMS(long value) {
validate(fields()[0], value);
this.timestampMS = value;
fieldSetFlags()[0] = true;
return this;
}
/** Checks whether the 'timestampMS' field has been set */
public boolean hasTimestampMS() {
return fieldSetFlags()[0];
}
/** Clears the value of the 'timestampMS' field */
public Event.Builder clearTimestampMS() {
fieldSetFlags()[0] = false;
return this;
}
/** Gets the value of the 'name' field */
public CharSequence getName() {
return name;
}
/** Sets the value of the 'name' field */
public Event.Builder setName(CharSequence value) {
validate(fields()[1], value);
this.name = value;
fieldSetFlags()[1] = true;
return this;
}
/** Checks whether the 'name' field has been set */
public boolean hasName() {
return fieldSetFlags()[1];
}
/** Clears the value of the 'name' field */
public Event.Builder clearName() {
name = null;
fieldSetFlags()[1] = false;
return this;
}
/** Gets the value of the 'ageYrs' field */
public Integer getAgeYrs() {
return ageYrs;
}
/** Sets the value of the 'ageYrs' field */
public Event.Builder setAgeYrs(int value) {
validate(fields()[2], value);
this.ageYrs = value;
fieldSetFlags()[2] = true;
return this;
}
/** Checks whether the 'ageYrs' field has been set */
public boolean hasAgeYrs() {
return fieldSetFlags()[2];
}
/** Clears the value of the 'ageYrs' field */
public Event.Builder clearAgeYrs() {
fieldSetFlags()[2] = false;
return this;
}
/** Gets the value of the 'sizeCm' field */
public Float getSizeCm() {
return sizeCm;
}
/** Sets the value of the 'sizeCm' field */
public Event.Builder setSizeCm(float value) {
validate(fields()[3], value);
this.sizeCm = value;
fieldSetFlags()[3] = true;
return this;
}
/** Checks whether the 'sizeCm' field has been set */
public boolean hasSizeCm() {
return fieldSetFlags()[3];
}
/** Clears the value of the 'sizeCm' field */
public Event.Builder clearSizeCm() {
fieldSetFlags()[3] = false;
return this;
}
@Override
public Event build() {
try {
Event record = new Event();
record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}
Nur der Vollständigkeit halber, können Sie bitte die Ereignisdefinition angeben. Ich möchte versuchen, Ihren Multithread-Code zu optimieren ... –
sicher, ich habe es hinzugefügt –