2016-05-13 7 views
0

Ich habe eine Textdatei (61 GB) enthält auf jeder Zeile, eine Zeichenfolge, die ein Datum, z. Do 16. Dezember 18:53:32 +0000 2010Wie Frequenzen eines Tages eines bestimmten Jahres mit mapreduce und pyspark zu finden

Das Iterieren der Datei auf einem einzelnen Kern würde zu lange dauern, daher würde ich gerne Pyspark und die Mapreduce-Technologie verwenden, um schnell Zeilenhäufigkeiten für einen bestimmten Tag zu finden Jahr.

Was ich denke, ist ein guter Anfang:

import dateutil.parser 
text_file = sc.textFile('dates.txt') 
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line)) \ 
     .map(lambda date: date + 1) \ 
     .reduceByKey(lambda a, b: a + b) 

Leider kann ich nicht verstehen, wie man auf einem bestimmten Jahr filtern und durch Schlüssel reduzieren. Der Schlüssel ist der Tag.

Beispiel Ausgabe:

Do 16. Dezember 26543

Do 17. Dezember 345 usw.

Antwort

2

Wie angespielt in another answer, dateutil.parser.parse gibt eine datetime object zurück, diehat 10, month und day Attribute:

>>> dt = dateutil.parser.parse('Thu Dec 16 18:53:32 +0000 2010') 
>>> dt.year 
2010 
>>> dt.month 
12 
>>> dt.day 
16 

mit diesem RDD Beginn:

>>> rdd = sc.parallelize([ 
...  'Thu Oct 21 5:12:38 +0000 2010', 
...  'Thu Oct 21 4:12:38 +0000 2010', 
...  'Wed Sep 22 15:46:40 +0000 2010', 
...  'Sun Sep 4 22:28:48 +0000 2011', 
...  'Sun Sep 4 21:28:48 +0000 2011']) 

Hier ist, wie Sie die Zählungen für alle Jahr-Monat-Tag-Kombinationen bekommen können:

>>> from operator import attrgetter 
>>> counts = rdd.map(dateutil.parser.parse).map(
...  attrgetter('year', 'month', 'day')).countByValue() 
>>> counts 
defaultdict(<type 'int'>, {(2010, 9, 22): 1, (2010, 10, 21): 2, (2011, 9, 4): 2}) 

Um die gewünschte Ausgabe zu erhalten:

>>> for k, v in counts.iteritems(): 
...  print datetime.datetime(*k).strftime('%a %b %y'), v 
... 
Wed Sep 10 1 
Thu Oct 10 2 
Sun Sep 11 2 

Wenn Sie zählen wollen für nur ein bestimmtes Jahr, können Sie die RDD filtern, bevor die Zählung tun:

>>> counts = rdd.map(dateutil.parser.parse).map(
... attrgetter('year', 'month', 'day')).filter(
... lambda (y, m, d): y == 2010).countByValue() 
>>> counts 
defaultdict(<type 'int'>, {(2010, 9, 22): 1, (2010, 10, 21): 2}) 
1

Etwas nach dem Vorbild hierfür könnte ein guter Anfang sein:

import dateutil.parser 
text_file = sc.textFile('dates.txt') 
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line)) 
    .keyBy((_.year, _.month, _.day)) // somehow get the year, month, day to key by 
    .countByKey() 
0

ich, dass dateutil hinzufügen sollte in Python nicht Standard ist. Wenn Sie Sudo nicht direkt auf Ihrem Cluster haben, könnte dies ein Problem darstellen. Als Lösung würde Ich mag Datetime schlagen mit:

import datetime 
def parse_line(d): 
    f = "%a %b %d %X %Y" 
    date_list = d.split() 
    date = date_list[:4] 
    date.append(date_list[5]) 
    date = ' '.join(date) 
    return datetime.datetime.strptime(date, f) 

counts = rdd.map(parse_line)\ 
    .map(attrgetter('year', 'month', 'day'))\ 
    .filter(lambda (y, m, d): y == 2015)\ 
    .countByValue() 

Ich interessiere mich für bessere Lösungen mit: Parkett, Reihe/Spalten usw.

Verwandte Themen