2017-06-21 1 views
2

Ich habe eine Anwendung, die auf einem Server ausgeführt wird, der Anforderungen von einer Telefonanwendung annimmt und dann die Anforderung auf Arbeitsservern ausgleicht. Ich versuche, eine Zeitüberschreitung hinzuzufügen, wenn Nachrichten auf dem Hauptserver, die für die Dauer des Zeitlimits in der Ausgangs-Warteschlange waren, aus der Warteschlange entfernt werden. Genauer gesagt ist die Anwendung auf dem Hauptserver in Golang geschrieben und implementiert den Paranoid Pirate Pattern des Lastausgleichs. Der Code, den ich zur Zeit habe, ist:Wie kann ZeroMQ für Timeout-Nachrichten verwendet werden, die in der Warteschlange stehen, aber nicht innerhalb einer festgelegten Zeit gesendet wurden?

import (
    "fmt" 
    zmq "github.com/pebbe/zmq4" 
    "time" 
) 

const (
    HEARTBEAT_LIVENESS = 3 
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond 

    MESSAGE_READY  = "\001" 
    MESSAGE_HEARTBEAT = "\002" 
) 

var (
    client *zmq.Socket 
    backend *zmq.Socket 
    frontend *zmq.Socket 
    workerPoller *zmq.Poller 
    brokerPoller *zmq.Poller 
    workerQueue []Worker 
) 

type Worker struct { 
    Id string 
    Expire time.Time 
} 

type RequestWrapper { 
    RequestToSend Request 

} 

func NewWorker(id string) Worker { 
    return Worker{ 
     Id: id, 
     Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS), 
    } 
} 

func AddReadyWorker(workers []Worker, worker Worker) []Worker { 
    fmt.Println(worker.Id, " joined") 
    for i, w := range workers { 
     if worker.Id == w.Id { 
      if i == 0 { 
       workers = workers[1:] 
      } else if i == len(workers)-1 { 
       workers = workers[:i] 
      } else { 
       workers = append(workers[:i], workers[i+1:]...) 
      } 
      break 
     } 
    } 
    return append(workers, worker) 
} 

func PurgeInactiveWorkers() { 
    now := time.Now() 
    for i, worker := range workerQueue { 
     if now.Before(worker.Expire) { 
      workerQueue = workerQueue[i:] 
      return 
     } 
    } 

    workerQueue = workerQueue[0:0] 
} 

func LoadBalance() { 
// Loop: 
    heartbeat := time.Tick(HEARTBEAT_INTERVAL) 
    for { 
     var sockets []zmq.Polled 

     // If you have available workers, poll on the both front and backend 
     // If not poll on backend with infinite timeout 
     if len(workerQueue) > 0 { 
      sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL) 
     } else { 
      sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL) 
     } 

     for _, socket := range sockets { 
      switch socket.Socket { 
       // backend is a router 
       case backend: 
        workerId, _ := backend.Recv(0) 
        workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId)) 
        clientId, _ := backend.Recv(0) 
        if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT { 
         route, _ := backend.Recv(0) 
         message, _ := backend.RecvBytes(0) 

         fmt.Println("Received response") 
         RouteResponse(route, message) 

         // frontend.Send(clientId, zmq.SNDMORE) 
         // frontend.Send("", zmq.SNDMORE) 
         // frontend.SendBytes(message, 0) 
        } 
       // frontend is a dealer 
       case frontend: 
        clientId, _ := frontend.Recv(0) 
        route, _ := frontend.Recv(0) 
        message, _ := frontend.RecvBytes(0) 

        backend.Send(workerQueue[0].Id, zmq.SNDMORE) 
        backend.Send(clientId, zmq.SNDMORE) 
        backend.Send(route, zmq.SNDMORE) 
        backend.SendBytes(message, 0) 

        workerQueue = workerQueue[1:] 
      } 
     } 

     select { 
      case <-heartbeat: 
       for _, worker := range workerQueue { 
        backend.Send(worker.Id, zmq.SNDMORE) 
        backend.Send(MESSAGE_HEARTBEAT, 0) 
       } 
       break 
      default: 
     } 

     PurgeInactiveWorkers() 
    } 
} 

Wenn das Back-End eine Nachricht sendet, aber es ist nicht wirklich an einen Arbeitnehmer in einem gewissen Zeitraum gesendet, ich habe es geschickt ablaufen soll und nicht immer werden. Gibt es eine Socket-Option, die dies erreichen kann? Wenn nicht, was müsste ich tun, um dies zu erreichen?

Zwei Möglichkeiten, wie ich denke ich, das sind ohne Socket-Optionen kann:

