2016-08-30 3 views
2

Ich versuche zu verstehen, wie Hochverfügbarkeit in Flink mit RabbiMQ zu verwenden ist, Korrelations-IDs verwenden und Prüfpunkte aktivieren, aber es funktioniert nicht. Mein Produzent Code:Flink RabbitMQ Korrelation ID

connection = factory.newConnection(); 
Channel channel = connection.createChannel(); 

String corrId = java.util.UUID.randomUUID().toString(); 
BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrId).build(); 

channel.queueDeclare("flink-poc", true, false, false, null); 
MessageQueue queue = new MessageQueue(500); //Queue of messages to be sent to rabbitmq 
Message msg = null; 
while ((msg = queue.takeMessage()) != null) 
    channel.basicPublish("", "flink-poc", props, mapper.writeValueAsBytes(msg)); 
channel.close(); 
connection.close(); 

Consumer Code:

StreamExecutionEnvironment env = StreamExecutionEnvironment 
      .getExecutionEnvironment(); 
env.setBufferTimeout(100); 
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); // start a checkpoint every 1000 ms 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

RabbitSource<Message> rabbitSource = new RabbitSource<Message>(Host, 5672, username, pass, "flink-poc", VirtualHost, true, schema); 
messages = env.addSource(rabbitSource, TypeInformation.of(Message.class) 

Mit diesem Code wird kein ACK an RabbitMQ zurückgegeben. Jede Hilfe wird geschätzt.

Update: Mit einem Prefetch (channel.basicQos(15);) beginnt es zu arbeiten, aber mit einer sehr niedrigen Rate. Gibt es eine Möglichkeit, es zu verbessern? Wenn setStreamTimeCharacteristic deaktiviert ist, sind die Ergebnisse geordnet und verbessert die Rate um das 10-fache, aber immer noch sehr niedrige Rate, wie ist das möglich?

Antwort

1

Sie verwenden dieselbe correlationId für alle Nachrichten, Sie müssen für jede Nachricht eine neue verwenden. Von https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.html 'usesCorrelationId - Gibt an, ob die empfangenen Nachrichten mit einer eindeutigen ID versehen sind, um Nachrichten zu deduplizieren (im Falle von fehlgeschlagenen Bestätigungen). Wird nur verwendet, wenn Checkpointing aktiviert ist. '

+0

Vielen Dank j s, ich war so besessen, beobachtete den Rest des Codes und der API, die ich nicht wichtiger Detail bemerkt. – jag