Ich bin ein Anfänger versucht, Tweets mit Spark-Streaming mit Scala mit einigen Filter-Keywords zu erhalten. Gibt es eine Möglichkeit, nach dem Streaming nur die Tweets zu filtern, die keinen geolocation haben? Ich versuche die Tweets in ElasticSearch zu speichern. Kann ich also vor dem Speichern der Tweet-Map in ElasticSearch diejenigen mit Geolocation-Informationen filtern und sie dann speichern? Ich erstelle JSON mit json4s.JSONDSL mit Feldern aus dem Tweet. Dies ist der BeispielcodeSpark-Streaming - Filter Tweets nach dem Streaming mit Geolocation
val stream = TwitterUtils.createStream (ssc, None, Filter) val tweetMap = stream.map (status => { val tweetMap =
("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~
("UserLang" -> status.getUser.getLang) ~
("UserLocation" -> Option(status.getUser.getLocation)) ~
("UserName" -> status.getUser.getName) ~
("Text" -> status.getText) ~
("TextLength" -> status.getText.length) ~
//Tokenized the tweet message and then filtered only words starting with #
("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~
("PlaceCountry" -> Option(status.getPlace).map (pl => {s"${pl.getCountry}"}))
tweetMap.map (s => Liste ("Tweet heraus~~POS=TRUNC")). drucken
// Each batch is saved to Elasticsearch
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets")) }
// vor diesem Schritt ist es eine Möglichkeit, Tweets, um herauszufiltern, die "location" als null haben?
Ich bezog den Code von GitHub: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala
Ich habe versucht, aber es gibt Fehler beim Kompilieren "Wert getGeoLocation ist kein Mitglied von scala.collection.immutable.Map [String, Any]" –