2016-04-18 7 views
0

Problemstellung: Ich möchte zwei Streams von zwei verschiedenen Kafka Tüllen (sagen S1 und S2) verbinden und wollen die Tupel von jedem Strom basierend auf einigen gemeinsamen Feld in Sie. wenn „S1“ BEKOMMT unten json als TupelJoin zwei Ströme basierend auf gemeinsamen Feld in Sturm Schraube

{"l7ProtocolID":"dhcp", 
"packets_out":1, 
"bytes_out":400, 
"start_time":1454281199898, 
"flow_sample":0, 
"duration":102, 
"path":["base","ip","udp","dhcp"], 
"bytes_in":1200, 
"l4":[{"client":"68","server":"67","level":0}], 
"l2":[{"client":"52:54:00:50:04:B2","server":"FF:FF:FF:FF:FF:FF","level":0}], 
"l3":[{"client":"::ffff:0.0.0.0","server":"::ffff:255.255.255.255","level":0}], 
"flow_id":"81454281200000731489", 
"applicationID":"dhcp", 
"packets_in":1} 

und „S2“ BEKOMMT unten JSON als Tupel

{"portGroupName":"dhcp", 
"hypervisorName":1, 
"bytes_out":400, 
"monitoredIP":1454281199898, 
"monitoredInstance":0, 
"duration":102, 
"bytes_in":1200, 
"flow_id":"81454281200000731489", 
"tenant":1} 

Ich mag würden beide auf einem gemeinsamen Feld basiert beizutreten sagen: „flow_id“ hier für den Fall. Beispiel oder Ansatz vorschlagen. Verwirrt mit .fieldsGrouping, Ist das eine Lösung für meinen Anwendungsfall.

Antwort

0

Sie Tident API verwenden, kann ein tun beitreten:

TridentTopology topology = new TridentTopology(); 
// do some stuff here 
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c")); 

See documenation für weitere Informationen: https://storm.apache.org/releases/1.0.0/Trident-API-Overview.html

Wenn Sie Low-Level-API verwenden möchten, fieldsGrouping verwenden wäre richtig (natürlich , müssen Sie über "Windowing" von sich selbst denken)

Etwas wie folgt aus:

TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("spout1",...); 
builder.setSpout("spout2",...); 

builder.setSpout("join",...) 
     .fieldsGrouping("spout1", new Fields("flow_id")) 
     .fieldsGrouping("spout2", new Fields("flow_id")); 
Verwandte Themen