2017-01-30 5 views
0

Ich muss eine Spark-Anwendung entwickeln, ich muss Spark 1.3 verwenden und so kann ich keine Fensterfunktionen verwenden. Ich habe gewählt, um über einzelne Gruppen von Elementen zu iterieren, erzeugte Gruppierung durch Schlüssel die rdd. Die Lösung, die ich bis jetzt gefunden habe, ist, die Schlüssel zu sammeln und dann die entsprechende RDD mit einem Nachschlag (Schlüssel) zu nehmen. Ich weiß, dass mein Ansatz sehr ineffizient ist, aber ich weiß nicht, wie ich meine Funktion anwende, die den RDD in eine Liste konvertiert und dann eine andere Liste auf eine andere Weise zurückgibt.Spark iterieren über RDD gruppiert nach Schlüsseln

logon_dhcp = logons.map(lambda logon: (logon.user, (logon.dhcpscopename, logon.city, logon.timestamp))) 
logon_dhcp = logon_dhcp.groupByKey() 

dhcp_change_list = [] 
for key in logon_dhcp.keys().collect(): 
    new_list = dhcp_changed(key,logon_dhcp.lookup(key)) 
    dhcp_change_list = list(set().union(dhcp_change_list,new_list)) 

def dhcp_changed(key,group): 
    values = list(group[0]) 
    values_sorted = sorted(values, key=lambda tup: tup[2]) 
    prevCity = None 
    prevValue = None 
    prevTime = None 
    res = list() 
    for value in values_sorted: 
     if prevCity != None and prevCity != value[1] and notEnoughTime(prevTime,value[2]): 
      res.append((key, prevTime.strftime('%Y-%m-%d %H:%M:%S'), prevCity, value[2].strftime('%Y-%m-%d %H:%M:%S'), value[1])) 
     prevCity = value[1] 
     prevTime = value[2] 
     prevValue = value 
    return res 

Wie kann ich das gleiche mit wie aggregateByKey() tun?

Antwort

0

Ok, eine einfache Karte funktioniert, da die RDD ist bereits im Format (Schlüssel, IterableList)

result = logon_dhcp.map(lambda x: dhcp_changed(x)) 

mit der Funktion modifiziert:

def dchp_changed(group): 
    key = str(group[0]) 
    values = list(group[1]) 

Jeder Vorschlag, mein Code ist zu verbessern Leistung ist willkommen