2017-04-12 6 views
0

Ich bin ein Anfänger versucht, Tweets mit Spark-Streaming mit Scala mit einigen Filter-Keywords zu erhalten. Gibt es eine Möglichkeit, nach dem Streaming nur die Tweets zu filtern, die keinen geolocation haben? Ich versuche die Tweets in ElasticSearch zu speichern. Kann ich also vor dem Speichern der Tweet-Map in ElasticSearch diejenigen mit Geolocation-Informationen filtern und sie dann speichern? Ich erstelle JSON mit json4s.JSONDSL mit Feldern aus dem Tweet. Dies ist der BeispielcodeSpark-Streaming - Filter Tweets nach dem Streaming mit Geolocation

val stream = TwitterUtils.createStream (ssc, None, Filter) val tweetMap = stream.map (status => { val tweetMap =

 ("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~ 
     ("UserLang" -> status.getUser.getLang) ~ 
     ("UserLocation" -> Option(status.getUser.getLocation)) ~ 
     ("UserName" -> status.getUser.getName) ~ 
     ("Text" -> status.getText) ~ 
     ("TextLength" -> status.getText.length) ~ 
     //Tokenized the tweet message and then filtered only words starting with # 
     ("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~ 
     ("PlaceCountry" -> Option(status.getPlace).map (pl => {s"${pl.getCountry}"})) 

tweetMap.map (s => Liste ("Tweet heraus~~POS=TRUNC")). drucken

// Each batch is saved to Elasticsearch 
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets")) } 

// vor diesem Schritt ist es eine Möglichkeit, Tweets, um herauszufiltern, die "location" als null haben?

Ich bezog den Code von GitHub: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala

Antwort

0

Überprüfen Sie die filter Methode auf der RDD. Nimmt eine Prädikatfunktion (a: A) => Boolean. Wenn der Rückgabewert wahr ist, wird das Element zur Liste hinzugefügt. Wenn es falsch ist, wird das Element nicht zur Liste hinzugefügt.

+0

Ich habe versucht, aber es gibt Fehler beim Kompilieren "Wert getGeoLocation ist kein Mitglied von scala.collection.immutable.Map [String, Any]" –

Verwandte Themen