ich eine Lookup rdd von Größe haben 6000, lookup_rdd: RDD [Zeichenfolge]Funke: access innerhalb eines anderen rdd RDD
a1 a2 a3 a4 a5 .....
und andere rdd, data_rdd: RDD [(String, Iterable [(String, Int)])]: (id, (Artikel, count)), die eindeutigen IDs hat,
(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))
FOREACH Element in lookup_rdd möchte ich überprüfen, ob jedes id dieses Element hat oder nicht, wenn es da ist Ich zähle die Zählung und wenn nicht, gebe ich 0 und in einer Datei speichern.
Was ist der effiziente Weg, dies zu erreichen. Ist Hashing möglich? z.B. Ausgabe ich will, ist:
id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1
ich das versucht:
val headers = lookup_rdd.zipWithIndex().persist()
val indexing = data_rdd.map{line =>
val id = line._1
val item_cnt_list = line._2
val arr = Array.fill[Byte](6000)(0)
item_cnt_list.map(c=>(headers.lookup(c._1),c._2))
}
indexing.collect().foreach(println)
Ich bekomme die Ausnahme:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
6000 Gesamt ist ein sehr kleiner Datensatz. Erwägen Sie, auf dem Treiber zu sammeln und senden Sie dann –