2016-04-26 9 views
1

Ich habe eine Datei, bestehend aus 3 Feldern (Emp_ids, Gruppen, Gehälter)Funken: Aggregieren auf einer Säule basiert

  • 100 A 430
  • 101 A 500
  • 201 B 300

führen zu erhalten, wie

1) Gruppenname

ich will und COUNT (*)

Gruppe

2) Name und max (Gehalt)

val myfile = "/home/hduser/ScalaDemo/Salary.txt" 
val conf = new SparkConf().setAppName("Salary").setMaster("local[2]") 
val sc= new SparkContext(conf) 
val sal= sc.textFile(myfile) 

Antwort

2

Scala DSL:

case class Data(empId: Int, group: String, salary: Int) 
val df = sqlContext.createDataFrame(lst.map {v => 
    val arr = v.split(' ').map(_.trim()) 
    Data(arr(0).toInt, arr(1), arr(2).toInt) 
    }) 
df.show() 
+-----+-----+------+ 
|empId|group|salary| 
+-----+-----+------+ 
| 100| A| 430| 
| 101| A| 500| 
| 201| B| 300| 
+-----+-----+------+ 

df.groupBy($"group").agg(count("*") as "count").show() 
+-----+-----+ 
|group|count| 
+-----+-----+ 
| A| 2| 
| B| 1| 
+-----+-----+ 


df.groupBy($"group").agg(max($"salary") as "maxSalary").show() 
+-----+---------+ 
|group|maxSalary| 
+-----+---------+ 
| A|  500| 
| B|  300| 
+-----+---------+ 

Oder mit einfacher SQL:

df.registerTempTable("salaries") 

sqlContext.sql("select group, count(*) as count from salaries group by group").show() 
+-----+-----+ 
|group|count| 
+-----+-----+ 
| A| 2| 
| B| 1| 
+-----+-----+ 

sqlContext.sql("select group, max(salary) as maxSalary from salaries group by group").show() 
+-----+---------+ 
|group|maxSalary| 
+-----+---------+ 
| A|  500| 
| B|  300| 
+-----+---------+ 

Während Spark-SQL ist Art und Weise empfohlen solche Aggregationen zu tun Aus Leistungsgründen kann dies problemlos mit der RDD-API durchgeführt werden:

val rdd = sc.parallelize(Seq(Data(100, "A", 430), Data(101, "A", 500), Data(201, "B", 300))) 

rdd.map(v => (v.group, 1)).reduceByKey(_ + _).collect() 
res0: Array[(String, Int)] = Array((B,1), (A,2)) 

rdd.map(v => (v.group, v.salary)).reduceByKey((s1, s2) => if (s1 > s2) s1 else s2).collect() 
res1: Array[(String, Int)] = Array((B,300), (A,500)) 
+0

Ich versuche, dies ohne Sql Kontext zu tun. – jeet

+0

Überprüfen Sie meine letzte Aktualisierung bitte –

+0

Danke. Wie führe ich den Spark SQL als Batch-Datei über die Eingabeaufforderung aus oder plane ihn im Farbton? – jeet

Verwandte Themen