2017-11-02 6 views
0

Ich versuche, einen Proof of Concept, einfache Kafka Produzent und Verbraucher zu machen. Die Plattform, die ich benutze, ist .Net Core 2.0, gebaut von Visual Studio 2017. Das Nuget-Paket, das ich benutze, ist Confluent.Kafka.Net Core Kafka ProduceAsync wird nicht abgeschlossen

Ich habe Code durch Forschung gefunden, die den besten Erfolg gehabt hat, aber jetzt Es scheint Probleme beim Implementieren einer .ProduceAsync() -Methode zu haben. Es werden keine Fehlercodes angezeigt, und das Programm scheint weiterhin ausgeführt zu werden, führt die Methode jedoch nicht aus.

Hier ist mein Produzent:

class Producer 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("PRODUCER!!!"); 

      // The Kafka endpoint address 
      string kafkaEndpoint = "127.0.0.1:9092"; 

      // The Kafka topic we'll be using 
      string kafkaTopic = "testtopic"; 

      // Create the producer configuration 
      var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } }; 

      // Create the producer 
      using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8))) 
      { 
       Console.WriteLine("Producer Created!"); 
       // Send 10 messages to the topic 
       for (int i = 0; i < 10; i++) 
       { 
        var message = $"Event {i}"; 
        Console.WriteLine($"Begin Event {i}"); 
        var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult(); 
        Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}"); 
        Console.WriteLine("Press any key to continue..."); 
        Console.ReadLine(); 
       } 
      } 
     } 
    } 

Und hier ist mein Verbraucher:

class Consumer 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("CONSUMER!!!"); 

      // The Kafka endpoint address 
      string kafkaEndpoint = "127.0.0.1:9092"; 

      // The Kafka topic we'll be using 
      string kafkaTopic = "testtopic"; 

      // Create the consumer configuration 
      var consumerConfig = new Dictionary<string, object> 
      { 
       { "group.id", "myconsumer" }, 
       { "bootstrap.servers", kafkaEndpoint }, 
      }; 

      // Create the consumer 
      using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8))) 
      { 
       // Subscribe to the OnMessage event 
       consumer.OnMessage += (obj, msg) => 
       { 
        Console.WriteLine($"Received: {msg.Value}"); 
       }; 

       // Subscribe to the Kafka topic 
       consumer.Subscribe(new List<string>() { kafkaTopic }); 

       // Handle Cancel Keypress 
       var cancelled = false; 
       Console.CancelKeyPress += (_, e) => 
       { 
        e.Cancel = true; // prevent the process from terminating. 
        cancelled = true; 
       }; 

       Console.WriteLine("Ctrl-C to exit."); 

       // Poll for messages 
       while (!cancelled) 
       { 
        consumer.Poll(3000); 
        //consumer.Poll(); 
       } 
      } 
     } 
    } 

Wie ich den Code ausführen, sitzt der Verbraucher im Leerlauf, da es scheint, dass keine Nachrichten produziert werden, aber hier ist meine Konsolen-Ausgabe für den Producer:

(Und das ist es. Console sitzt auch im Leerlauf a t dieser Punkt.)

Die Frage ist: Wie kann dies behoben werden, so dass die Nachrichten tatsächlich produziert werden, und damit der Verbraucher sie erhalten kann?

Antwort

1

Sie können erzwingen, um das Ergebnis zu erhalten, indem Sie die Zeile in Produzenten Wechsel zu:

var result = producer.ProduceAsync(kafkaTopic, null, message).Result; 

das Ergebnis Inhalte überprüfen zu finden, was falsch gelaufen ist.

Verwandte Themen