EDIT2: Schließlich hat ich meinen eigenen Produzenten mit Java gemacht und es funktioniert gut, so das Problem in dem Kafka-console-Produzenten ist. Der Kafka-Konsolen-Consumer funktioniert gut.Kafka + Spark-Streaming: konstante Verzögerung von 1 Sekunde
EDIT: Ich habe bereits mit der Version 0.9.0.1 versucht und hat das gleiche Verhalten.
Ich arbeite an meinem Bachelor-Abschluss-Projekt, einem Vergleich zwischen Spark Streaming und Flink. Vor beiden Frameworks verwende ich Kafka und ein Skript, um die Daten zu generieren (siehe unten). Mein erster Test besteht darin, die Latenz zwischen beiden Frameworks mit einfachen Workloads zu vergleichen und Kafka gibt mir eine wirklich hohe Latenz (1 Sekunde dauernd). Der Einfachheit halber laufe ich im Moment nur in einer Maschine, Kafka und Spark.
Ich habe bereits nach ähnlichen Problemen gesucht und gefunden, und versuchte die Lösungen, die sie geben, aber nichts änderte. Ich habe auch alle Kafka-Konfigurationen in der offiziellen Dokumentation geprüft und legte die importants für die Latenz in meine Config-Dateien, das ist meine Konfiguration:
Kafka 0.10.2.1 - Funken 2.1.0
server.properties :
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=2
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000
log.flush.interval.ms=50
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
flush.messages=100
flush.ms=10
producer.properties:
compression.type=none
max.block.ms=200
linger.ms=50
batch.size=0
Streaming-Programm 10
Funken: (die die empfangenen Daten ausdruckt, und die Differenz zwischen dem, wenn die Daten erzeugt wurde, und wenn für die Funktion verarbeitet wird)
package com.tfg.spark1.spark1;
import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka.*;
public final class Timestamp {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Timestamp <topics> <numThreads>");
System.exit(1);
}
SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
int numThreads = Integer.parseInt(args[1]);
topicMap.put(args[0], numThreads);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2>
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call (Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> newLine = lines.map(new Function<String, String>() {
private static final long serialVersionUID = 1L;
public String call(String line) {
String[] tuple = line.split(" ");
String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1]));
//String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime);
return totalTime;
}
});
lines.print();
newLine.print();
jssc.start();
jssc.awaitTermination();
}
}
Die Daten erzeugt hat das folgende Format:
"Random bits" + " " + "current time in ms"
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
Schließlich, wenn ich mein Spark-Streaming-Programm ausführen und den Skript-Generator, der die Daten alle 200 ms erzeugt, Funke (Batch-Intervall = 100ms) druckt 9 leer Chargen, und jede Sekunde (immer 900ms Moment, wie in diesem Beispiel : Tim e: 1496421619 ms) ergibt:
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
1416
1006
599
1214
803
Auch wenn ich ein Kafka-Befehlszeile-Hersteller und andere Befehlszeilen-Consumer ausführen, dauert es immer eine gewisse Zeit, um die erzeugten Daten drucken in der Verbraucher.
Vielen Dank im Voraus für die Hilfe!
versuchen einfachen Verbraucher zuerst zu sehen, wenn seine Funken spezifisch oder kafka spezifisch sind. Es gibt wenige Posts (auch von linkedin), die 30 ms Latenz melden. –
Meinst du den Kafka-Konsolen-Consumer? Ich habe es schon ausprobiert und es erhalten die Elemente auch mit Verzögerung. Ich habe auch von mehreren Seiten gelesen, dass es diese Latenz erreichen kann. Ich werde versuchen, auch eine ältere Kafka-Version zu verwenden. Vielen Dank! : D – Franmoti
Es hängt möglicherweise auch von Ihrer Hardware ab (z. B. Anzahl der Threads). Versuchen Sie auch, System im stabilen Zustand zu sehen (nicht nur ein-zwei Nachrichten) vielleicht braucht es Zeit zum "Aufwärmen" –