Ich versuche, einen Proof of Concept, einfache Kafka Produzent und Verbraucher zu machen. Die Plattform, die ich benutze, ist .Net Core 2.0, gebaut von Visual Studio 2017. Das Nuget-Paket, das ich benutze, ist Confluent.Kafka.Net Core Kafka ProduceAsync wird nicht abgeschlossen
Ich habe Code durch Forschung gefunden, die den besten Erfolg gehabt hat, aber jetzt Es scheint Probleme beim Implementieren einer .ProduceAsync() -Methode zu haben. Es werden keine Fehlercodes angezeigt, und das Programm scheint weiterhin ausgeführt zu werden, führt die Methode jedoch nicht aus.
Hier ist mein Produzent:
class Producer
{
static void Main(string[] args)
{
Console.WriteLine("PRODUCER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the producer configuration
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
// Create the producer
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine("Producer Created!");
// Send 10 messages to the topic
for (int i = 0; i < 10; i++)
{
var message = $"Event {i}";
Console.WriteLine($"Begin Event {i}");
var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
Console.WriteLine("Press any key to continue...");
Console.ReadLine();
}
}
}
}
Und hier ist mein Verbraucher:
class Consumer
{
static void Main(string[] args)
{
Console.WriteLine("CONSUMER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll(3000);
//consumer.Poll();
}
}
}
}
Wie ich den Code ausführen, sitzt der Verbraucher im Leerlauf, da es scheint, dass keine Nachrichten produziert werden, aber hier ist meine Konsolen-Ausgabe für den Producer:
(Und das ist es. Console sitzt auch im Leerlauf a t dieser Punkt.)
Die Frage ist: Wie kann dies behoben werden, so dass die Nachrichten tatsächlich produziert werden, und damit der Verbraucher sie erhalten kann?