2017-08-31 2 views
1

So habe ich einen Apache Spark-Stream, der alle 20 Minuten nach Tag und Stunde in S3 Parkett Dateien Partition schreibt.
Es scheint, dass jeder Stapel vor dem Schreiben "ls" und "head" in allen Ordnern dieses Tabellenname (/ root-Ordners) ausführt.Spark schreiben in Datei mit append an s3 - Kosten Problem

Da wir mehrere Tage X 24 Stunden X verschiedene Tabellen haben, verursacht dies insgesamt relativ hohe S3-Kosten.

Bitte beachten Sie, dass unser Schema dynamisch geändert wird.

So sind meine Fragen:

  1. es richtig ist, dass die die Schreibvorgänge alle Parkett Kopf rekursiv lesen?

  2. Warum der Stream diese Informationen nicht zwischenspeichert/Ist es möglich, sie zwischenzuspeichern?

  3. Können Sie die Best Practice vorschlagen?

// Code über:

withPartition.write() 
       .format(format) 
       .mode(SaveMode.Append) 
       .partitionBy("day","hour") 
       .save(path); 

Es scheint, dass dieses Problem zu zusammenhängt:

https://issues.apache.org/jira/browse/SPARK-20049

Spark partitionBy much slower than without it

Antwort

0

fand ich durch diesen Funken Partition aus ist die Ursache dieses Problems:

Spark partitionBy much slower than without it

ich es also wie folgt implementieren und es das Problem behoben, zudem verbessert es performence:

withPartition = withPartition.persist(StorageLevel.MEMORY_AND_DISK()); 
    Dataset<DayAndHour> daysAndHours = withPartition.map(mapToDayHour(), Encoders.bean(DayAndHour.class)).distinct(); 

    DayAndHour[] collect = (DayAndHour[])daysAndHours.collect(); 
    Arrays.sort(collect); 
    logger.info("found " + collect.length +" different days and hours: " 
      + Arrays.stream(collect).map(DayAndHour::toString).collect(Collectors.joining(",")) ); 
    long time = System.currentTimeMillis(); 
    for(DayAndHour dayAndHour : collect){ 
     int day = dayAndHour.getDay(); 
     int hour = dayAndHour.getHour(); 
     logger.info("Start filter on " + dayAndHour); 
     Dataset<Row> filtered = withPartition.filter(filterDayAndHour(day, hour)) 
       .drop("day", hour"); 

      String newPath = path + "/" 
        + "day" +"=" +day +"/" 
        + "hour" +"=" + hour; 

      long specificPathCount = filtered.count(); 
      long timeStart = System.currentTimeMillis(); 
      logger.info("writing " + specificPathCount+ " event to " + newPath ); 

      filtered.write() 
        .format(format) 
        .mode(SaveMode.Append) 
        .save(newPath); 

      logger.info("Finish writing partition of " + dayAndHour+ " to "+ newPath+ ". Wrote [" + specificPathCount +"] events in " + TimeUtils.tookMinuteSecondsAndMillis(timeStart, System.currentTimeMillis())); 
} 
    logger.info("Finish writing " + path+ ". Wrote [" + cnt +"] events in " + MinuteTimeUtils.tookMinuteSecondsAndMillis(time, System.currentTimeMillis())); 
    withPartition.unpersist(); 

private static MapFunction<Row, DayAndHour> mapToDayHour() { 
    return new MapFunction<Row, DayAndHour>() { 
     @Override 
     public DayAndHour call(Row value) throws Exception { 
      int day = value.getAs("day"); 
      int hour = value.getAs(hour"); 
      DayAndHour dayAndHour = new DayAndHour(); 
      dayAndHour.setDay(day); 
      dayAndHour.setHour(hour); 
      return dayAndHour; 
     } 
    }; 
} 

private static FilterFunction<Row> filterDayAndHour(int day, int hour) { 
    return new FilterFunction<Row>() { 
     @Override 
     public boolean call(Row value) throws Exception { 
      int cDay = value.getAs("day"); 
      int cHour = value.getAs(hour"); 

      return day == cDay && hour == cHour; 
     } 
    }; 
} 

// Und eine andere POJO

public class DayAndHour implements Serializable , Comparable<DayAndHour>{ 

    private int day; 
    private int hour; 

    public int getDay() { 
     return day; 
    } 

    public void setDay(int day) { 
     this.day = day; 
    } 

    public int getHour() { 
     return hour; 
    } 

    public void setHour(int hour) { 
     this.hour = hour; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (o == null || getClass() != o.getClass()) return false; 

     DayAndHour that = (DayAndHour) o; 

     if (day != that.day) return false; 
     return hour == that.hour; 
    } 

    @Override 
    public int hashCode() { 
     int result = day; 
     result = 31 * result + hour; 
     return result; 
    } 

    @Override 
    public String toString() { 
     return "(" + 
       "day=" + day + 
       ", hour=" + hour + 
       ')'; 
    } 

    @Override 
    public int compareTo(DayAndHour dayAndHour) { 
     return Integer.compare((day * 100) + hour, (dayAndHour.day * 100) + dayAndHour.hour); 
    } 
}