2016-12-14 2 views
0

Ich arbeite an Temperaturprognosedaten mit PySpark.Reduziere ResultIterable Objekte nach groupByKey in PySpark

Die rohen Temperaturdaten in folgendem Format:

station;date;time,temperature;quality 
102170;2012-11-01;06:00:00;6.8;G 
102185;2012-11-02;06:00:00;5.8;G 
102170;2013-11-01;18:00:00;2.8;G 
102185;2013-11-01;18:00:00;7.8;G 

Das Ziel Ergebnis ist immer die Min-/Max-Temperatur für jedes Jahr, erwähnten in der Station, wie die folgenden:

year;station;max_temp 
2013;102185;7.8 
2012;102170;6.8 

Mein aktueller Code lautet wie folgt:

sc = SparkContext(appName="maxMin") 
lines = sc.textFile('data/temperature-readings.csv') 
lines = lines.map(lambda a: a.split(";")) 
lines = lines.filter(lambda x: int(x[1][0:4]) >= 1950 and int(x[1][0:4]) <= 2014) 
temperatures = lines.map(lambda x: (x[1][0:4], (x[0], float(x[3])))) 

bisher das Ergebnis wie folgt:

temperatures.take(4) 

(2012, (102170,6.8)) 
(2012, (102185,5.8)) 
(2013, (102170,2.8)) 
(2013, (102185,7.8)) 

Nachdem durch Tastengruppierung wird der wie folgt zusammen:

temperatures = temperatures.groupByKey() 
temperatures.take(2) 

[(u'2012', <pyspark.resultiterable.ResultIterable object at 0x2a0be50>), 
(u'2013', <pyspark.resultiterable.ResultIterable object at 0x2a0bc50>)] 

So, wie ich diese resultiterable Objekte reduzieren kann nur das Element mit min oder max Temperatur zu bekommen.

+0

gibt es einen Grund für die Verwendung von 'rdd's anstelle von' DataFrame's? – mtoto

+0

Glaubst du, dass es einen Unterschied machen wird? –

Antwort

1

Nur nicht. Verwenden Sie reduzieren um Schlüssel:

lines.map(lambda x: (x[1][0:4], (x[0], float(x[3])))).map(lambda x: (x, x)) \ 
    .reduceByKey(lambda x, y: (
     min(x[0], y[0], key=lambda x: x[1]), 
     max(x[1], y[1], , key=lambda x: x[1]))) 
Verwandte Themen