0

Ich versuche meine Hände auf Kafka in Intellij mit Spark & Scala. Beim Erstellen des Producer-Objekts kann ich den Fehler nicht beheben.Kafka Producer Objekt kann nicht in Intellij erstellt werden

import java.util.Properties 
import org.apache.kafka.clients.producer._ 
import kafka.producer.KeyedMessage 
import org.apache.spark._ 
object kafkaProducer { 

    def main(args: Array[String]){ 

     val topic = "jovis" 

     val props = new Properties() 
     props.put("metadata.broker.list", "localhost:9092") 
     props.put("serializer.class", "kafka.serializer.StringEncoder") 
     val config = new ProducerConfig(props) 

//Error in Line below 

val producer = new Producer[String, String](config) 
     val conf = new SparkConf().setAppName("Kafka").setMaster("local") 
     //val ssc = new StreamingContext(conf, Seconds(10)) 
     val sc = new SparkContext(conf) 
     val data = sc.textFile("/home/hdadmin/empname.txt") 

     var i = 0 
     while(i <= data.count){ 
     data.collect().foreach(x => { 
      println(x) 
      producer.send(new KeyedMessage[String, String](topic, x)) 
      Thread.sleep(1000) 
     }) 
     } 

Fehlerprotokoll: Der Code in Scala Objekt ist unten angegeben

constructor ProducerConfig in class ProducerConfig cannot be accessed in object kafkaProducer 
val config = new ProducerConfig(props) 

Trait Producer is abstract;Cannot be instantiated. 
val producer = new Producer[String, String](config) 

ich die Abhängigkeits Gläser unten importiert haben: http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.8.2.0/kafka-clients-0.8.2.0.jar http://central.maven.org/maven2/org/apache/kafka/kafka_2.11/0.10.2.1/kafka_2.11-0.10.2.1.jar

von Apart, dass ich zookeeper Server haben begonnen auch.

Wohin gehe ich falsch?

Antwort

1

Kann dies helfen Ihnen what is the difference between kafka ProducerRecord and KeyedMessage

Bitte, versuchen Sie die neue API "org.apache.kafka" %% "kafka" % "0.8.2.0"

import org.apache.kafka.clients.producer.ProducerRecord 
import org.apache.kafka.clients.producer.KafkaProducer 
val producer = new KafkaProducer[String, String](props) 
producer.send(new ProducerRecord[String, String](topic, key, value) 
Verwandte Themen