2017-09-18 1 views
0

Mit Rubys "bunny" RabbitMQ-Client möchte ich, dass mein Producer (etwas Ruby-Code) eine Nachricht an einen Consumer sendet (ein Worker, der das "Sneakers" -Gemein verwendet). und ich möchte, dass mein Producer keine weitere Zeile seines Ruby-Codes ausführt, bis der Producer eine Bestätigung erhält, dass der Consumer meine Nachricht erhalten hat und etwas damit gearbeitet hat.Warte, bis der Nachrichtenproduzent die Bestätigung erhält, dass die Arbeit beendet ist

In meinem Verbraucher, mache ich etwas Arbeit und dann Aufruf Sneakers ack! Methode zu bestätigen, dass die Nachricht empfangen wurde und die Arbeit erledigt wurde.

In meinem Produzenten, bin ich confirm_select auf meine Bunny::Channel Beispiel nannte es in Bestätigungscode zu setzen, und nach publish -ing meine Botschaften, nenne ich wait_for_confirms auf dem Kanal angeblich warten, bis alle meine Nachrichten ack! -ed wurden durch der Verbraucher. (Ich habe versucht, zu implementieren, was ich in der Hase docs gefunden.)

Allerdings scheint es, dass mein Produzent nicht darauf wartet, dass der Verbraucher ack! aufrufen. Ich melde mich sowohl bei meinem Produzenten als auch bei meinen Kunden an und stelle fest, dass mein Produzent glaubt, Nachrichten seien bestätigt worden, bevor der Verbraucher sie tatsächlich anerkennt.

Wie kann ich einen RabbitMQ-Producer warten lassen, bis der Kunde seine Arbeit in Ruby beendet hat?

Ruby 2.3.3, RabbitMQ 3.6.12, Erlang 17.3.

Hier ist meine lockfile:

GEM 
    specs: 
    amq-protocol (2.2.0) 
    bunny (2.7.0) 
     amq-protocol (>= 2.2.0) 
    concurrent-ruby (1.0.5) 
    serverengine (1.5.11) 
     sigdump (~> 0.2.2) 
    sigdump (0.2.4) 
    sneakers (2.6.0) 
     bunny (~> 2.7.0) 
     concurrent-ruby (~> 1.0) 
     serverengine (~> 1.5.11) 
     thor 
    thor (0.20.0) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    bunny 
    sneakers 

BUNDLED WITH 
    1.14.6 

Hier ist meine Verbraucher/Arbeiter (consumer_worker.rb):

class ConsumerWorker 
    include Sneakers::Worker 

    from_queue 'do-work-here', 
      exchange: 'do-work-here', 
      exchange_type: :direct, 
      durable: true, 
      prefetch: 1, 
      arguments: { 
       :'x-dead-letter-exchange' => 'do-work-here-retry' 
      }, 
      timeout_job_after: 5, 
      retry_timeout: 60000, 
      ack: true 

    def work(msg) 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "message received: #{msg}" 
    end 
    sleep 1 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledging message at: #{Time.now.to_i}" 
    end 
    ack! 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledged message at: #{Time.now.to_i}" 
    end 
    end 
end 

In einer Endanschlussstreifen ich diese Arbeiter leite mit:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb 

Hier ist mein Verlag():

require 'bunny' 
connection = Bunny.new('amqp://guest:[email protected]:5672').tap(&:start) 
channel = connection.create_channel 
channel.confirm_select 
queue = channel.queue('do-work-here', 
         {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'}, 
         durable: true}) 
queue.publish('hello world', persistent: true) 
channel.wait_for_confirms 
open('ruby-debug.log', 'a') do |f| 
    f.puts "messages confirmed at: #{Time.now.to_i}" 
end 

Wenn ich den folgenden Befehl in einem anderen Tab:

ruby ./publisher.rb 

Dann meine Protokolldatei (./ruby-debug.log) enthält die folgenden Zeilen:

message received: hello world 
messages confirmed at: 1505774819 
acknowledging message at: 1505774820 
acknowledged message at: 1505774820 

Was ich will, ist für die Bestellung von Ereignisse zu sein wie folgt:

Wie ziehe ich das ab?

+0

Ich denke, mein Verständnis, wie Herausgeber bestätigt, soll funktionieren, ist falsch. Siehe https://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2014-Januar/033269.html. Dieses Tutorial scheint, als könnte es relevant sein, um mein Problem zu lösen: https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html – Jackson

Antwort

0

Der Herausgeber bestätigt nur Cover-Publisher-to-RabbitMQ-Kommunikation. Publisher sind sich der Konsumenten nicht bewusst.

Siehe Tutorial 6 für ein Beispiel, wenn das Anfrage/Antwort-Muster: https://www.rabbitmq.com/getstarted.html.

ConditionVariable ist eine häufig verwendete Nebenläufigkeit, die dazu dient, weitere Aktionen zu verzögern, bis ein Ereignis eintritt, mit einem optionalen Timeout.

Der Herausgeber bestätigt und Verbraucher Bestätigungen sind unter http://www.rabbitmq.com/confirms.htm dokumentiert.

+0

Irgendeine Idee, wie man einen RPC mit Turnschuhen durchführt? – Jackson

Verwandte Themen