Ich muss Daten aus Chargen für einige Zeit für spätere Verarbeitung akkumulieren. Ich benutze Spark 1.6.3.
Ich muss sie in Form (tag, [[time, value],..]
akkumulieren. Bisher habe ich updateStateByKey
versucht:Speichern der Daten der Charge in pyspark
time = [0]
def updateFunc(new_values, last_sum,time):
time[0] += 5
if time == 10:
time = 0
return None
return (last_sum or []) + new_values
data = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, ['t','t1'])) \
.updateStateByKey(lambda x,y :updateFunc(x,y,time))
data.pprint()
Die Daten hinzugefügt wird. Aber Versuch, Daten nach 10 Sekunden zu löschen, funktioniert nicht. (Ich tue es falsch)
Auch habe ich versucht, window
zu verwenden:
data= lines.flatMap(lambda lime: line.split(' ')\
.map(lambda tag: (tag: ['time', 'value']))\
.window(10, 2)\
.reduceByKey(lambda x,y : y + x)`
Aber dies ergibt eine eindimensionale lange Liste. Was nicht nützlich ist. Irgendwelche Leads? Vielen Dank.