2016-09-30 2 views
0

Hallo, ich bin neu zu funken und scala. Ich versuche, einige Tweets durch Funken mit dem folgenden Code-Streaming zu streamen:Spark Ausführung für Twitter Streaming

object TwitterStreaming { 

    def main(args: Array[String]): Unit = { 
    if (args.length < 1) { 
     System.err.println("WrongUsage: PropertiesFile, [<filters>]") 
     System.exit(-1) 
    } 

    StreamingExamples.setStreaningLogLevels() 
    val myConfigFile = args(0) 
    val batchInterval_s = 1 
    val fileConfig = ConfigFactory.parseFile(new File(myConfigFile)) 
    val appConf = ConfigFactory.load(fileConfig) 
    // Set the system properties so that Twitter4j library used by twitter stream 
    // can use them to generate OAuth credentials 

    System.setProperty("twitter4j.oauth.consumerKey", appConf.getString("consumerKey")) 
    System.setProperty("twitter4j.oauth.consumerSecret", appConf.getString("consumerSecret")) 
    System.setProperty("twitter4j.oauth.accessToken", appConf.getString("accessToken")) 
    System.setProperty("twitter4j.oauth.accessTokenSecret", appConf.getString("accessTokenSecret")) 

    val sparkConf = new SparkConf().setAppName("TwitterStreaming").setMaster(appConf.getString("SPARK_MASTER"))//local[2] 

    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval_s)) // creating spark streaming context 
    val stream = TwitterUtils.createStream(ssc, None) 
    val tweet_data = stream.map(status => TweetData(status.getId, "@" + status.getUser.getScreenName, status.getText.trim())) 
    tweet_data.foreachRDD(rdd => { 
     println(s"A sample of tweets I gathered over ${batchInterval_s}s: ${rdd.take(10).mkString(" ")} (total tweets fetched: ${rdd.count()})") 
    }) 
    } 

} 

case class TweetData(id: BigInt, author: String, tweetText: String) 

Mein Fehler:

Exception in thread "main" com.typesafe.config.ConfigException$WrongType:/WorkSpace/InputFiles/application.conf: 5: Cannot concatenate object or list with a non-object-or-list, ConfigString("local") and SimpleConfigList([2]) are not compatible 
at com.typesafe.config.impl.ConfigConcatenation.join(ConfigConcatenation.java:116) 

kann jemand überprüfen Sie den Code und mir sagen, wo ich falsch mache?

+0

@gsamaras ich TweetData Klasse zuordnen wollen ... Ist irgend etwas falsch, das zu tun? – Anji

+0

wie bei mir ist der Fehler bei der Übergabe "local [2]" als SPARK_MASTER.Wenn das der Fall ist, wie SET SPARK_MASTER – Anji

+0

zeigen Sie bitte die entsprechende Zeile in Ihrer Konfigurationsdatei (die mit SPARK_MASTER) - es scheint das Format zu verletzen erwartet von 'typesafe.config'. Sie können die Frage so eingrenzen, dass sie nur den Inhalt der Datei und die zwei Codezeilen enthält, die die Konfigurationsdatei laden - das ist die Ausnahme, keine Notwendigkeit für den Rest ... –

Antwort

1

Wenn Ihre Konfigurationsdatei enthält:

SPARK_MASTER=local[2] 

Ändern Sie es an:

SPARK_MASTER="local[2]" 
+0

Das macht Sinn! – gsamaras

+0

@Tzach Zohar Nach dem Ändern bekomme ich Set SPARK_LOCAL_IP, wenn du an eine andere Adresse binden musst – Anji

+0

Das hört sich nach einer ganz anderen Frage an, also kann ich es hier nicht beantworten. Versuchen Sie, SO/Google zu suchen, das ist Standard-Spark-Zeug ... In der Zwischenzeit wäre es für zukünftige Leser hilfreich, wenn Sie die Frage bearbeiten, um alle relevanten Informationen zu enthalten und _only_ it - was den Eintrag der Konfigurationsdatei und den Code bedeutet lädt es. –