2017-07-03 5 views
1

Ich habe folgendes Pojo:Wie aggregiert man am Tag?

public class MyPojo { 
    Date startDate; 
    Double usageAMount; 
    // ... bla bla bla 
} 

So habe ich eine Liste von MyPojo Objekte, auf eine Funktion als Argument übergeben:

public Map<Date, Double> getWeeklyCost(@NotNull List<MyPojo> reports) { 
     JavaRDD<MyPojo> rdd = context.parallelize(reports); 
     JavaPairRDD<Date, Double> result = rdd.mapToPair(
       (PairFunction<MyPojo, Date, Double>) x -> 
         new Tuple2<>(x.getStartDate(), x.getUsageAmount())) 
       .reduceByKey((Function2<Double, Double, Double>) (x, y) -> x + y); 

     return result.collectAsMap(); 
} 
jedoch

, kehre ich so etwas wie:

"2017-06-28T22:00:00.000+0000": 0.02916666, 
"2017-06-29T16:00:00.000+0000": 0.02916666, 
"2017-06-27T13:00:00.000+0000": 0.03888888, 
"2017-06-26T05:00:00.000+0000": 0.05833332000000001, 
"2017-06-28T21:00:00.000+0000": 0.03888888, 
"2017-06-27T02:00:00.000+0000": 0.03888888, 
"2017-06-28T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T20:00:00.000+0000": 0.01944444, 
"2017-06-30T04:00:00.000+0000": 0.00972222, 
"2017-06-28T02:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T21:00:00.000+0000": 0.03888888, 
"2017-06-29T23:00:00.000+0000": 0.06805554000000001, 
"2017-06-27T00:00:00.000+0000": 0.05833332000000001, 
"2017-06-26T06:00:00.000+0000": 0.03888888, 
"2017-06-28T01:00:00.000+0000": 0.09722220000000002, 
"2017-06-29T22:00:00.000+0000": 0.01944444, 
"2017-06-28T00:00:00.000+0000": 0.11666664000000003, 
"2017-06-27T12:00:00.000+0000": 0.01944444, 
"2017-06-26T11:00:00.000+0000": 0.01944444, 
"2017-06-29T03:00:00.000+0000": 0.01944444, 
"2017-06-26T04:00:00.000+0000": 0.07777776000000002, 
"2017-06-27T19:00:00.000+0000": 0.01944444, 
"2017-06-29T20:00:00.000+0000": 0.048611100000000004, 
"2017-06-29T02:00:00.000+0000": 0.02916666, 
"2017-06-29T15:00:00.000+0000": 0.01944444, 
"2017-06-27T17:00:00.000+0000": 0.01944444, 
"2017-06-29T14:00:00.000+0000": 0.02916666, 
"2017-06-30T01:00:00.000+0000": 0.02916666, 
"2017-06-29T00:00:00.000+0000": 0.01944444, 
"2017-06-27T18:00:00.000+0000": 0.03888888, 
"2017-06-26T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T05:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T13:00:00.000+0000": 0.01944444, 
"2017-06-30T03:00:00.000+0000": 0.00972222, 
"2017-06-27T11:00:00.000+0000": 0.01944444, 
"2017-06-28T04:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T12:00:00.000+0000": 0.00972222, 
"2017-06-30T02:00:00.000+0000": 0.06805554000000001, 
"2017-06-27T23:00:00.000+0000": 0.09722220000000002, 
"2017-06-27T16:00:00.000+0000": 0.01944444, 
"2017-06-26T15:00:00.000+0000": 0.01944444, 
"2017-06-29T06:00:00.000+0000": 0.00972222, 
"2017-06-30T07:00:00.000+0000": 0.00138889, 
"2017-06-30T00:00:00.000+0000": 0.01944444, 
"2017-06-27T21:00:00.000+0000": 0.01944444, 
"2017-06-26T02:00:00.000+0000": 0.07777776000000002, 
"2017-06-29T19:00:00.000+0000": 0.00972222, 
"2017-06-27T03:00:00.000+0000": 0.03888888, 
"2017-06-27T20:00:00.000+0000": 0.01944444, 
"2017-06-30T05:00:00.000+0000": 74.1458333, 
"2017-06-29T18:00:00.000+0000": 0.00972222, 
"2017-06-29T17:00:00.000+0000": 0.01944444, 
"2017-06-28T23:00:00.000+0000": 0.00972222, 
"2017-06-27T01:00:00.000+0000": 0.01944444, 
"2017-06-27T22:00:00.000+0000": 0.05833332000000001 

und ich möchte es aggregiert nach dem Tag, in absteigender Reihenfolge mit dem Datum sortiert. Zum Beispiel:

"2017-06-28T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T20:00:00.000+0000": 0.01944444, 

sind am selben Tag, so dass ihre Werte (usageAmount) hinzugefügt werden soll. Ich sorge mich nur um den Tag, nicht um die Stunde. Wie kann ich meine RDD reduzieren oder aggregieren, um das gewünschte Ergebnis zu erzielen?

** Update ** Die Antwort muss eine Funken RDD Lösung sein ...

+1

können Sie Spark-SQL der Datenrahmen verwenden? Das ist so viel einfacher zu schreiben und später zu verstehen. –

+0

@JacekLaskowski Die Daten stammen von MongoDB .... – cybertextron

+0

Keine Antwort angenommen? – c0der

Antwort

0

Relativ einfach (obwohl es werde eine Menge Code sein wird)

die mit einer Implementierung von Pojo Beginnen wir:

static class Record 
{ 
    private Date date; 
    private double amount; 
    public Record(Date d, double a) 
    { 
     this.date = d; 
     this.amount = a; 
    } 
    @Override 
    public String toString() { 
     return date.toString() + "\t" + amount; 
    } 
} 

