2017-05-05 1 views
0

Ich ziehe von einem JMS-basierten System zu Kafka, das für die Synchronisierung von Transaktionen zwischen neuen und älteren Systemen zuständig ist. Jede Nachricht, die vom neuen System veröffentlicht wird, muss vom Abonnenten/Konsumenten erfolgreich verarbeitet werden. Ich muss mir keine Gedanken über die Reihenfolge der Nachrichten machen. Aufgrund einiger Designprobleme (pessimistisches Sperren) im Altsystem können einige Transaktionen bei der Ankunft der Nachricht fehlschlagen. In diesem Fall möchte ich, dass die Nachricht nach einiger Zeit zurückkommt. Ich versuche herauszufinden, wie ich mit Kafka umgehen soll.Wie kann man mit Confluence Kafka Fehler bei der Nachrichtenverarbeitung auf Kundenseite behandeln?

Meine Quell- und Zielanwendungen sind in .NET 4.6.1 und C#. Ich verwende die Confluent.Kafka v0.9.5-Client-Bibliothek. Kafka ist in der Version 0.10.

Für die Consumer-Anwendung habe ich Auto-Commit deaktiviert und explizit CommitAsync-Methode zum Festschreiben des Offsets nach einer Nachricht erfolgreich verarbeitet. So erstelle ich den Konsumenten.

var config = new Dictionary<string, object>() 
       { 
        {"group.id", GroupId}, 
        {"client.id", ClientId}, 
        {"enable.auto.commit", false}, 
        {"bootstrap.servers", _consumerConnectionConfig.BrokerUrl}, 
        { 
         "default.topic.config", new Dictionary<string, object>() 
         { 
          {"auto.offset.reset", "latest"} 
         } 
        } 
       }; 
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)); 

So wird die Abfrage eingerichtet.

consumer.Subscribe(topics); 

while (!SubscriberCancellationToken.IsCancellationRequested) 
{ 
     consumer.Poll(TimeSpan.FromMilliseconds(1000)); 
} 

Hier wird das OnMessage Ereignis ist, das für die Verarbeitung Nachricht

consumer.OnMessage += (sender, message) => { 
    try 
    { 
    var payload = GetPayload(message); 
    if (_messageHandlerService.ProcessMessage(payload)) 
    {     
     consumer.CommitAsync(message).Wait(SubscriberCancellationToken); 
    } 
    else 
    { 
     //(case 1) Now what should I do????? 
    } 
    } 
    catch(Exception ex) 
    { 
     Log.Error("Unable to process xyz messages", ex); 
     throw; //??? (Case 2) should I throw the exception or should I not? 
    } 
}; 

Wenn die Nachricht erfolgreich verarbeitet werden können, verantwortlich ist, rufe ich commitAsync und das funktioniert wie ein Charme. Jetzt ist meine Frage, was ich tun soll, wenn ich eine Nachricht nicht verarbeiten kann (Fall 1) oder eine Ausnahme auftritt (Fall 2). Welche Möglichkeiten habe ich, mit diesen beiden Situationen umzugehen?

In der JMS-Welt, für (Fall 1), habe ich eine verzögerte Neuveröffentlichungsstrategie angewendet. Im Grunde warte ich 1 Minute und veröffentliche dann die Nachricht zum selben Thema und übertrage die aktuelle Nachricht, so dass die neu veröffentlichte Nachricht wieder zurückkommt. Aus irgendeinem Grund, wenn ich nicht erneut veröffentlichen kann, versuche ich es immer wieder, bis ich kann oder bis der Prozess neu startet. Wenn der Prozess neu gestartet wurde, bevor ich ihn erneut veröffentlichen konnte, kommen die nicht festgeschriebenen Nachrichten zurück und der Zyklus beginnt von vorne.

Sobald die erneute Veröffentlichung erfolgreich ist, werden andere Nachrichten bereits auf das Thema warten Zug fährt in einem Kreis, bis alles bearbeitet ist. Jedes Mal, wenn ich die Nachricht nicht wie in Fall 1 verarbeiten kann, protokolliere ich einen Fehler, basierend darauf werden entsprechende Warnungen generiert, so dass das Anwendungs-Support-Team einige Maßnahmen ergreifen kann, falls sie einige Daten im Legacy-System reparieren müssen. Bis dahin wird die Nachricht weiterhin fehlschlagen und dann erneut veröffentlicht.

Und für Fall 2, protokollierte ich die Details und warf eine Ausnahme in der JMS-Implementierung. Ich frage mich nun, ob ich genau wie Fall 1 behandelt werden sollte.

Jetzt ist meine Frage, wie ich mit diesen beiden Situationen in der Welt von Kafka umgehen soll? Gibt es bessere Möglichkeiten?

Antwort

1

Nur wenige Optionen

One ist es, diese Nachrichten an einem anderen Kafka Thema zu schieben und einen eigenen Verbraucher mit ihnen umgehen lassen, oder,

Retry in Ihrem Verbraucher, bis diese bestimmte Nachricht wird bearbeitet oder erreichen Sie ein gewissen Schwelle. Dies kann durch Aufrufen der pause-Methode auf Ihrem Consumer geschehen, um den Nachrichtenverbrauch vorübergehend zu stoppen (ich spreche aus einer Java-Konsumentenperspektive, bitte bestätigen Sie mit dem C# -Client), führen Sie Ihre Wiederholungslogik aus. Bitte stellen Sie sicher, dass Sie auch weiterhin Poll aufrufen (andernfalls wird der Consumer aus der Gruppe rausgeworfen und seine Partitionen werden neu ausbalanciert).Sobald Sie fertig sind versuchen Sie es erneut, rufen Sie die Fortsetzen-Methode, um die Verarbeitung fortzusetzen - jetzt die Umfrage-Methode wird Datensätze von Kafka

+0

Vielen Dank. Lassen Sie mich die Funktion "Pause und Fortsetzen" ausprobieren. Ich werde die Ergebnisse mit C# Client – Vinod

+0

veröffentlichen Entschuldigung für verzögerte Antwort. Confluent.Kafka C# -Client hat noch keine Methoden zum Anhalten und Fortsetzen. – Vinod

Verwandte Themen