12

Ich habe einen Thread namens T1 zum Lesen einer flachen Datei und zum Parsing. Ich muss einen neuen Thread mit dem Namen T2 für das Parsen eines Teils dieser Datei erstellen und später dieser T2 Thread müsste den Status der ursprünglichen Entität aktualisieren, die auch von der ursprünglichen Thread T1 analysiert und aktualisiert wird. Wie kann ich damit umgehen Lage?Wie behandelt man zwei Threads, die die gleiche Zeile in einer Datenbank aktualisieren

Ich erhalte eine flache Datei, die die folgenden Beispieldatensätze mit:

AAAA 
BBBB 
AACC 
BBCC 
AADD 
BBDD 

Zuerst diese Datei in der Datenbank in Received Status gespeichert wird. Nun müssen alle Datensätze beginnend mit BB oder mit AA in einem separaten Thread bearbeitet werden. Nach erfolgreicher Analyse versuchen beide Threads, den Status dieses Dateiobjekts in einer Datenbank auf Parsed zu aktualisieren. In einigen Fällen bekomme ich staleObjectException. Edit: Und die Arbeit von einem Thread vor der Ausnahme ist verloren. Wir verwenden optimistisches Sperren. Was ist der beste Weg, um dieses Problem zu vermeiden?

Possible hibernate exceptions when two threads update the same Object?

Der obige Beitrag hilft einen Teil davon zu verstehen, aber es hilft nicht, mein Problem zu lösen.

+2

Sie haben also eine Flat-Datei und ein Rennen zwischen zwei Threads, um ein Feld zu aktualisieren, das bei Abschluss "geparst" wird? Also ist es erlaubt, bei einer Datei mit einem AA und einem agazillion BB, dass AA-Parsing in Millisekunden endet, das BB-Parsing "nie" endet und Ihr Status auf "geparst" gesetzt ist? Oder sollte es "teilweise geparst" und nur "vollständig geparst" werden, wenn sowohl AA als auch BB getan werden? –

+0

Guter Punkt. In den meisten Fällen (99,99 ‰) möchten beide Threads den Status als geparst aktualisieren. Nicht zu viel Unterschied. AA sind Zahlungen .. BB sind Schecks. Nicht zu viel Unterschied in der Menge. Der Anfangsstatus wird empfangen und der Endstatus wird analysiert. Nein zwischen den Status. –

+0

Sie können das Schlüsselwort 'synchronize' in Java verwenden, um gleichzeitige Threads zu verarbeiten. – Jodo1992

Antwort

8

Teil 1 - Ihr Problem - wie ich es sehe.

Der Hauptgrund für diese Ausnahme ist, dass Sie Hibernate mit möglicherweise optimistisch Sperren verwenden. Dies bedeutet im Wesentlichen, dass entweder der Thread T1 oder der Thread T2 den Status bereits auf PARSED aktualisiert hat und der andere Thread nun die alte Version der Zeile mit einer kleineren Version als die in der Datenbank gehaltene Version hält und versucht, den Status ebenfalls auf PARSED zu aktualisieren .

Die große Frage ist hier "Sind die beiden Threads versucht, die gleichen Daten zu erhalten?". Wenn die Antwort auf diese Frage ja ist, sollte selbst dann, wenn die letzte Aktualisierung erfolgreich ist, kein Problem auftreten, da sie schließlich die Zeile in denselben Zustand aktualisieren. In diesem Fall brauchen Sie das Optimistische Sperren nicht wirklich, da Ihre Daten in jedem Fall synchron sind.

Das Hauptproblem tritt auf, wenn der Status auf RECIEVED gesetzt ist, wenn die beiden Threads T1 und T2 beim Zurücksetzen auf den nächsten Status tatsächlich voneinander abhängig sind. In diesem Fall müssen Sie sicherstellen, dass T2, wenn T1 zuerst ausgeführt wurde (oder umgekehrt), die Daten für die aktualisierte Zeile aktualisieren und die Änderungen basierend auf den bereits von T1 ausgelösten Änderungen erneut anwenden muss. In diesem Fall ist die Lösung die folgende.Wenn staleObjectException auftritt, müssen Sie Ihre Daten grundsätzlich von der Datenbank aktualisieren und den Vorgang neu starten.

