2017-04-13 1 views
2

Ich habe ein paar Fragen bezüglich der Parallelität von flink. Das ist mein Setup:Apache Flink: Wie parallel ausgeführt wird, aber Reihenfolge der Nachrichten beibehalten?

Ich habe 1 Master-Knoten und 2 Slaves. In flink habe ich 3 Kafka-Konsumenten erstellt, die jeweils aus einem anderen Thema konsumieren.
Da die Reihenfolge der Elemente für mich wichtig ist, hat jedes Thema nur eine Partition und ich habe flink eingerichtet, um die Ereigniszeit zu verwenden.

Dann laufe ich die folgende Pipeline (in Pseudo-Code) auf jeder der Datenströme:

source 
.map(deserialize) 
.window 
.apply 
.map(serialize) 
.writeTo(sink) 

Bis jetzt habe ich mein flink Programm mit dem Argument gestartet -p 2 davon aus, dass dies würde mir erlauben, beide zu verwenden meiner Knoten. Das Ergebnis ist nicht das, was ich mir erhofft habe, da die Reihenfolge meiner Ausgabe manchmal durcheinander ist.

Nachdem durch die flink Dokumentation zu lesen und besser versuchen, es zu verstehen, könnte meinen folgenden „Learnings"?

1.) bestätigt -p 2 Passing konfiguriert die Task-Parallelität eines nur jemand bitte, dh die maximale Anzahl der parallelen Instanzen Aufgabe (wie map(deserialize)) wird in aufgeteilt werden. Wenn ich die Reihenfolge durch die gesamte Pipeline halten möchte, muss ich -p 1.

2.) Dies scheint mir widersprüchlich/verwirrend: auch wenn die Parallelität eingestellt ist 1, verschiedene Aufgaben können parallel (gleichzeitig) ausgeführt werden, daher werden meine 3 Pipelines auch in pa ausgeführt Parallel wenn ich -p 1 passiere.

Und als Follow-up-Frage: Gibt es eine Möglichkeit herauszufinden, welche Aufgaben zu welchem ​​Task-Slot zugeordnet wurden, so dass ich die parallele Ausführung selbst bestätigen konnte?

Ich würde jede Eingabe schätzen!

aktualisieren

Here ist Ausführungsplan des flink für -p 2.

Antwort

1

Nach der Frage nach den Apache Flink user email list hier gefragt zu haben ist die Antwort:

1.) Die -p Option definiert die Aufgabe Parallelität pro Auftrag. Wenn die Parallelität höher als 1 gewählt wird und Daten neu verteilt werden (z.B.via Rebalance() oder keyBy()) ist die Reihenfolge nicht garantiert.

2.) Mit -p auf 1 gesetzt wird nur 1 Task-Slot, d. H. 1 CPU-Core, verwendet. Daher können mehrere Threads gleichzeitig auf einem Kern laufen, aber nicht parallel.

Zu meinen Anforderungen: Um mehrere Pipelines parallel laufen zu lassen und trotzdem die Reihenfolge einzuhalten, kann ich einfach mehrere Flink Jobs ausführen, anstatt alle Pipelines innerhalb des gleichen Flink Jobs auszuführen.

0

Ich werde versuchen, mit dem zu antworten, was ich weiß.

1) Ja, mit dem CLI-Client kann der Parallelitätsparameter mit -p angegeben werden. Sie haben Recht, dies ist die maximale Anzahl von parallelen Instanzen. Ich sehe jedoch nicht die Verbindung zwischen der Parallelität und der Reihenfolge? Soweit ich weiß, wird die Bestellung von Flink mit dem Zeitstempel des Ereignisses oder seinem eigenen Aufnahmezeitstempel verwaltet. Wenn Sie die Reihenfolge in verschiedenen Datenquellen beibehalten möchten, scheint es für mich kompliziert zu sein oder Sie können diese verschiedenen Datenquellen zu einer zusammenführen.

2) Ihre 3 Pipelines können parallel laufen, wenn Sie eine Parallelität von 3 haben. Ich denke hier bedeutet Parallelität auf verschiedenen Slots.

Folgefrage) Sie können prüfen, welche Aufgaben im Web-Frontend des JobManagers unter http://localhost:8081 dem jeweiligen Task-Slot zugeordnet sind.

+0

Ich habe den Ausführungsplan meines Flink-Programms hochgeladen und dort können Sie sehen, dass vor der letzten Karte eine Neuverteilung erfolgt. Laut [this] (https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows) flink docs ist die Reihenfolge nicht garantiert mit Rebalance (). Ich nehme an, dass es beim Schreiben auf Kafka zwischen den beiden Teilaufgaben Race-Bedingungen gibt, die meine Output-Reihenfolge durcheinander bringen. Daher glaube ich, dass Parallelismus> 1 meine Ergebnisse durcheinander bringt. – BenScape

0

Nachfolgend finden Sie ein Beispiel für die lokale Skalierung mithilfe von Side-Outputs und Slot-Gruppen.

package org.example 

/* 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 

import org.apache.flink.streaming.api.functions.ProcessFunction 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.util.Collector 

/** 
    * This example shows an implementation of WordCount with data from a text socket. 
    * To run the example make sure that the service providing the text data is already up and running. 
    * 
    * To start an example socket text stream on your local machine run netcat from a command line, 
    * where the parameter specifies the port number: 
    * 
    * {{{ 
    * nc -lk 9999 
    * }}} 
    * 
    * Usage: 
    * {{{ 
    * SocketTextStreamWordCount <hostname> <port> <output path> 
    * }}} 
    * 
    * This example shows how to: 
    * 
    * - use StreamExecutionEnvironment.socketTextStream 
    * - write a simple Flink Streaming program in scala. 
    * - write and use user-defined functions. 
    */ 
object SocketTextStreamWordCount { 

    def main(args: Array[String]) { 
    if (args.length != 2) { 
     System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>") 
     return 
    } 

    val hostName = args(0) 
    val port = args(1).toInt 
    val outputTag1 = OutputTag[String]("side-1") 
    val outputTag2 = OutputTag[String]("side-2") 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.getConfig.enableObjectReuse() 

    //Create streams for names and ages by mapping the inputs to the corresponding objects 
    val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement") 
    val counts = text.flatMap { 
     _.toLowerCase.split("\\W+") filter { 
     _.nonEmpty 
     } 
    } 
     .process(new ProcessFunction[String, String] { 
     override def processElement(
            value: String, 
            ctx: ProcessFunction[String, String]#Context, 
            out: Collector[String]): Unit = { 
      if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value)) 
      else ctx.output(outputTag2, String.valueOf(value)) 
     } 
     }) 

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1) 
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2) 

    val output1 = sideOutputStream1.map { 
     (_, 1) 
    }.slotSharingGroup("map1") 
     .keyBy(0) 
     .sum(1) 

    val output2 = sideOutputStream2.map { 
     (_, 1) 
    }.slotSharingGroup("map2") 
     .keyBy(0) 
     .sum(1) 

    output1.print() 
    output2.print() 

    env.execute("Scala SocketTextStreamWordCount Example") 
    } 

} 
Verwandte Themen