Ich habe Setup-Spark-2.0 und Cassandra 3.0 auf einem lokalen Rechner (8 Cores, 16 GB RAM) zum Testen und bearbeitet spark-defaults.conf
wie folgt:Funke: PySpark + Cassandra Abfrageleistung
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Next I 1.500.000 importiert in Cassandra Reihen:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev
ist eine Liste numerischer Werte, dh [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
Jetzt im Kabeljau enthält Ich habe gerade eine SparkSession
, verbunden mit Cassandra e, um das Ganze zu testen und eine einfache select count machen:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
An diesem Punkt Funken gibt das count
und dauert etwa 28 Sekunden, um die Job
zu beenden, verteilt in 13 Tasks
(in Spark UI
, die Gesamt Eingang für die Aufgaben ist 331.6MB)
Fragen:
- ist das die erwartete pro Formung? Wenn nicht, was fehlt mir?
- Die Theorie sagt die Anzahl der Partitionen eines DataFrame bestimmt die Anzahl der Aufgaben Spark verteilt den Job in. Wenn ich die
spark.sql.shuffle.partitions
auf 4 setzen, warum 13 Aufgaben erstellen? (Auch stellte sicher, dass die Anzahl der Partitionenrdd.getNumPartitions()
auf meinem Dataframe Aufruf)
aktualisieren
Eine gemeinsame Operation würde Ich mag diese Daten testen über:
- Abfrage eine große Datensatz, sagen wir, von 100.000 ~ N Zeilen gruppiert von
pid
- Wählen Sie
ev
, einlist<double>
- an jedem Element eine durchschnittliche Durchführen, inzwischen jeder Liste unter der Annahme, hat die gleiche Länge dh
df.groupBy('pid').agg(avg(df['ev'][1]))
Als
@ zero323 vorgeschlagen, ich eine externe Maschine (2 GB RAM, 4 Kerne, SSD) im Einsatz mit Cassandra nur für diesen Test, und den gleichen Datensatz geladen. Das Ergebnis der
df.select().count()
war eine erwartete größere Latenz und insgesamt schlechtere Leistung im Vergleich zu meinem vorherigen Test (dauerte etwa 70 Sekunden, um die
Job
zu beenden).
Edit: Ich missverstanden seinen Vorschlag. @ zero323 bedeuteten die Zählung statt mit Spark-SQL lassen Cassandra zuführen, wie in here
Auch erklärte ich, dass ich bin mir bewusst, das inhärenten anti-Musters der Einrichtung eine list<double>
stattdessen eine breite Reihe, darauf hinzuweisen wollte Diese Art von Daten, aber meine Bedenken in diesem Moment sind mehr die Zeit, die für das Abrufen eines großen Datasets verbracht wird, als die tatsächliche durchschnittliche Berechnungszeit.
Wenn Sie eine Zählung durchführen möchten, ist die Abfrage externer Quellen viel effizienter. Im Allgemeinen hängt viel davon ab, was Sie tun. Bei Partitionen wird 'spark.sql.shuffle.partitions' hier nicht verwendet. Die anfängliche Anzahl von Partitionen wird durch die Datenquelle festgelegt und die Anzahl verwendet immer 1 Task für die endgültige Aggregation. – zero323
danke nochmal @ zero323. Bitte überprüfe mein Update. Wenn Sie das richtig verstanden haben, sagen Sie, dass die Anzahl der Partitionen von Cassandra festgelegt wurde? – TMichel
OK
zero323