Ich verwende Confluent.Kafka Dotnet-Client.Kafka Consumer Commit Thread-Sicherheit
namespace Confluent.Kafka
{
public class Consumer<TKey, TValue> : IDisposable
{
public Task<CommittedOffsets> CommitAsync();
}
}
Wie Sie sehen, ist Consumer.CommitAsync eine asynchrone Methode. Ist es sicher, die Methode CommitAsync
aufzurufen, ohne ihre Antwort abzuwarten und dann den nächsten Anruf an Subscribe
zu tätigen?
Beispielcode unten.
using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer()))
{
consumer.Subscribe(topics);
while (true)
{
Message<MessageKey, byte[]> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
// ...
if(msg.Offset % 100 == 0)
{
consumer.CommitAsync().ContinueWith((t) =>
{
// log t.Exception
}, TaskContinuationOptions.OnlyOnFaulted);
}
}
}
}