2015-01-27 6 views
5

Ich habe ein sehr einfaches Spark-Programm (mit Flambo in Clojure, sollte aber leicht zu folgen sein). Dies sind alles Objekte auf der JVM. Ich teste auf einer local Instanz (obwohl ich rate, dass Spark immer noch serialisiert und deserialisiert).Verwenden von JodaTime in Sparks groupByKey und countByKey

Eingabe ist eine RDD von vier Tupeln mit jeweils demselben Datumsobjekt. Die erste Map erzeugt eine Schlüsselwert-RDD von date => x.

Der Inhalt input wird, wie erwartet:

=> (= dt dt) 
true 
=> (.hashCode dt) 
1260848926 
=> (.hashCode dt) 
1260848926 

Sie sind Fälle von JodaTime des DateTime, die implement equals as expected:

=> (f/foreach input prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> "A"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "B"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "C"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "D"] 

Gerade klar, Gleichheit und .hashCode Arbeit am Tag Objekt zu sein .

Wenn ich versuche, countByKey, erhalte ich die erwartete:

=> (f/count-by-key by-date) 
{#<DateTime 2014-01-01T00:00:00.000Z> 4} 

Aber wenn ich groupByKey, es scheint nicht zu funktionieren.

=> (f/foreach (f/group-by-key by-date) prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]] 

Die Tasten sind alle identisch, so würde ich das Ergebnis erwartet ein einzelner Eintrag mit dem Datum als Schlüssel und ["A", "B", "C", "D"] als Wert zu sein. Etwas passiert, weil die Werte alle Listen sind.

Irgendwie entspricht groupByKey nicht richtig den Schlüsseln. Aber countByKey ist. Was ist der Unterschied zwischen den beiden? Wie kann ich sie dazu bringen, sich gleich zu verhalten?

Irgendwelche Ideen?

Antwort

3

Ich komme näher auf eine Antwort. Ich denke, das gehört eher in den Antwortbereich als in den Fragebereich.

Diese Gruppen nach Schlüssel, verwandelt sich in eine lokale sammeln, extrahiert das erste Element (Datum).

=> (def result-dates (map first (f/collect (f/group-by-key by-date)))) 
=> result-dates 
(#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z>) 

Die Hashcodes sind alle die gleichen

=> (map #(.hashCode %) result-dates) 
(1260848926 
1260848926 
1260848926 
1260848926) 

Die Millisekunden sind alle gleich:

=> (map #(.getMillis %) result-dates) 
(1388534400000 
1388534400000 
1388534400000 
1388534400000) 

equals ausfällt, aber isEquals gelingt

=> (.isEqual (first result-dates) (second result-dates)) 
true 

=> (.equals (first result-dates) (second result-dates)) 
false 

documentation for .equals says:

Vergleicht dieses Objekt mit dem angegebenen Objekt für Gleichheit auf der Grundlage des Millisekunden-Zeitpunkt und das Chronologie

Ihre Millisekunden sind alle gleich und ihre Chronologien zu sein scheinen:

=> (map #(.getChronology %) result-dates) 
(#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]>) 

Allerdings sind die Chronologien nicht gleichzusetzen.

=> (def a (first result-dates)) 
=> (def b (second result-dates)) 

=> (= (.getChronology a) (.getChronology b)) 
false 

Obwohl die Hashcodes tun

=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b))) 
true 

Aber joda.time.Chronology nicht its own equals method bereitstellt und erbt sie von Object, die nur Referenz Gleichheit verwendet.

Meine Theorie ist, dass diese Daten alle mit ihren eigenen individuellen, verschiedenen, konstruierten Chronologie-Objekten deserialisiert werden, aber JodaTime hat its own serializer, was wahrscheinlich damit zu tun hat. Vielleicht würde ein kundenspezifischer Kryo Serializer in dieser Hinsicht helfen.

Vorerst meine Lösung JodaTime in Spark verwendet, ist org.joda.time .Instant zu verwenden, indem toInstant oder ein java.util.Date eher als ein org.joda.time.DateTime Aufruf.

Beide beinhalten Zeitzone Informationen wegwerfen, die nicht ideal ist, also wenn jemand mehr Informationen hat, wäre es sehr willkommen!

+0

vielleicht könnten Sie Epoch Times in Millis-Darstellung anstelle von Datum/Uhrzeit-Objekten verwenden. Sieht nach der sichereren Alternative aus. Wir hatten Probleme, Daten mit anderen Hashes basierend auf dem Speicherort, wie Java-Enums, zu verschlüsseln. Sie funktionieren nicht in einer verteilten Umgebung. – maasg

+0

Danke, das habe ich (glaube ich) mit Instant vorgeschlagen. Gut zu wissen, dass ich nicht der Einzige mit diesem Problem bin! – Joe

+0

Verwenden Sie eine heterogene Mischung aus Chronologien und Zeitzonen innerhalb einer RDD? Wenn nicht, würde ich diese Informationen auf der RDD-Ebene beibehalten und den Speicherbedarf jedes Datensatzes speichern (wie Sie es mit "Instant" tun). – climbage

Verwandte Themen