2017-09-18 1 views
1

Ich versuche herauszufinden, wie es passiert: Ich habe ein Programm lesen von mehreren socketTextStream und diese Text-Streams Feed in verschiedenen Datenfluss (und diese Datenströme nie verbinden in meinem Job). Es sieht so etwas wie unten:Flink SocketTextStream Quelle für einen einzelnen Rechner geplant

for(int i =0; i< hosts.length; i++) { 

    DataStream<String> someStream = env.socketTextStream(hosts[i], ports[i]); 
    DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ... 
} 

Allerdings, wenn ich den Job auf einem Cluster ausgeführt fand ich, dass alle Quell Aufgabe einer Maschine geplant sind, so dass die Maschine ein schwerer Engpass für die Leistung wird. Irgendwelche Ideen wie würde das passieren?

Danke!

+0

Ich habe nicht viel mit socketTextStream herumgetan, also kann ich nur eine Richtung empfehlen, in die man schauen muss. Wenn Sie ein Kafka-Thema als Datenquelle verwenden (env.addSource (FlinkKafkaConsumer)), wenn nur eine Partition für den Cluster vorhanden ist, werden alle von der Kafka-Datenquelle empfangenen Daten nur an eine einzige Maschine gesendet. Wenn ich also eine Parallelität von 3 habe, fließen die Daten nur durch eine der 3 (ich will sicher sein, dass du meinst, wenn du sagst, dass sie nur durch eine Maschine fließt). Es klingt, als wäre das eine ähnliche Sache, nur mit einer anderen Art von Datenquelle. – Jicaar

Antwort

0

Der Grund, warum alle verschiedenen SocketTextStreamFunction Quellen auf derselben Maschine geplant sind, liegt an der gemeinsamen Nutzung von Steckplätzen. Mit der Steckplatzfreigabe kann Flink Tasks verschiedener Betreiber in denselben Steckplatz einplanen. Dies ermöglicht zum Beispiel, eine bessere Kollokation zwischen Aufgaben zu erreichen, die voneinander abhängen (z. B. Build-Side-, Probe-Side- und tatsächliche Join-Operatoren, die in demselben Slot laufen). Darüber hinaus erleichtert es Ihnen, über die Anzahl der Slots, die Ihre Anwendung benötigt, nachzudenken. Dies ist die maximale Parallelität Ihres Jobs.

Der Nachteil besteht jedoch darin, dass unabhängige Komponenten Ihres Jobs nicht über den Cluster verteilt werden, sondern aufgrund von Slots gemeinsam in denselben (n) Slot (s) (folglich auch auf demselben Rechner) landen.

Sie können die Steckplatzfreigabe für Teile Ihres Auftrags deaktivieren, wenn Sie explizit einen anderen Namen für die Steckplatzfreigabe festlegen. Dann unterliegen nur Betreiber, die derselben Steckplatz-Sharing-Gruppe zugeordnet sind, der Steckplatz-Sharing. Downstream-Operatoren erben die Slots-Sharing-Gruppe von ihren Eingaben. Wenn Sie also einen peinlich parallelen Job haben, genügt es, nur die Slot-Sharing-Gruppe an den Quellen zu setzen.

for(int i =0; i< hosts.length; i++) { 
    DataStream<String> someStream = env 
     .socketTextStream(hosts[i], ports[i]) 
     .slotSharingGroup("socket_" + i); 

    DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ... 
} 
+0

Danke, das hat mein Problem perfekt gelöst. –

Verwandte Themen