2016-10-27 5 views
2

Ich versuche, Twitter Streaming Beispiel in Zeppelin zu laufen. Nachdem ich gesucht habe, habe ich "org.apache.bahir: spark-streaming-twitter_2.11: 2.0.0" in Spark Interpreter eingefügt. So kann ich den ersten Teil der Arbeit machen, wie in:Zeppelin Twitter Streaming Beispiel funktioniert nicht

Apache Zeppelin 0.6.1: Run Spark 2.0 Twitter Stream App

Jetzt versuche ich, die zweite Hälfte hinzufügen, wie:

case class Tweet(createdAt:Long, text:String, screenName:String) 
twt.map(status=> 
    Tweet(status.getCreatedAt().getTime()/1000, status.getText(), status.getUser().getScreenName()) 
).foreachRDD(rdd=> 
    rdd.toDF().registerTempTable("tweets") 
) 

Jetzt habe ich den Fehler bekam:

<console>:56: error: not found: type StreamingContext 
     val ssc = new StreamingContext(sc, Seconds(2)) 
        ^
<console>:56: error: not found: value Seconds 
     val ssc = new StreamingContext(sc, Seconds(2)) 
             ^
<console>:61: error: not found: value Seconds 
     val twt = tweets.window(Seconds(60)) 

Eigentlich habe ich die Falllinie hinzugefügt, ich habe den obigen Fehler. Ich hatte wirklich keine Ahnung, was hier passiert ist.

Jeder hat eine Ahnung hier?

Hier sind Details Funke: 2.0.0 Zeppelin: 0.6.2

Vielen Dank.

============================================== =======================

// All codes for your reference: 
import org.apache.spark.streaming.twitter 
import org.apache.spark.streaming._ 
import org.apache.spark.storage.StorageLevel 
import scala.io.Source 
import scala.collection.mutable.HashMap 
import java.io.File 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import sys.process.stringSeqToProcess 
import org.apache.spark.SparkConf 

// ********************************* Configures the Oauth Credentials for accessing Twitter **************************** 
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {...} 

// ***************************************** Configure Twitter credentials ******************************************** 
val apiKey = ... 
val apiSecret = ... 
val accessToken = ... 
val accessTokenSecret = ... 
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) 

// ************************************************* The logic itself ************************************************* 
val ssc = new StreamingContext(sc, Seconds(2)) 
val tweets = TwitterUtils.createStream(ssc, None) 
val twt = tweets.window(Seconds(60)) 
twt.print 
// above codes work correctly 

// If added the following line, it failed with the above error 
case class Tweet(createdAt:Long, text:String, screenName:String) 

Antwort

3

ich hatte das gleiche Problem, und ich habe keine Ahnung, warum von der Spitze der Einfuhr Aussagen bewegen zu kurz bevor der neue StreamingContext es behoben hat, aber es ist passiert.

import org.apache.spark.streaming._ //moved here from top 
import org.apache.spark.streaming.twitter._ //moved here from top 
val ssc = new StreamingContext(sc, Seconds(2)) //existing 
0

Ich hatte ein ähnliches Problem. Die Verwendung der FQCNs funktionierte in Ordnung, also benutzte ich das als Workaround.