2015-09-13 6 views
19

Ich habe einen Java-typisierten Aktor, der für die Filter-/Wiederholungslogik auf einer externen Ressource verantwortlich ist, die vorübergehend nicht verfügbar ist. Die Felder des Akteurs und gemeinsame Methoden sind:Warten auf unbegrenzte Zeit für eine Nachricht, die möglicherweise nie eintreffen wird

public class MyActorImpl implements MyActor { 
    private static final long MINWAIT = 50; 
    private static final long MAXWAIT = 1000; 
    private static final long DEFAULTWAIT = 0; 
    private static final double BACKOFFMULTIPLIER = 1.5; 

    private long updateWait(long currentWait) { 
     return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT); 
    } 

    // mutable 
    private long opWait = DEFAULTWAIT; 
    private final Queue<OpInput> opBuffer = new ArrayDeque<>(); 

    // called from external actor 
    public void operation(OpInput opInput) { 
     operation(opInput, DEFAULTWAIT); 
    } 

    // called internally 
    public void operation(OpInput opInput, long currentWait); 
} 

Der Schauspieler mehrere Operationen hat, dass alle mehr oder weniger die gleiche Retry/Pufferlogik; Jede Operation hat ihre eigenen Felder [op]Wait und [op]Buffer.

  1. Die Mutter Schauspieler rufen void operation(OpInput opInput)
  2. Die vorhergehende Methode ruft void operation(OpInput opInput, long currentWait)DEFAULTWAIT für den zweiten Parameter unter Verwendung
  3. Wenn die currentWait Parameter nicht gleich opWait dann wird der Eingang in opBuffer gespeichert, andernfalls die Eingabe an gesendet wird die externe Ressource.
  4. Wenn die externe Ressource einen Erfolg zurückgibt, wird opWait auf DEFAULTWAIT festgelegt, und der Inhalt von opBuffer wird über die Methode operation(opInput) zurückgesendet. Wenn die externe Ressource (oder wahrscheinlicher das Netzwerk) einen Fehler zurückgibt, aktualisiere ich opWait = updateWait(opWait) und plane operation(opInput, opWait) auf dem Aktorsystem-Scheduler mit einer Verzögerung von opWait ms.

I.e. Ich verwende den Actor-System-Scheduler, um Exponential-Backoff zu implementieren; Ich verwende den Parameter currentWait, um die Nachricht zu identifizieren, die ich erneut versuche, und puffere die anderen Nachrichten, bis die primäre Nachricht erfolgreich von der externen Ressource verarbeitet wurde.

Das Problem ist, dass, wenn die geplante operation(opInput, currentWait) Nachricht verloren ist, dann werde ich Nachrichten für immer Puffer werden, weil die currentWait == opWait Wache für alle anderen Nachrichten fehlschlagen wird. Ich könnte etwas wie spring-retry verwenden, um exponentielles Backoff zu implementieren, aber ich sehe keine Möglichkeit, die Wiederholungsschleifen der Operationen zusammenzuführen, was bedeutet, dass ich einen Thread pro Wiederholungsschleife verwenden könnte (wobei die Verwendung des Schedulers des Aktorsystems nicht viel mehr bedeutet) einer Belastung für das System).

Ich bin auf der Suche nach einer mehr fehlertoleranten Möglichkeit, Pufferung und exponentielle Backoff auf der Schnittstelle zwischen einem Akteur und einer externen Ressource zu implementieren, ohne zu viele Ressourcen für die Aufgabe zuweisen.

Antwort

8

Wenn ich verstehe Sie richtig, wenn das einzige Problem, die geplante Nachricht verliert, warum Sie nicht verwenden nur so etwas wie die Reliable Proxy Pattern für diese bestimmte Nachricht, und dann, wenn es nicht opWait = DEFAULTWAIT;

Es gibt etwas über deinen Code, den ich bekomme, verstehe ich nicht, was du meinst, wenn du sagst, dass public void operation(OpInput opInput) extern aufgerufen wird. Meinst du, dass diese Methode mit dem Netzwerk interagiert, das Ressourcen verwendet, die manchmal nicht verfügbar sind?

Wenn ich kann kann ich eine Alternative vorschlagen. Von dem, was ich verstehe, ist Ihr Hauptproblem, Sie haben eine Ressourcen, die manchmal nicht verfügbar ist, so dass Sie eine Art von que/Puffer mit einer Art von Wartelogik implementieren, so dass die Nachricht verarbeitet wird, sobald es wieder verfügbar ist, was leider beinhaltet einige Nachrichten, die verloren gehen können und zu einer unendlichen Wartezeit führen. Ich denke, dass Sie mit Futures mit Timeouts erreichen können, was Sie wollen. Dann versuche es erneut, wenn die Zukunft in dieser bestimmten Zeit nicht abgeschlossen ist, bis zu 3 Wiederholungen. Sie können diese Zeit sogar basierend auf der Serverlast und der Dauer einer Nachricht anpassen. Ich hoffe, das hilft.

+0

Mit "extern aufgerufen" meinte ich, dass die Methode von einem anderen Akteur aufgerufen wurde, während die "intern aufgerufenen" Methoden nur über 'self.operation (...) 'aufgerufen werden. Und vielen Dank für den Vorschlag, statt einer geplanten Nachricht eine Zukunft zu verwenden, die den Code erheblich vereinfacht –

Verwandte Themen