Sie können Java API und Maven Abhängigkeiten (kafka und zookeeper) haben, um kafka Thema wie unten zu erstellen. Sie können den Code aus dem Code aufrufen, an den Sie die Funke-Anwendung senden.
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;
public final class KafkaUtils {
public static void main(String[] args) throws Exception {
KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());
}
public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) {
ZkClient zkClient = null;
try {
zkClient = getZkClient(zkHosts);
AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
if (zkClient != null) {
try {
zkClient.close();
} catch (ZkInterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private static ZkClient getZkClient(String zkHosts) {
ZkClient zkClient = null;
// Zookeeper sessionTimeoutMs
final int sessionTimeoutMs = 10000;
// Zookeeper connectionTimeoutMs
final int connectionTimeoutMs = 10000;
zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
return zkClient;
}
}
Hier x.x.x.x und y.y.y.y sind zk Cluster-Hosts für kafka. Hoffe das hilft.
Dies hilft sehr. Vielen Dank. Aber gibt es irgendeine Möglichkeit, die ich mit Shell-Skripten aufrufen kann? wie /kafka-topics.sh --zookeeper --topic Von Funken Cluster .. Ich bin nicht sicher über SSH, da beide in verschiedenen Cluster sind .. –
Aru
Wenn es n/w Konnektivität zwischen Spark und Kafka gibt und sein Tierpfleger (2181 Port offen) Cluster-Knoten, dann eine andere einfache Lösung sein kann, laden Sie die Kafka-Binärdatei von http://kafka.apache.org/downloads.html und extrahieren Sie auf einem der Knoten des Funke-Clusters und gehen Sie zu KAFKA_HOME und verwende den folgenden Befehl, der Teil deines Shell-Skripts sein kann. –
bin/kafka-topics.sh --zookeeper xxxx: 2181, yyyy: 2181 --create --topic topicName --partitionen 1 --replication-factor 1 –