Mein RDD ist \ n Datensätze getrennt, diejeden Datensatz in RDD zu einem Array [Karte] mit scala und Funken
aussehenSingle RDD
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
und wollen es in ein Array konvertieren [Map [k, v] ],
wobei jedes Element in Array eine andere Karte [k, v] ist, die einem Datensatz entspricht.
Array enthält N Anzahl solcher Karten, abhängig von den Datensätzen in einer einzelnen RDD.
Ich bin neu in beiden Scala und Funke. Jede Hilfe bei der Konvertierung wird helfen.
object SparkApp extends Logging with App {
override def main(args: Array[ String ]): Unit = {
val myConfigFile = new File("../sparkconsumer/conf/spark.conf")
val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT)
val propConf = ConfigFactory.load(fileConfig)
val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet
val kafkaParams = Map[ String, String ]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS))
//logger.info(message = "Hello World , You are entering Spark!!!")
val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME))
conf.set("HADOOP_HOME", "/usr/local/hadoop")
conf.set("hadoop.home.dir", "/usr/local/hadoop")
//Lookup
// logger.info("Window of 5 Seconds Enabled")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("/tmp/checkpoint")
val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE))
val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect())
val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE))
val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect())
val messages = KafkaUtils.createDirectStream[ String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, topicsSet)
writeTOHDFS2(messages)
ssc.start()
ssc.awaitTermination()
}
def writeTOHDFS2(messages: DStream[ (String, String) ]): Unit = {
val records = messages.window(Seconds(10), Seconds(10))
val k = records.transform(rdd => rdd.map(r =>r._2)).filter(x=> filterNullImpressions(x))
k.foreachRDD { singleRdd =>
if (singleRdd.count() > 0) {
val maps = singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap(x => x.split("=")).foreach(x => new mutable.HashMap().put(x(0),x(1)))
val r = scala.util.Random
val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date())+r.nextInt)
}
}
}
}
Was ist das Format in Ihrer RDD? Ist es (k1, v1), (k2, v2) usw.? – jtitusj
Willkommen bei StackOverflow :) Können Sie das, was Sie versucht haben, mit einbeziehen - es wird uns leichter verständlich machen, mit welchen Konzepten Sie kämpfen. –
Das sieht einfach aus. Was hast du probiert? Wie viele Zeilen in Ihrer RDD - wollen Sie wirklich eine 'RDD [Map [...]]'? –