Teil 2 Analyse auf den Link gepostetPossible hibernate exceptions when two threads update the same Object? Ansatz 1, das ist mehr oder weniger die letzten Siege Situation zu aktualisieren. Es vermeidet mehr oder weniger das optimistische Sperren (die Versionszählung). Falls Sie keine Abhängigkeit von T1 zu T2 oder umgekehrt haben, um den Status PARSED zu setzen. Das sollte gut sein.

**** Aproach 2 ** Optimistische Verriegelung ** Dies ist, was Sie jetzt haben. Die Lösung besteht darin, die Daten zu aktualisieren und den Vorgang neu zu starten.

Approach 3 Zeilenebene DB-Sperre Die Lösung ist hier mehr oder weniger die gleiche wie für Ansatz 2 mit der kleinen Korrektur, dass die Pessimistic lock dure. Der Hauptunterschied besteht darin, dass es in diesem Fall eine READ-Sperre sein kann und Sie möglicherweise nicht einmal in der Lage sind, die Daten aus der Datenbank zu lesen, um sie zu aktualisieren, wenn es PESSIMISTIC READ ist.

Approach 4 Synchronisation auf Anwendungsebene Es gibt viele verschiedene Möglichkeiten zur Synchronisation. Ein Beispiel wäre es, alle Ihre Aktualisierungen in einer BlockingQueue- oder JMS-Warteschlange anzuordnen (wenn Sie möchten, dass sie persistent ist) und alle Aktualisierungen von einem einzigen Thread zu übertragen. Um es zu visualisieren, setzen T1 und T2 Elemente auf die Warteschlange und es wird eine einzelne T3-Thread-Leseoperation geben und sie zum Datenbankserver schieben.

Wenn Sie die Synchronisation auf Anwendungsebene verwenden, sollten Sie beachten, dass bei einer Bereitstellung mit mehreren Servern nicht alle Strukturen verteilt werden können.

Nun kann ich mir nichts anderes für jetzt :)

+1

"Versuchen die zwei Threads, die gleichen Daten zu erhalten?". gleiche Daten oder nicht, aber im Falle, dass der Thread eine Ausnahmebearbeitung erhält, bevor der Status verloren geht. Entschuldigung, ich habe es nicht so gut erklärt. Wird die Frage verbessern. –

+0

Ja Ich denke, aktualisieren Sie den Objektstatus im Ausnahmefall oder sogar in allen Fällen mit Synchronisierung ist in Ordnung. Beide Threads müssen dieselbe Funktion aufrufen. –

+0

Nun, es hängt von Ihren Bedürfnissen ab. Es ist vollkommen in Ordnung, sie zu synchronisieren und auf einander zu warten, da es in Ordnung ist, sie einfach anzuordnen. Für einen allgemeinen Gebrauch ist es wirklich egal. Es wird von Bedeutung sein, wenn Sie mit riesigen Datenmengen und massivem Parallelismus zu tun haben. Mein Rat ist, die Simple Paria zu benutzen, die möglich ist. Auch das sauberste und lesbarste. Leistung ist immer sekundär. –

2

Angenommen, dass jeder Thread T1, T2 verschiedene Teile der Datei analysiert, heißt das, dass niemand das andere Thread-Parsing außer Kraft setzt. Das Beste ist, entkoppeln Ihre Parsing-Prozess von der DB-Commit.

T1, T2 wird das Parsen T3 oder Haupt-Thread wird das Commit tun, nachdem beide T1, T2 fertig ist. und ich denke, in diesem Ansatz ist es korrekter, den Dateistatus nur dann auf Parsed zu ändern, wenn beide Threads beendet sind.

Sie von T3 als CommitService Klasse denken können, die bis T1, T2 finsih warten und dann verpflichten DB

CountDownLatch ein hilfreiches Werkzeug, es zu tun ist. und hier ist ein Example

+0

Ich wünschte, ich könnte solche Lösung denken. Ich werde es versuchen. Halte dich auf dem Laufenden. –

