2016-09-12 1 views
0

Ich habe zwei Cluster mit Kafka und funken getrennt. Ich möchte ein Kafka-Thema aus dem Funkencluster erstellen. Ich habe festgestellt, dass ein Thema erstellt werden muss, dass wir Kafka-topics.sh aufrufen müssen, die nicht im Spark-Cluster verfügbar sind. Befehl sollte über die Shell aufgerufen werden.Wie kann ich ein Kafka-Thema erstellen, das in einem anderen Cluster von einem anderen Funke-Cluster ausgeführt wird?

zB: /kafka_topics.sh --zookeeper: 2181 --create --topic test_topic

Dieses Skript sollte von Funken Cluster genannt werden, und es sollte auf Kafka Cluster ausgeführt werden sollen. Kann mir jemand helfen?

Antwort

1

Sie können Java API und Maven Abhängigkeiten (kafka und zookeeper) haben, um kafka Thema wie unten zu erstellen. Sie können den Code aus dem Code aufrufen, an den Sie die Funke-Anwendung senden.

<dependency> 
    <groupId>com.101tec</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.3</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 

import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.exception.ZkInterruptedException; 
import kafka.utils.ZKStringSerializer$; 
import kafka.admin.AdminUtils; 

public final class KafkaUtils { 
    public static void main(String[] args) throws Exception {  
     KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());  
    } 

    public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) { 
     ZkClient zkClient = null; 
     try { 
      zkClient = getZkClient(zkHosts); 
      AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties); 
     } catch (Exception exception) { 
      exception.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       try { 
        zkClient.close(); 
       } catch (ZkInterruptedException ex) { 
        ex.printStackTrace(); 
       } 

      } 
     } 
    } 

    private static ZkClient getZkClient(String zkHosts) { 
     ZkClient zkClient = null; 
     // Zookeeper sessionTimeoutMs 
     final int sessionTimeoutMs = 10000; 
     // Zookeeper connectionTimeoutMs 
     final int connectionTimeoutMs = 10000; 
     zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); 
     return zkClient; 
    } 
} 

Hier x.x.x.x und y.y.y.y sind zk Cluster-Hosts für kafka. Hoffe das hilft.

+0

Dies hilft sehr. Vielen Dank. Aber gibt es irgendeine Möglichkeit, die ich mit Shell-Skripten aufrufen kann? wie /kafka-topics.sh --zookeeper --topic Von Funken Cluster .. Ich bin nicht sicher über SSH, da beide in verschiedenen Cluster sind .. – Aru

+0

Wenn es n/w Konnektivität zwischen Spark und Kafka gibt und sein Tierpfleger (2181 Port offen) Cluster-Knoten, dann eine andere einfache Lösung sein kann, laden Sie die Kafka-Binärdatei von http://kafka.apache.org/downloads.html und extrahieren Sie auf einem der Knoten des Funke-Clusters und gehen Sie zu KAFKA_HOME und verwende den folgenden Befehl, der Teil deines Shell-Skripts sein kann. –

+0

bin/kafka-topics.sh --zookeeper xxxx: 2181, yyyy: 2181 --create --topic topicName --partitionen 1 --replication-factor 1 –

Verwandte Themen