2016-04-15 4 views
2

Mein RDD ist \ n Datensätze getrennt, diejeden Datensatz in RDD zu einem Array [Karte] mit scala und Funken

aussehen
Single RDD 

k1=v1,k2=v2,k3=v3 
k1=v1,k2=v2,k3=v3 
k1=v1,k2=v2,k3=v3 

und wollen es in ein Array konvertieren [Map [k, v] ],

wobei jedes Element in Array eine andere Karte [k, v] ist, die einem Datensatz entspricht.

Array enthält N Anzahl solcher Karten, abhängig von den Datensätzen in einer einzelnen RDD.

Ich bin neu in beiden Scala und Funke. Jede Hilfe bei der Konvertierung wird helfen.

object SparkApp extends Logging with App { 


    override def main(args: Array[ String ]): Unit = { 
    val myConfigFile = new File("../sparkconsumer/conf/spark.conf") 
    val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT) 
    val propConf = ConfigFactory.load(fileConfig) 
    val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet 
    val kafkaParams = Map[ String, String ]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS)) 


    //logger.info(message = "Hello World , You are entering Spark!!!") 
    val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME)) 
    conf.set("HADOOP_HOME", "/usr/local/hadoop") 
    conf.set("hadoop.home.dir", "/usr/local/hadoop") 
    //Lookup 

    // logger.info("Window of 5 Seconds Enabled") 
    val ssc = new StreamingContext(conf, Seconds(5)) 
    ssc.checkpoint("/tmp/checkpoint") 

    val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE)) 
    val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect()) 

    val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE)) 
    val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect()) 


    val messages = KafkaUtils.createDirectStream[ String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, topicsSet) 
    writeTOHDFS2(messages) 
    ssc.start() 
    ssc.awaitTermination() 
    } 



    def writeTOHDFS2(messages: DStream[ (String, String) ]): Unit = { 
    val records = messages.window(Seconds(10), Seconds(10)) 
    val k = records.transform(rdd => rdd.map(r =>r._2)).filter(x=> filterNullImpressions(x)) 

    k.foreachRDD { singleRdd => 
     if (singleRdd.count() > 0) { 


     val maps = singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap(x => x.split("=")).foreach(x => new mutable.HashMap().put(x(0),x(1))) 


     val r = scala.util.Random 
     val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm") 
     maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date())+r.nextInt) 
     } 
    } 

    } 

} 
+0

Was ist das Format in Ihrer RDD? Ist es (k1, v1), (k2, v2) usw.? – jtitusj

+2

Willkommen bei StackOverflow :) Können Sie das, was Sie versucht haben, mit einbeziehen - es wird uns leichter verständlich machen, mit welchen Konzepten Sie kämpfen. –

+0

Das sieht einfach aus. Was hast du probiert? Wie viele Zeilen in Ihrer RDD - wollen Sie wirklich eine 'RDD [Map [...]]'? –

Antwort

3

Hier ist ein Code, der ziemlich selbst erklärend sein sollte.

val lines = "k1=v1,k2=v2,k3=v3\nk1=v1,k2=v2\nk1=v1,k2=v2,k3=v3,k4=v4" 

val maps = lines.split("\n") 
.map(line => line.split(",") 
.map(kvPairString => kvPairString.split("=")) 
.map(kvPairArray => (kvPairArray(0), kvPairArray(1)))) 
.map(_.toMap) 

// maps is of type Array[Map[String, String]] 

println(maps.mkString("\n")) 

// prints: 
// Map(k1 -> v1, k2 -> v2, k3 -> v3) 
// Map(k1 -> v1, k2 -> v2) 
// Map(k1 -> v1, k2 -> v2, k3 -> v3, k4 -> v4) 

Wort des Rates - SO ist keine "schreiben Code für mich" -Plattform. Ich verstehe, dass es ziemlich schwierig ist, einfach in Scala und Spark einzutauchen, aber bitte versuchen Sie das nächste Mal, das Problem selbst zu lösen und veröffentlichen Sie, was Sie bisher versucht haben und welche Probleme Sie hatten.

+0

Danke slouc, Totally arbeitete für mich ohne jede Änderung. –

+0

Hatte viele Variationen ausprobiert, die sind: singleRdd.map (line => line.split ("\ n"). FlatMap (x => x.split (",")). FlatMap (x => x.split "=")). foreach (x => new mutable.HashMap(). put (x (0), x (1))) –

+0

@krupavarughese Ja, nun, fühle dich nicht beschämt zu schreiben, was du versucht hast:) Es zeigt guten Willen und trennt Sie von Benutzern, die nur ihr Problem posten und darauf warten, dass jemand die Lösung festlegt (diese werden schnell downvotiert). – slouc

Verwandte Themen