2016-12-01 5 views
1

Ich muss Daten aus Chargen für einige Zeit für spätere Verarbeitung akkumulieren. Ich benutze Spark 1.6.3.
Ich muss sie in Form (tag, [[time, value],..] akkumulieren. Bisher habe ich updateStateByKey versucht:Speichern der Daten der Charge in pyspark

time = [0] 
def updateFunc(new_values, last_sum,time): 
    time[0] += 5 
    if time == 10: 
     time = 0 
     return None 
    return (last_sum or []) + new_values 

data = lines.flatMap(lambda line: line.split(" "))\ 
        .map(lambda word: (word, ['t','t1'])) \ 
        .updateStateByKey(lambda x,y :updateFunc(x,y,time)) 
data.pprint() 

Die Daten hinzugefügt wird. Aber Versuch, Daten nach 10 Sekunden zu löschen, funktioniert nicht. (Ich tue es falsch)

Auch habe ich versucht, window zu verwenden:

data= lines.flatMap(lambda lime: line.split(' ')\ 
    .map(lambda tag: (tag: ['time', 'value']))\ 
    .window(10, 2)\ 
    .reduceByKey(lambda x,y : y + x)` 


Aber dies ergibt eine eindimensionale lange Liste. Was nicht nützlich ist. Irgendwelche Leads? Vielen Dank.

Antwort

0
items = lines.flatMap(lambda x: list(x)).map(lambda x: (x, [('time', 'value')])) 
counts = items.reduceByKeyAndWindow(lambda x, y: x + y, invFunc=None, windowDuration=3, slideDuration=2) 

Versuchen Sie, diese

Verwandte Themen