Ich habe einen Java-typisierten Aktor, der für die Filter-/Wiederholungslogik auf einer externen Ressource verantwortlich ist, die vorübergehend nicht verfügbar ist. Die Felder des Akteurs und gemeinsame Methoden sind:Warten auf unbegrenzte Zeit für eine Nachricht, die möglicherweise nie eintreffen wird
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
Der Schauspieler mehrere Operationen hat, dass alle mehr oder weniger die gleiche Retry/Pufferlogik; Jede Operation hat ihre eigenen Felder [op]Wait
und [op]Buffer
.
- Die Mutter Schauspieler rufen
void operation(OpInput opInput)
- Die vorhergehende Methode ruft
void operation(OpInput opInput, long currentWait)
DEFAULTWAIT
für den zweiten Parameter unter Verwendung - Wenn die
currentWait
Parameter nicht gleichopWait
dann wird der Eingang inopBuffer
gespeichert, andernfalls die Eingabe an gesendet wird die externe Ressource. - Wenn die externe Ressource einen Erfolg zurückgibt, wird
opWait
aufDEFAULTWAIT
festgelegt, und der Inhalt vonopBuffer
wird über die Methodeoperation(opInput)
zurückgesendet. Wenn die externe Ressource (oder wahrscheinlicher das Netzwerk) einen Fehler zurückgibt, aktualisiere ichopWait = updateWait(opWait)
und planeoperation(opInput, opWait)
auf dem Aktorsystem-Scheduler mit einer Verzögerung vonopWait
ms.
I.e. Ich verwende den Actor-System-Scheduler, um Exponential-Backoff zu implementieren; Ich verwende den Parameter currentWait
, um die Nachricht zu identifizieren, die ich erneut versuche, und puffere die anderen Nachrichten, bis die primäre Nachricht erfolgreich von der externen Ressource verarbeitet wurde.
Das Problem ist, dass, wenn die geplante operation(opInput, currentWait)
Nachricht verloren ist, dann werde ich Nachrichten für immer Puffer werden, weil die currentWait == opWait
Wache für alle anderen Nachrichten fehlschlagen wird. Ich könnte etwas wie spring-retry verwenden, um exponentielles Backoff zu implementieren, aber ich sehe keine Möglichkeit, die Wiederholungsschleifen der Operationen zusammenzuführen, was bedeutet, dass ich einen Thread pro Wiederholungsschleife verwenden könnte (wobei die Verwendung des Schedulers des Aktorsystems nicht viel mehr bedeutet) einer Belastung für das System).
Ich bin auf der Suche nach einer mehr fehlertoleranten Möglichkeit, Pufferung und exponentielle Backoff auf der Schnittstelle zwischen einem Akteur und einer externen Ressource zu implementieren, ohne zu viele Ressourcen für die Aufgabe zuweisen.
Mit "extern aufgerufen" meinte ich, dass die Methode von einem anderen Akteur aufgerufen wurde, während die "intern aufgerufenen" Methoden nur über 'self.operation (...) 'aufgerufen werden. Und vielen Dank für den Vorschlag, statt einer geplanten Nachricht eine Zukunft zu verwenden, die den Code erheblich vereinfacht –