2016-09-19 5 views
4

Ich habe Setup-Spark-2.0 und Cassandra 3.0 auf einem lokalen Rechner (8 Cores, 16 GB RAM) zum Testen und bearbeitet spark-defaults.conf wie folgt:Funke: PySpark + Cassandra Abfrageleistung

spark.python.worker.memory 1g 
spark.executor.cores 4 
spark.executor.instances 4 
spark.sql.shuffle.partitions 4 

Next I 1.500.000 importiert in Cassandra Reihen:

test(
    tid int, 
    cid int, 
    pid int, 
    ev list<double>, 
    primary key (tid) 
) 

test.ev ist eine Liste numerischer Werte, dh [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

Jetzt im Kabeljau enthält Ich habe gerade eine SparkSession, verbunden mit Cassandra e, um das Ganze zu testen und eine einfache select count machen:

cassandra = spark.read.format("org.apache.spark.sql.cassandra") 
df = cassandra.load(keyspace="testks",table="test") 
df.select().count() 

An diesem Punkt Funken gibt das count und dauert etwa 28 Sekunden, um die Job zu beenden, verteilt in 13 Tasks (in Spark UI, die Gesamt Eingang für die Aufgaben ist 331.6MB)

Fragen:

  • ist das die erwartete pro Formung? Wenn nicht, was fehlt mir?
  • Die Theorie sagt die Anzahl der Partitionen eines DataFrame bestimmt die Anzahl der Aufgaben Spark verteilt den Job in. Wenn ich die spark.sql.shuffle.partitions auf 4 setzen, warum 13 Aufgaben erstellen? (Auch stellte sicher, dass die Anzahl der Partitionen rdd.getNumPartitions() auf meinem Dataframe Aufruf)

aktualisieren

Eine gemeinsame Operation würde Ich mag diese Daten testen über:

  • Abfrage eine große Datensatz, sagen wir, von 100.000 ~ N Zeilen gruppiert von pid
  • Wählen Sie ev, ein list<double>
  • an jedem Element eine durchschnittliche Durchführen, inzwischen jeder Liste unter der Annahme, hat die gleiche Länge dh df.groupBy('pid').agg(avg(df['ev'][1]))

Als @ zero323 vorgeschlagen, ich eine externe Maschine (2 GB RAM, 4 Kerne, SSD) im Einsatz mit Cassandra nur für diesen Test, und den gleichen Datensatz geladen. Das Ergebnis der df.select().count() war eine erwartete größere Latenz und insgesamt schlechtere Leistung im Vergleich zu meinem vorherigen Test (dauerte etwa 70 Sekunden, um die Job zu beenden).

Edit: Ich missverstanden seinen Vorschlag. @ zero323 bedeuteten die Zählung statt mit Spark-SQL lassen Cassandra zuführen, wie in here

Auch erklärte ich, dass ich bin mir bewusst, das inhärenten anti-Musters der Einrichtung eine list<double> stattdessen eine breite Reihe, darauf hinzuweisen wollte Diese Art von Daten, aber meine Bedenken in diesem Moment sind mehr die Zeit, die für das Abrufen eines großen Datasets verbracht wird, als die tatsächliche durchschnittliche Berechnungszeit.

+0

Wenn Sie eine Zählung durchführen möchten, ist die Abfrage externer Quellen viel effizienter. Im Allgemeinen hängt viel davon ab, was Sie tun. Bei Partitionen wird 'spark.sql.shuffle.partitions' hier nicht verwendet. Die anfängliche Anzahl von Partitionen wird durch die Datenquelle festgelegt und die Anzahl verwendet immer 1 Task für die endgültige Aggregation. – zero323

+0

danke nochmal @ zero323. Bitte überprüfe mein Update. Wenn Sie das richtig verstanden haben, sagen Sie, dass die Anzahl der Partitionen von Cassandra festgelegt wurde? – TMichel

+0

OK zero323

Antwort

3

Ist das die erwartete Leistung? Wenn nicht, was fehlt mir?

Es sieht langsam aus, aber es ist nicht gerade unerwartet. Im allgemeinen wird ausgedrückt als count

SELECT 1 FROM table 

Spark Seiten Summierung gefolgt. Während es optimiert ist, ist es immer noch ziemlich ineffizient, weil Sie N lange Integer aus der externen Quelle abgerufen haben, um diese lokal zu summieren.

Wie von the docs erklärt Cassandra unterstützt RDD (nicht Datasets) bieten optimierte cassandraCount Methode, die Server Side Counting durchführt.

Theorie sagt die Anzahl der Partitionen eines Datenrahmens die Anzahl der Aufgaben Funken ermittelt wird den Job in verteilen. Wenn ich die spark.sql.shuffle.partitions zu (...) bin die Einrichtung, warum schafft (...) Aufgaben?

Da spark.sql.shuffle.partitions hier nicht verwendet wird. Diese Eigenschaft wird verwendet, um die Anzahl der Partitionen für Shuffle zu bestimmen (wenn Daten durch eine Menge von Schlüsseln aggregiert werden), nicht für Dataset Erstellung oder globale Aggregationen wie count(*) (die immer 1 Partition für die endgültige Aggregation verwenden).

Wenn Sie Interesse Anzahl anfänglicher Partitionen in Steuern Sie einen Blick auf spark.cassandra.input.split.size_in_mb nehmen sollen, die definiert:

Ca. Menge von Daten in eine Spark-Partition geholt werden. Mindestanzahl der resultierenden Funke Partitionen ist 1 + 2 * SparkContext.defaultParallelism

Wie Sie ein weiterer Faktor hier sehen spark.default.parallelism ist, aber es ist nicht gerade eine subtile Konfiguration so abhängig von ihm in der Regel keine optimale Wahl.

+1

Das war wirklich, wirklich illustrativ. Vielen Dank. – TMichel

Verwandte Themen