2016-08-02 4 views
0

Ich habe Datensatz wie 10, "Name", 2016, "Country" 11, "Name1", 2016, "country1" 10, "Name", 2016, "Country" 10, "Name", 2016, "Country" 12, "Name2", 2017, "country2"Funken Finden Sie doppelten Datensätze für ein Feld in RDD

Mein Problem Aussage ist, ich habe Gesamtzählung zu finden und Duplikate von Jahr zählen. Mein Ergebnis sollte sein (Jahr, Gesamtzahlen, Duplikate) 2016,4,3 2017,1,0.

Ich habe versucht, dieses Problem durch

bis zu 10 GB Daten
val records = rdd.map { 
       x => 
       val array = x.split(",") 
       (array(2),x) 
      }.groupByKey() 
val duplicates = records.map { 
       x => val totalcount = x._2.size 
         val duplicates = // find duplicates in iterator 
        (x._1,totalcount,duplicates) 
       } 

Es läuft gut zu lösen. Wenn ich es mit mehr Daten ausführe, dauert es lange. Ich fand, dass groupByKey nicht der beste Ansatz ist.

Bitte schlagen Sie den besten Ansatz vor, um dieses Problem zu lösen.

Antwort

0

Ich bin nicht ganz der SQL-Experte, um die Duplikate in der Art zu zählen, wie Ihr Beispiel zeigt. Ich denke jedoch, dass Sie damit anfangen werden, Datenrahmen zu verwenden. Mein Verständnis ist, dass Datenrahmen deutlich besser als nur RDD durchführen können.

scala> import com.databricks.spark.csv._ 
import com.databricks.spark.csv._ 

scala> 

scala> val s = List("""10,"Name",2016,"Country"""", """11,"Name1",2016,"country1"""", """10,"Name",2016,"Country"""", """10,"Name",2016,"Country"""", """12,"Name2",2017,"Country2"""") 
s: List[String] = List(10,"Name",2016,"Country", 11,"Name1",2016,"country1", 10,"Name",2016,"Country", 10,"Name",2016,"Country", 12,"Name2",2017,"Country2") 

scala> val rdd = sc.parallelize(s) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[92] at parallelize at <console>:38 

scala> 

scala> val df = new CsvParser().withDelimiter(',').withInferSchema(true).withParseMode("DROPMALFORMED").csvRdd(sqlContext, rdd) 
df: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> 

scala> df.registerTempTable("test") 

scala> 

scala> val dfCount = sqlContext.sql("select C2, count(*), count(distinct C0,C2,C1,C3) from test group by C2") 
dfCount: org.apache.spark.sql.DataFrame = [C2: int, _c1: bigint, _c2: bigint] 

scala> 

scala> dfCount.show 
+----+---+---+                 
| C2|_c1|_c2| 
+----+---+---+ 
|2016| 4| 2| 
|2017| 1| 1| 
+----+---+---+ 
Verwandte Themen