2017-10-30 3 views
0

Ich versuche einen Kafka-Produzenten und -Konsumenten zu machen, aber mein Projekt ist in dotnet Core 2.0 und es scheint nicht gut mit Kafka zu funktionieren. Dies ist der Proof-of-Concept, den ich versucht habe. Ich bin mit Visual Studio 2017 mit dem kafka-net nuget Paket:unterstützt kafka dotnet core?

mit

using KafkaNet; 
using KafkaNet.Model; 
using KafkaNet.Protocol; 

Produzenten

static void Main(string[] args) 
{ 
    string payload = "Welcome to Kafka!"; 
    string topic = "IDGTestTopic"; 
    Message msg = new Message(payload); 
    Uri uri = new Uri("localhost:9092"); 
    var options = new KafkaOptions(uri); 
    var router = new BrokerRouter(options); 
    var client = new Producer(router); 
    client.SendMessageAsync(topic, new List<Message> { msg }).Wait(); 
    Console.ReadLine(); 
} 

Verbraucher

static void Main(string[] args) 
{ 
    string topic = "IDGTestTopic"; 
    Uri uri = new Uri("http://localhost:9092"); 
    var options = new KafkaOptions(uri); 
    var router = new BrokerRouter(options); 
    var consumer = new Consumer(new ConsumerOptions(topic, router)); 
    foreach (var message in consumer.Consume()) 
    { 
     Console.WriteLine(Encoding.UTF8.GetString(message.Value)); 
    } 
    Console.ReadLine(); 
} 

Wenn ich versuche, den Hersteller zu laufen Zuerst erhalte ich eine Fehlermeldung auf dem BrokerRouter:

$exception {System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values. 
Parameter name: port 
at System.Net.IPEndPoint..ctor(IPAddress address, Int32 port) 
at KafkaNet.DefaultKafkaConnectionFactory.Resolve(Uri kafkaAddress, IKafkaLog log) 
at KafkaNet.Model.KafkaOptions.<get_KafkaServerEndpoints>d__0.MoveNext() 
at KafkaNet.BrokerRouter..ctor(KafkaOptions kafkaOptions) 
at SampleKafkaProducer.Program.Main(String[] args) in C:\v4target\SampleKafka\SampleKafkaProducer\SampleKafkaProducer\Program.cs:line 18} System.ArgumentOutOfRangeException 

Wie ist ein Port von 9092 außerhalb des Bereichs? Meine Visual Studio-Projekte werden auf Ports in den 55000 ausgeführt. Mehrere Quellen, die ich erforscht habe, verwenden 9092 als Kafka-Port.

Hat jemand die Fehlermeldung verstanden? Teil des Hauptproblems, weil ich eine Version von Kafka benutze, die nicht mit dotnet core kompatibel ist?

+0

Bitte geben Sie die gesamte Stacktrace von Ausnahme veröffentlichen, die Sie zumindest sagen, wird die Codezeile, die die Ausnahme verursacht -, dass die Informationen von unschätzbarem Wert ist. – nos

+0

Danke @nos. Ich habe die exakte Stack-Spur gezogen, aber sie führt zum Port. Der Fehler selbst beginnt bei BrokerRouter(), der KafkaOptions übernimmt, aber ich kann nicht verstehen, was genau das Problem hier ist. –

Antwort

1

Das Problem ist mit der URI.

Uri uri = neuer Uri ("localhost: 9092");

Wenn Sie den uri.Port ausdrucken, ist es -1. Daher die ArgumentOutOfRangeException.

Versuchen Sie stattdessen:

Uri uri = new Uri("http://localhost:9092"); 

Vom KafkaNet Repository. Dies ist, wie sie Setup die URI:

var options = new KafkaOptions(new Uri("http://CSDKAFKA01:9092"), new Uri("http://CSDKAFKA02:9092")) 
{ 
    Log = new ConsoleLog() 
}; 
Verwandte Themen