2017-07-17 1 views
1

enter image description hereMeine Fehlermethode in der Auslaufklasse funktioniert nur für die erste Schraube, ab der zweiten Schraube funktioniert sie nicht.

Hinweis:
Bolt1 enthält eine Liste der ersten drei Primzahl (2,3,5).
Bolt2 enthält eine Liste der zweiten drei Sätze der Primzahl (7,11,13).
In Bolt3 überprüfen Sie einfach die Nummer ist prime oder nicht.
Von der ersten Schraube kann ich Fail() aus der Auslaufklasse aufrufen, aber ab der zweiten Schraube kann ich Fail() aus der Auslaufklasse nicht aufrufen.

Topologie Klasse:

 ...... 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("spout", new SpoutClass(), 1); 
     builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("spout"); 
     builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1"); 
     builder.setBolt("bolt3", new Bolt3(), 1).shuffleGrouping("bolt2"); 

Spout Klasse:

SpoutClass implements IRichSpout{ 
    private SpoutOutputCollector collector; 
    private TopologyContext context; 

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     this.context = context; 
     this.collector = collector; 
     } 

    public void nextTuple() { 
     try { 
      //messageQueue is blocking queue which contains data 
      String msg = messageQueue.take(); 
      String ackId = msg; 
      this.collector.emit(new Values(msg), ackId); 

     }catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
    public void ack(Object msgId) { 

     System.out.println("Acknowledges that this tuple has been processed ........... " + msgId); 

    } 

    public void fail(Object msgId) { 

     System.out.println("FAILED To Process Message :-" + msgId); 

    } 
} 

Bolt1 Klasse:

public class Bolt1 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> firstthreePrime = new ArrayList<Integer>(); 
     firstthreePrime.add(2); 
     firstthreePrime.add(3); 
     firstthreePrime.add(5); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt1."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt1 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt1"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

Bolt2 Klasse:

public class Bolt2 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> secondthreePrime = new ArrayList<Integer>(); 
     secondthreePrime.add(7); 
     secondthreePrime.add(11); 
     secondthreePrime.add(13); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt2."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt2 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt2"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

Bolt3 Klasse:

public class Bolt3 extends BaseRichBolt { 
private OutputCollector collector; 

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt3."); 
     Integer number = Integer.valueOf(message); 
     if (check this number is prime or not) { 
      //if number is prime 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt3"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    } 
} 

Antwort

1

Da Sie BaseRichBolt verwenden, wollen Sie nicht Ihre ausgehenden Tupeln zu verankern?

_collector.emit(tuple, new Values(message)); 

Wenn Sie sie nicht verankern, haben sie keine Verbindung zu dem Tupel, das aus der Tülle kam. Überprüfen Sie die Dokumentation: Guaranteeing Message Processing

+0

In der Auslauf-Klasse können wir nicht mit Tupel emittieren. Es funktioniert nach dem Hinzufügen in die Schraubenklasse: collector.emit (Tupel, neue Werte (Nachricht)); – Ashish

+0

Ja sorry, das war ein Tippfehler gemeint BaseRichBolt. Wenn das Problem behoben wurde, akzeptiere bitte meine Antwort. –

Verwandte Themen