2014-01-21 5 views
10

Ich habe begonnen Gewitter, so schaffe ich einfache Topologie this tutorialSturm des Auslauf nicht Ack bekommen

Verwendung Wenn ich meine Topologie mit LocalCluster laufen und alle scheinen in Ordnung, Mein Problem ist, dass ich immer ACK nicht auf dem Tupel, was bedeutet, dass mein Auslauf ack nie aufgerufen wird.

mein Code ist unten - wissen Sie, warum ack nicht aufgerufen wird?

so meine Topologie aussehen wie diese

public StormTopology build() { 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
      helloWorldSpout, spoutParallelism); 

     HelloWorldBolt bolt = new HelloWorldBolt(); 

     builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
        bolt, boltParallelism) 
       .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); 
} 

Meiner Spout so aussehen

public class HelloWorldSpout extends BaseRichSpout implements ISpout { 
    private SpoutOutputCollector collector; 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("int")); 
    } 

    public void open(Map conf, TopologyContext context, 
      SpoutOutputCollector collector) { 
     this.collector = collector; 
    } 

    private static Boolean flag = false; 
    public void nextTuple() { 
     Utils.sleep(5000); 

      //emit only 1 tuple - for testing 
     if (!flag){ 
      this.collector.emit(new Values(6)); 
      flag = true; 
     } 
    } 

    @Override 
    public void ack(Object msgId) { 
     System.out.println("[HelloWorldSpout] ack on msgId" + msgId); 
    } 

    public void fail(Object msgId){ 
     System.out.println("[HelloWorldSpout] fail on msgId" + msgId); 
    } 
} 

und mein Bolzen aussehen wie dieser

@SuppressWarnings("serial") 
public class HelloWorldBolt extends BaseRichBolt{ 
    private OutputCollector collector; 

    public void prepare(Map conf, TopologyContext context, 
        OutputCollector collector) { 
     this.collector = collector; 
     logger.info("preparing HelloWorldBolt"); 
    } 

    public void execute(Tuple tuple) { 
     System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); 
     this.collector.ack(tuple); 
    } 

    public void cleanup() { 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 

    } 
} 
+2

+1 Ich mag Ihr Muster von „HelloWorldSpout.class.getSimpleName()“ in der Shuffle-Gruppierung wirklich. Ich verstehe nicht, warum so viele Java-Apis auf magische Strings und magische Zahlen angewiesen sind (im Gegensatz zu Enums), aber dein Muster ist eine nette Art, sich nicht zu verbrennen. –

Antwort

15

Ihre emit() -Methode in Der Auslauf hat nur ein Argument, so dass das Tupel nicht verankert ist. Aus diesem Grund erhalten Sie keinen Aufruf an die ack() -Methode in der Tülle, obwohl Sie das Tupel in der Schraube quittieren.

Um dies zum Laufen zu bringen, müssen Sie Ihren Auslauf ändern, um ein zweites Argument auszugeben, das die Nachrichten-ID ist. Es ist diese ID, die zurück zur ack() -Methode in dem Auslauf übergeben wird:

public void nextTuple() { 
    Utils.sleep(5000); 

     //emit only 1 tuple - for testing 
    if (!flag){ 
     Object msgId = "ID 6"; // this can be any object 
     this.collector.emit(new Values(6), msgId); 
     flag = true; 
    } 
} 


@Override 
public void ack(Object msgId) { 
    // msgId should be "ID 6" 
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId); 
} 
+0

funktioniert gut! 10x – Mzf