+0

Wenn Sie Hilfe bei der Implementierung brauchen, kann ich Ihnen helfen, lassen Sie es mich wissen –

+0

Gleiche Gefühle hier, UnGlücklicherweise wird es von einem anderen Team gemacht. –

3

Ich bin nicht sicher, ich verstehe die Frage, aber es scheint es einen Logikfehler für einen Thread T1 darstellen würde, die nur die Verarbeitung ist, für Beispiel: Datensätze, die mit AA beginnen, um die gesamte Datei als "Parsed" zu markieren? Was passiert, wenn Ihre Anwendung beispielsweise nach T1-Aktualisierungen abstürzt, während T2 jedoch BB-Datensätze verarbeitet? Einige BB-Datensätze sind wahrscheinlich verloren, richtig?

Wie auch immer, der Kern des Problems ist, dass Sie eine Race Condition mit zwei Threads haben, die das gleiche Objekt aktualisieren. Die Ausnahme für veraltete Objekte bedeutet nur, dass einer Ihrer Threads das Rennen verloren hat. Eine bessere Lösung vermeidet ein Rennen vollständig.

(Ich gehe hier davon aus, dass die individuelle Datensatzverarbeitung idempotent ist, wenn das nicht der Fall ist, haben Sie größere Probleme, da einige Fehlermodi zu einer erneuten Verarbeitung von Datensätzen führen. Wenn die Datensatzverarbeitung nur einmal ausgeführt werden muss einmal, dann haben Sie ein schwierigeres Problem, für das eine Nachrichtenwarteschlange wahrscheinlich eine bessere Lösung wäre.)

Ich würde die Funktionalität von java.util.concurrent verwenden, um Datensätze an Thread-Worker zu verteilen, und mit dem Thread interagieren zu lassen Ruhezustand blockieren, bis alle Datensätze verarbeitet wurden. Zu diesem Zeitpunkt kann dieser Thread die Datei als "Parsed" markieren.

Zum Beispiel

// do something like this during initialization, or use a Guava LoadingCache... 
Map<RecordType, Executor> executors = new HashMap<>(); 
// note I'm assuming RecordType looks like an enum 
executors.put(RecordType.AA_RECORD, Executors.newSingleThreadExecutor()); 

dann, wie Sie die Datei bearbeiten, versenden Sie jeden Datensatz wie folgt, um eine Liste von Futures auf den Status der anstehenden Aufgaben entsprechenden Aufbau.Nehmen wir an, erfolgreich verarbeiten ein Datensatz einen boolean „true“ zurückgibt:

List<Future<Boolean>> tasks = new ArrayList<>(); 
for (Record record: file.getRecords()) { 
    Executor executorForRecord = executors.get(record.getRecordType()); 
    tasks.add(executor.submit(new RecordProcessor(record))); 
} 

Jetzt warten, bis alle Aufgaben erfolgreich abgeschlossen - es gibt elegantere Wege, dies zu tun, vor allem mit Guava. Beachten Sie, dass Sie hier auch mit ExecutionException umgehen müssen, wenn Ihre Aufgabe mit einer Ausnahme fehlgeschlagen ist. Ich beschönige das hier.

boolean allSuccess = true; 
for (Future<Boolean> task: tasks) { 
    allSuccess = allSuccess && task.get(); 
    if (!allSuccess) break; 
} 

// if all your tasks completed successfully, update the file record 
if (allSuccess) { 
    file.setStatus("Parsed"); 
} 
+0

Ich füge hinzu - es gibt wirklich keinen Grund, einen SingleThreadExecutor für jeden Datensatztyp zu verwenden. Eine bessere Lösung würde nur einen Thread-Pool-Executor verwenden. Ich habe versucht, der ursprünglichen Lösung näher zu bleiben. Wenn Sie einen einzelnen Executor verwenden, können Sie auch einen ExecutorCompletionService verwenden, was Vorteile gegenüber einer einfachen Iteration durch die Futures bietet, wie ich es hier getan habe. – thewmo

+0

Danke, ich werde es mir anschauen. –

Verwandte Themen