Ich muss eine Spark-Anwendung entwickeln, ich muss Spark 1.3 verwenden und so kann ich keine Fensterfunktionen verwenden. Ich habe gewählt, um über einzelne Gruppen von Elementen zu iterieren, erzeugte Gruppierung durch Schlüssel die rdd. Die Lösung, die ich bis jetzt gefunden habe, ist, die Schlüssel zu sammeln und dann die entsprechende RDD mit einem Nachschlag (Schlüssel) zu nehmen. Ich weiß, dass mein Ansatz sehr ineffizient ist, aber ich weiß nicht, wie ich meine Funktion anwende, die den RDD in eine Liste konvertiert und dann eine andere Liste auf eine andere Weise zurückgibt.Spark iterieren über RDD gruppiert nach Schlüsseln
logon_dhcp = logons.map(lambda logon: (logon.user, (logon.dhcpscopename, logon.city, logon.timestamp)))
logon_dhcp = logon_dhcp.groupByKey()
dhcp_change_list = []
for key in logon_dhcp.keys().collect():
new_list = dhcp_changed(key,logon_dhcp.lookup(key))
dhcp_change_list = list(set().union(dhcp_change_list,new_list))
def dhcp_changed(key,group):
values = list(group[0])
values_sorted = sorted(values, key=lambda tup: tup[2])
prevCity = None
prevValue = None
prevTime = None
res = list()
for value in values_sorted:
if prevCity != None and prevCity != value[1] and notEnoughTime(prevTime,value[2]):
res.append((key, prevTime.strftime('%Y-%m-%d %H:%M:%S'), prevCity, value[2].strftime('%Y-%m-%d %H:%M:%S'), value[1]))
prevCity = value[1]
prevTime = value[2]
prevValue = value
return res
Wie kann ich das gleiche mit wie aggregateByKey() tun?