2017-03-18 8 views
0

Ich habe eine Cassandra Tabelle XYX mit Spalten ( id UUID, einen Zeitstempel einzufügen, Kopftext)Duplikate entfernen, ohne Shuffle Funken

Wo id und Einsatz sind zusammengesetzte Primärschlüssel.

Ich benutze Dataframe und in meiner Spark-Shell holen ich ID und Header-Spalte. Ich möchte verschiedene Zeilen basierend auf ID und Header-Spalte haben.

Ich sehe viele Shuffles, die nicht der Fall sein, da Spark Cassandra-Connector sicherstellt, dass alle Zeilen für eine bestimmte Cassandra-Partition in derselben Spark-Partition sind.

Nach dem Abrufen verwende ich DropDuplicates, um verschiedene Datensätze zu erhalten.

Antwort

0

Spark Dataframe API unterstützt noch keine benutzerdefinierten Partitionierer. Daher konnte der Connector den C * -Partitionierer nicht in die Dataframe-Engine einführen. Eine RDD Spark-API unterstützt benutzerdefinierte Partitionierung von anderer Seite. So könnten Sie Ihre Daten in RDD laden und dann zu df konvertieren. Hier ist ein Stecker doc über C * Partitionierungs Nutzung: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md

keyBy() Funktion ermöglicht es Ihnen, Schlüsselspalten zu definieren, für die Gruppierung

Hier verwenden arbeitet Beispiel. Es ist nicht kurz, also erwarte ich, dass jemand es verbessern könnte:

//load data into RDD and define a group key 
val rdd = sc.cassandraTable[(String, String)] ("test", "test") 
    .select("id" as "_1", "header" as "_2") 
    .keyBy[Tuple1[Int]]("id") 
// check that partitioner is CassandraPartitioner 
rdd.partitioner 
// call distinct for each group, flat it, get two column DF 
val df = rdd.groupByKey.flatMap {case (key,group) => group.toSeq.distinct} 
    .toDF("id", "header")