Ich habe eine Anwendung, die auf einem Server ausgeführt wird, der Anforderungen von einer Telefonanwendung annimmt und dann die Anforderung auf Arbeitsservern ausgleicht. Ich versuche, eine Zeitüberschreitung hinzuzufügen, wenn Nachrichten auf dem Hauptserver, die für die Dauer des Zeitlimits in der Ausgangs-Warteschlange waren, aus der Warteschlange entfernt werden. Genauer gesagt ist die Anwendung auf dem Hauptserver in Golang geschrieben und implementiert den Paranoid Pirate Pattern des Lastausgleichs. Der Code, den ich zur Zeit habe, ist:Wie kann ZeroMQ für Timeout-Nachrichten verwendet werden, die in der Warteschlange stehen, aber nicht innerhalb einer festgelegten Zeit gesendet wurden?
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "\001"
MESSAGE_HEARTBEAT = "\002"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
Wenn das Back-End eine Nachricht sendet, aber es ist nicht wirklich an einen Arbeitnehmer in einem gewissen Zeitraum gesendet, ich habe es geschickt ablaufen soll und nicht immer werden. Gibt es eine Socket-Option, die dies erreichen kann? Wenn nicht, was müsste ich tun, um dies zu erreichen?
Zwei Möglichkeiten, wie ich denke ich, das sind ohne Socket-Optionen kann:
1) Lassen Sie das Back-End die Nachricht in einem Wrapper wickeln und zu einer golang Warteschlange und nicht durch zeromq senden. Der Wrapper enthält die Uhrzeit, zu der die Nachricht "gesendet" wurde. Das Back-End zieht gleichzeitig nacheinander von der Vorderseite der Golang-Warteschlange und überprüft, ob die Nachricht abgelaufen ist. Wenn ja, nicht senden, wenn nicht, die Nachricht senden. Ich könnte das Backend haben, die Nachricht zuerst der Golang-Warteschlange hinzuzufügen und sie anschließend im selben Codeblock wirklich zu senden. Auf diese Weise brauche ich kein Schloss.
2) Senden Sie die Wrapper-Nachricht über zeromq an einen Retriever, und der Retriever überprüft, ob er abgelaufen ist, und kehrt vorzeitig zurück. Ich mag das nicht, weil es für seine Leistung schlecht ist.
Anstatt abgelaufene Nachrichten aus der Warteschlange zu entfernen, fügen Sie der Nachricht eine "expires-at" -Eigenschaft hinzu, bevor Sie sie in die Warteschlange stellen. Arbeiter können diese Eigenschaft jedes Mal überprüfen, wenn sie eine Nachricht erhalten, und nur solche löschen, die abgelaufen sind. – colini