2017-06-18 2 views
0

Ich bin neu bei Kafka und habe angefangen, am Producer-Consumer-Modell zu arbeiten.
Ich habe ein Programm für Kafka Producer geschrieben, das eine Tabelle von MySQL lesen wird und ich diese Meldungen in der Eingabeaufforderung lese.
Das Problem ist, dass ich nur Daten aus der letzten Spalte der Tabelle sehen konnte.Fehler in meiner Kafka-Producer-Klasse

MySQL Tabellenstruktur und Daten:

+----------+---------+----------------+ 
| dept  | empName | salary(double) | 
+----------+---------+----------------+ 
| Engg  | Fred | 2000   | 
| Engg  | Bob  | 3000   | 
| Engg  | Joe  | 1000   | 
| Arts  | Jack | 5000   | 
| Commerce | Jill | 2400   | 
| Arts  | James | 3000   | 
| Commerce | Rob  | 8700   | 
+----------+---------+----------------+ 

Kafka Produzent Klasse:

package com.kafka.producer; 

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.Properties; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaMySqlProducer { 
    public static void main(String[] args) { 

     Connection con; 
     PreparedStatement pstmnt; 
     ResultSet rs; 

     Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class","kafka.serializer.StringEncoder"); 
     props.put("metadata.broker.list","localhost:9092"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer producer = new Producer(config); 

     try { 
      Class.forName("com.mysql.jdbc.Driver"); 
      con = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb","root","cloudera"); 
      pstmnt = con.prepareStatement("select * from department"); 
      rs = pstmnt.executeQuery(); 
      while(rs.next()) { 
       String name = rs.getString(1); 
       String department = rs.getString(2); 
       String salary = " " + rs.getDouble(3); 
       producer.send(new KeyedMessage("dbrec", name, department, salary)); 
      } 
      con.close(); 
     } catch (ClassNotFoundException e) { 
      e.printStackTrace(); 
     } catch (SQLException e) { 
      e.printStackTrace(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 
} 

Ausgabe von meinem Verbraucher:

[[email protected] kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbrec --from-beginning 
[2017-06-18 10:23:22,428] WARN Error while fetching metadata with correlation id 2 : {dbrec=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-06-18 10:23:22,628] WARN The following subscribed topics are not assigned to any members in the group console-consumer-33197 : [dbrec] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

2000.0 
3000.0 
1000.0 
5000.0 
2400.0 
3000.0 
8700.0 

Ich bin nur die Daten aus der letzten Spalte bekommen des Tisches.
Kann mir jemand sagen, was ist der Fehler, den ich hier mache?

+0

Wenn Sie ein neues Projekt mit Kafka starten, ist es besser, die neuen Hersteller- und Verbraucherklassen zu verwenden. Ich sehe, dass Sie den älteren verwenden, bei dem sich der Produzent mit Zookeeper verbindet, um Informationen über Broker zu erhalten und nicht direkt zu einem von ihnen. – ppatierno

Antwort

2

Der letzte Parameter des Konstruktors KeyedMessage mit 4 Argumenten ist die eigentliche Nachricht.

Die Klasse akzeptiert nicht nur eine Liste aller Ihrer Werte.

Im Einzelnen sind die von Ihnen angegebenen Parameter (topic, key, partKey, message). Sie verbrauchen nur message von topic

+1

Okay .. Korrigiert es so. producer.send (neue KeyedMessage ("dbrec", Name + "" + Abteilung + "" + Gehalt)); und arbeitet jetzt. – Sidhartha

+0

Auch ich sehe veraltete Nachrichten für diese beiden Zeilen. \t \t ProducerConfig config = new ProducerConfig (Requisiten); \t \t Produzent Produzent = neuer Produzent (Config); Können Sie mir sagen, was die neuen Optionen für diese Methoden sind, damit ich sie implementieren kann? – Sidhartha

+0

Nicht sicher, aber das JavaDoc könnte das wahrscheinlich für Sie beantworten –