1) Lassen Sie das Back-End die Nachricht in einem Wrapper wickeln und zu einer golang Warteschlange und nicht durch zeromq senden. Der Wrapper enthält die Uhrzeit, zu der die Nachricht "gesendet" wurde. Das Back-End zieht gleichzeitig nacheinander von der Vorderseite der Golang-Warteschlange und überprüft, ob die Nachricht abgelaufen ist. Wenn ja, nicht senden, wenn nicht, die Nachricht senden. Ich könnte das Backend haben, die Nachricht zuerst der Golang-Warteschlange hinzuzufügen und sie anschließend im selben Codeblock wirklich zu senden. Auf diese Weise brauche ich kein Schloss.

2) Senden Sie die Wrapper-Nachricht über zeromq an einen Retriever, und der Retriever überprüft, ob er abgelaufen ist, und kehrt vorzeitig zurück. Ich mag das nicht, weil es für seine Leistung schlecht ist.

+1

Anstatt abgelaufene Nachrichten aus der Warteschlange zu entfernen, fügen Sie der Nachricht eine "expires-at" -Eigenschaft hinzu, bevor Sie sie in die Warteschlange stellen. Arbeiter können diese Eigenschaft jedes Mal überprüfen, wenn sie eine Nachricht erhalten, und nur solche löschen, die abgelaufen sind. – colini

Antwort

0

Am Ende war die Lösung, eine expires-at-Eigenschaft wie @colini und @bazza erwähnt, und nach jeder Herzschlag Zeit abgelaufen Nachrichten aus der Warteschlange zu löschen. Es erwies sich jedoch als schwieriger, dies zu tun und alle Anforderungen meiner Anwendung zu erfüllen, als auf den ersten Blick. Daher verwendete ich RabbitMQ, dessen ttl-expires-Argument die gewünschte Funktionalität lieferte.

0

In neueren API-Versionen gibt es eine Option, alle "alten" Nachrichten zu verwerfen und immer nur die "neueste" zu liefern.

Wenn dies Ihren Erwartungen entspricht und alle Peers API v.4.0 + erfüllen, sind Sie fertig.

ZMQ_CONFLATE: Nur letzte Meldung

Wenn gesetzt, eine Buchse nur eine Nachricht in seiner Inbound/Outbound-Warteschlange halten soll, wird diese Meldung erhielt die letzte Nachricht ist/die letzte Nachricht gesendet werden. Ignoriert die Optionen ZMQ_RCVHWM und ZMQ_SNDHWM. Unterstützt keine mehrteiligen Nachrichten, insbesondere wird nur ein Teil davon in der internen Warteschlange des Sockets gehalten.
Option value typeint
Option value unitboolean
Default value0 (false)
Applicable socket typesZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER

+1

Mit dieser Option wird nur die letzte Nachricht gespeichert, die möglicherweise Nachrichten löscht, die noch nicht abgelaufen sind. Außerdem unterstützt es keine mehrteiligen Nachrichten, die ich sende. –

+0

Sicher! Dies waren genau die Gründe, die mich dazu brachten, die API-Spezifikationen zu kopieren. Und keine dieser Tatsachen ist per se ein Grund, diese Option nicht zu verwenden, wenn andere Bedingungen erfüllt sind (das Packen von Nutzlasten in BLOB-s ist eine übliche leistungsorientierte Übung). – user3666197

1

Was Sie versuchen, ist die Kommunikation als Ausführungs-Rendezvous. Der Absender möchte etwas darüber wissen, wann der Empfänger Nachrichten erhält.

ZMQ implementiert das Actor-Modell. Was Sie benötigen, ist eine Modifikation des Modells der kommunizierenden sequenziellen Prozesse (eines, bei dem das Zeitlimit gesendet wird).Im Grunde müssen Sie den Workern Kontrollnachrichten zuführen, da der Server den Worker auffordert, eine Nachricht zu empfangen, und der Server auf die Antwort wartet. Die Antwort bedeutet, dass der Sachbearbeiter sofort bereit ist, eine Nachricht zu empfangen, und dass der Server und der Sachbearbeiter beide bei einem Sende-/Empfangsvorgang in ihren Programmabläufen aufgetreten sind. Wenn diese Antwort nicht innerhalb von Timeout-Sekunden eintrifft, sendet der Server die eigentliche Nachricht nicht.

Oder Sie könnten schummeln, indem Sie alles zu den Arbeitern unabhängig, eingewickelt in einer Nachricht, die trägt ein "zum Zeitpunkt X gesendet" -Feld, und lassen Sie den Arbeiter entscheiden, alte Nachrichten zu verwerfen.

Verwandte Themen