Jetzt ein Dienstprogramm Methode zu prüfen, ob zwei Datensätze im selben Tag sind:

private static boolean sameDay(Record r0, Record r1) 
{ 
    Date d0 = r0.date; 
    Date d1 = r1.date; 

    Calendar cal = new GregorianCalendar(); 
    cal.setTime(d0); 

    int[] dateParts0 = {cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.MONTH), cal.get(Calendar.YEAR)}; 

    cal.setTime(d1); 

    return cal.get(Calendar.DAY_OF_MONTH) == dateParts0[0] && 
      cal.get(Calendar.MONTH) == dateParts0[1] && 
      cal.get(Calendar.YEAR) == dateParts0[2]; 
} 

Jetzt, da wir das haben, können wir mit dem Hauptteil des Algorithmus beginnen. Die Idee hier ist, die Liste der Eingaben nach Tag zu sortieren. Und dann Schleife über die Liste. Für jeden Eintrag, den wir verarbeiten, prüfen wir, ob es sich um den Tag handelt, an dem der letzte bekannte Tag unseres aggregierten Datensatzes stattgefunden hat. Wenn dies der Fall ist, fügen wir die Menge des Datensatzes hinzu, wenn nicht, fügen wir einen neuen Eintrag hinzu.

public static List<Record> aggregate(Collection<Record> rs) 
{ 
    List<Record> tmp = new ArrayList<>(rs); 
    java.util.Collections.sort(tmp, new Comparator<Record>() { 
     @Override 
     public int compare(Record o1, Record o2) { 
      return o1.date.compareTo(o2.date); 
     } 
    }); 

    List<Record> out = new ArrayList<>(); 
    out.add(new Record(tmp.get(0).date, 0)); 
    for(int i=0;i<tmp.size();i++) 
    { 
     Record last = out.get(out.size() - 1); 
     Record recordBeingProcessed = tmp.get(i); 
     if(sameDay(last, recordBeingProcessed)) 
     { 
      last.amount += recordBeingProcessed.amount; 
     } 
     else 
     { 
      out.add(recordBeingProcessed); 
     } 
    } 

    return out; 
} 

Und schließlich eine schöne Haupt-Methode alles testen:

public static void main(String[] args) throws ParseException { 
    DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH); 
    String[] dateStrings = {"January 2, 2010", "January 2, 2010", "January 3, 2010"}; 
    List<Record> rs = new ArrayList<>(); 
    for(int i=0;i<dateStrings.length;i++) 
    { 
     rs.add(new Record(format.parse(dateStrings[i]), 1)); 
    } 
    for(Record r : aggregate(rs)) 
    { 
     System.out.println(r); 
    } 
} 

Das ausdruckt:

Sat Jan 02 00:00:00 CET 2010 2.0 
Sun Jan 03 00:00:00 CET 2010 1.0 
0
public class MyPojo { 

     Date startDate; 
     Double usageAMount; 
     static DateFormat format = new SimpleDateFormat("yyyy-mm-dd:hh"); 

    MyPojo(Date startDate, Double usageAMount) { 

     this.startDate = startDate; 
     this.usageAMount = usageAMount; 
    } 

    Date getStrartDate() { return startDate;} 
    Double getUsage() { return usageAMount;} 

    public static void main(String[] args) throws ParseException { 

     List<MyPojo> reports = getReports(); 

     //sort by date 
     reports = reports.stream().sorted(getComperator()).collect(Collectors.toList()); 
     output(reports); 

     //you can collect to map but map keys are not sorted 
     //and keys (dates) must be unique 
     Map<Date, Double> result = reports.stream().sorted(getComperator()).collect(Collectors 
       .toMap(e-> e.startDate , e-> e.usageAMount)); 
    } 

    private static List<MyPojo> getReports() throws ParseException { 

     List<MyPojo> reports = new ArrayList<>(); 

     reports.add(new MyPojo(format.parse("2017-06-28:01"), 0.02916666)); 
     reports.add(new MyPojo(format.parse("2017-06-29:01"), 0.02916666)); 
     reports.add(new MyPojo(format.parse("2017-06-27:01"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-26:01"), 0.05833332000000001)); 
     reports.add(new MyPojo(format.parse("2017-06-28:02"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-27:02"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-28:03"), 0.07777776000000002)); 
     reports.add(new MyPojo(format.parse("2017-06-28:04"), 0.01944444)); 
     reports.add(new MyPojo(format.parse("2017-06-30:01"), 0.00972222)); 

     return reports; 
    } 

    private static Comparator<? super MyPojo> getComperator() { 

     Comparator<? super MyPojo> comperator = new Comparator<MyPojo>() { 

      @Override 
      public int compare(MyPojo o1, MyPojo o2) { 

       if((o1 == o2) || ((o1 == null) && (o2 == null))) { 
        return 0; 
       } 
       if(o1 == null) { 
        return -1; 
       } 
       if(o2 == null) { 
        return 1; 
       } 

       return (o1).startDate.compareTo((o2).startDate); 
      } 

     }; 
     return comperator; 
    } 

    static void output(List<MyPojo> reports) { 

     for(MyPojo p : reports) { 
      System.out.println(format.format(p.startDate) +" - "+ p.usageAMount); 
     } 
    } 
} 

Ausgang:


2017.06.27: 01-,03888888
2017.06.27: 02-,03888888
2017.06.28: 01-0,02916666
2017.06.28: 02-,03888888
2017.06.28: 03 - ,07777776000000002
2017.06.28: 04 - 0,01944444
2017.06.29: 01-0,02916666
2017.06.30: 01 - 0,00972222

Verwandte Themen