2015-01-05 7 views
5

Ich habe 2 Funken RDDs, dataRDD und newPairDataRDD, die für Spark-SQL-Abfrage verwendet werden. Wenn meine Anwendung initialisiert wird, wird das dataRDD initialisiert. Alle Daten in einer angegebenen HBase-Entität werden in dataRDD gespeichert.es ist sehr langsam für Funken RDD Union

Wenn Client SQL-Abfrage kommt, wird meine APP alle neuen Updates und Einfügungen zu newPairDataRDD erhalten. die DataRDD Union die newPairDataRDD und registrieren als Tabelle in Spark SQL-Kontext.

Ich fand sogar 0 Datensatz in dataRDD und 1 neu eingefügt Datensatz in newPairDataRDD. Es dauert 4 Sekunden für die Vereinigung. Das ist zu langsam

Ich denke, es ist nicht vernünftig. Wer weiß, wie man es schneller macht? Dank einfachen Code wie unten

// Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD(); 
    dataRDD.cache(); 
    dataRDD.persist(StorageLevel.MEMORY_ONLY()); 
    logger.info(dataRDD.count()); 

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD 

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD(); 
    // Step3: if count>0 do union and reduce 

     if(newPairDataRDD.count() > 0) { 

     JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD); 

    // if data was updated in DB, need to delete the old version from the dataRDD. 

     dataRDD = unionedRDD.reduceByKey(
      new Function2<Row, Row, Row>() { 
      // @Override 
      public Row call(Row r1, Row r2) { 
      return r2; 
      } 
      }); 
    } 
//step4: register the dataRDD 
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema); 

//step5: execute sql query 
retRDD = sqlContext.sql(sql); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

Von der Funkenbahn ui, kann ich unten. Offenbar braucht es 4s Vereinigung

Abgeschlossene Stages (8)

StageId Beschreibung Übermittelt Dauer Aufgaben: Gelang/Total Input Shuffle lesen Shuffle schreiben

6 sammeln bei SparkPlan.scala: 85 + Details 1/4/2015 8.17 2 s 156,0 8-Aug B

7 Union SparkSqlQueryForMarsNew.java:389+details 2015.01.04 8.17 s 4 8-Aug 64,0 B 156,0 B

Antwort

1

eine effizientere Weg zu erreichen, was Sie wollen, ist einezu verwendenund flatMapValues(), verwendet eine Union sehr wenig außer dem Hinzufügen neuer Partitionen zu dataRDD, was bedeutet, dass alle Daten vor dem reduceByKey() gemischt werden müssen. A cogroup() und flatMapValues() bewirken nur eine Neupartitionierung des newPairDataRDD.

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD); 
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() { 
     public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) { 
      if (grouped._2.nonEmpty()) { 
       return grouped._2; 
      } else { 
       return grouped._1; 
      } 
     } 
    }); 

Oder in Scala

val unioned = dataRDD.cogroup(newPairDataRDD) 
val updated = unioned.flatMapValues { case (oldVals, newVals) => 
    if (newVals.nonEmpty) newVals else oldVals 
} 

Haftungsausschluss, ich bin zu schreiben Funken in Java nicht verwendet! Bitte korrigieren Sie mich, wenn das oben genannte falsch ist!

0

Versuchen Sie RDDs repartitioning:

JavaPairRDD unionedRDD = dataRDD.repartition (sc.defaultParallelism * 3) .Union (newPairDataRDD.repartition (sc.defaultParallelism * 3));