2017-06-26 3 views
0

Ich verwende Confluent.Kafka Dotnet-Client.Kafka Consumer Commit Thread-Sicherheit

namespace Confluent.Kafka 
{ 
    public class Consumer<TKey, TValue> : IDisposable 
    { 
     public Task<CommittedOffsets> CommitAsync(); 
    } 
} 

Wie Sie sehen, ist Consumer.CommitAsync eine asynchrone Methode. Ist es sicher, die Methode CommitAsync aufzurufen, ohne ihre Antwort abzuwarten und dann den nächsten Anruf an Subscribe zu tätigen?

Beispielcode unten.

using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer())) 
{ 
       consumer.Subscribe(topics); 

       while (true) 
       { 
        Message<MessageKey, byte[]> msg; 
        if (consumer.Consume(out msg, TimeSpan.FromSeconds(1))) 
        { 
         // ... 

         if(msg.Offset % 100 == 0) 
         { 
          consumer.CommitAsync().ContinueWith((t) => 
          { 
           // log t.Exception 
          }, TaskContinuationOptions.OnlyOnFaulted); 
         } 
        } 
       } 
} 

Antwort

0

Ich nehme an, Sie nächsten Aufruf zu Verbrauchen

Ja, es ist sicher, kein Problem damit sagen wollte. Ich würde auch ein Zeitfenster für das Commit hinzufügen (was zuerst zwischen 5s und 100msgs zum Beispiel kommt), so dass, wenn Sie keine Nachrichten für einige Zeit erhalten Sie immer noch commit sie