0

EDIT2: Schließlich hat ich meinen eigenen Produzenten mit Java gemacht und es funktioniert gut, so das Problem in dem Kafka-console-Produzenten ist. Der Kafka-Konsolen-Consumer funktioniert gut.Kafka + Spark-Streaming: konstante Verzögerung von 1 Sekunde

EDIT: Ich habe bereits mit der Version 0.9.0.1 versucht und hat das gleiche Verhalten.

Ich arbeite an meinem Bachelor-Abschluss-Projekt, einem Vergleich zwischen Spark Streaming und Flink. Vor beiden Frameworks verwende ich Kafka und ein Skript, um die Daten zu generieren (siehe unten). Mein erster Test besteht darin, die Latenz zwischen beiden Frameworks mit einfachen Workloads zu vergleichen und Kafka gibt mir eine wirklich hohe Latenz (1 Sekunde dauernd). Der Einfachheit halber laufe ich im Moment nur in einer Maschine, Kafka und Spark.

Ich habe bereits nach ähnlichen Problemen gesucht und gefunden, und versuchte die Lösungen, die sie geben, aber nichts änderte. Ich habe auch alle Kafka-Konfigurationen in der offiziellen Dokumentation geprüft und legte die importants für die Latenz in meine Config-Dateien, das ist meine Konfiguration:

Kafka 0.10.2.1 - Funken 2.1.0

server.properties :

num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
log.flush.interval.messages=1000 
log.flush.interval.ms=50 
log.retention.hours=24 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 
flush.messages=100 
flush.ms=10 

producer.properties:

compression.type=none 
max.block.ms=200 
linger.ms=50 
batch.size=0 
Streaming-Programm 10

Funken: (die die empfangenen Daten ausdruckt, und die Differenz zwischen dem, wenn die Daten erzeugt wurde, und wenn für die Funktion verarbeitet wird)

package com.tfg.spark1.spark1; 

import java.util.Map; 
import java.util.HashMap; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.*; 
import scala.Tuple2; 
import org.apache.spark.streaming.kafka.*; 

public final class Timestamp { 

    public static void main(String[] args) throws Exception { 
     if (args.length < 2) { 
      System.err.println("Usage: Timestamp <topics> <numThreads>"); 
      System.exit(1); 
     } 

     SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp"); 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100)); 


     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     int numThreads = Integer.parseInt(args[1]); 
     topicMap.put(args[0], numThreads); 

     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2> 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call (Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 

     JavaDStream<String> newLine = lines.map(new Function<String, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call(String line) { 
       String[] tuple = line.split(" "); 
       String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1])); 
       //String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime); 

       return totalTime; 
      } 
     }); 

     lines.print(); 
     newLine.print(); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

Die Daten erzeugt hat das folgende Format:

"Random bits" + " " + "current time in ms" 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 

Schließlich, wenn ich mein Spark-Streaming-Programm ausführen und den Skript-Generator, der die Daten alle 200 ms erzeugt, Funke (Batch-Intervall = 100ms) druckt 9 leer Chargen, und jede Sekunde (immer 900ms Moment, wie in diesem Beispiel : Tim e: 1496421619 ms) ergibt:

------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 
------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
1416 
1006 
599 
1214 
803 

Auch wenn ich ein Kafka-Befehlszeile-Hersteller und andere Befehlszeilen-Consumer ausführen, dauert es immer eine gewisse Zeit, um die erzeugten Daten drucken in der Verbraucher.

Vielen Dank im Voraus für die Hilfe!

+1

versuchen einfachen Verbraucher zuerst zu sehen, wenn seine Funken spezifisch oder kafka spezifisch sind. Es gibt wenige Posts (auch von linkedin), die 30 ms Latenz melden. –

+0

Meinst du den Kafka-Konsolen-Consumer? Ich habe es schon ausprobiert und es erhalten die Elemente auch mit Verzögerung. Ich habe auch von mehreren Seiten gelesen, dass es diese Latenz erreichen kann. Ich werde versuchen, auch eine ältere Kafka-Version zu verwenden. Vielen Dank! : D – Franmoti

+0

Es hängt möglicherweise auch von Ihrer Hardware ab (z. B. Anzahl der Threads). Versuchen Sie auch, System im stabilen Zustand zu sehen (nicht nur ein-zwei Nachrichten) vielleicht braucht es Zeit zum "Aufwärmen" –

Antwort

1

Ich habe gerade die JIRA aktualisiert, die Sie mit dem Grund geöffnet haben, warum Sie immer die Verzögerung von 1000 ms sehen.

https://issues.apache.org/jira/browse/KAFKA-5426

ich hier der Grund berichten ...

der linger.ms Parameter wird auf der Kommandozeile die --timeout Option, die, wenn nicht 1000 ms angegeben ist. Gleichzeitig die Charge.Größe Parameter wird mit der --max-partition-memory-bytes Option in der Befehlszeile festgelegt, die, wenn nicht angegeben ist 16384. Es bedeutet, dass, auch wenn Sie linger.ms und batch.size mit --producer-Eigenschaft oder --producer.config angeben, werden sie immer überschrieben mit den oben genannten "spezifischen" Optionen.

Verwandte Themen