2015-10-05 7 views
8

Ich habe RDD der Foo-Klasse: class Foo(name : String, createDate : Date). Ich möchte eine andere RDD mit 10 Prozent älter Foo. Meine erste Idee war, nach createDate zu sortieren und um 0,1 * zu zählen, aber es gibt keine Limit-Funktion.Wie sortiere ich eine RDD und limit in Spark?

Haben Sie eine Idee?

Antwort

14

Foo Angenommen ist ein Fall, Klasse wie folgt:

import java.sql.Date 
case class Foo(name: String, createDate: java.sql.Date) 
  1. mit Normalpapier RDDs:

    import org.apache.spark.rdd.RDD 
    import scala.math.Ordering 
    
    val rdd: RDD[Foo] = sc 
        .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"), 
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23"))) 
        .toDF("name", "createDate") 
        .withColumn("createDate", $"createDate".cast("date")) 
        .as[Foo].rdd 
    
    rdd.cache() 
    val n = scala.math.ceil(0.1 * rdd.count).toInt 
    
    • Daten passt in den Treiber-Speicher:

      • und Bruch Sie wollen, ist relativ klein

        rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime)) 
        // Array[Foo] = Array(Foo(a,2009-11-23)) 
        
      • Fraktion Sie ist relativ groß:

        rdd.sortBy(_.createDate.getTime).take(n) 
        
    • sonst

      rdd 
          .sortBy(_.createDate.getTime) 
          .zipWithIndex 
          .filter{case (_, idx) => idx < n} 
          .keys 
      
  2. Verwenden von DataFrame (beachten Sie, dass dies aufgrund des Grenzverhaltens nicht optimal ist).

    import org.apache.spark.sql.Row 
    
    val topN = rdd.toDF.orderBy($"createDate").limit(n) 
    topN.show 
    
    // +----+----------+ 
    // |name|createDate| 
    // +----+----------+ 
    // | a|2009-11-23| 
    // +----+----------+ 
    
    
    // Optionally recreate RDD[Foo] 
    topN.map{case Row(name: String, date: Date) => Foo(name, date)} 
    
+1

Hallo zero323 können Sie sagen, wirklich schnell, warum Datenrahmen der Leistung auf Grenz-Betrieb suboptimal ist? Was ist der Unterschied zu Top auf RDD in der Umsetzung vergleichen? @ Zero333 –

+0

@XinweiLiu Ich habe bereits eine Antwort auf Ihre Frage zur Verfügung gestellt. Ich hoffe, es erklärt, was vor sich geht. – zero323

+1

Große Antwort @ Zero323. Aber ich habe immer noch die gleiche Frage xinwei Liu hat. Warum ist df.limit() langsam? – guilhermecgs