2017-02-21 3 views
0

Ich versuche, einen Message Broker zu implementieren, der mit Lagom 1.2.2 eingerichtet wurde und in eine Wand gerannt ist. Die Dokumentation hat das folgende Beispiel für den Service Descriptor:Vollständiges Beispiel für Message Broker in Lagom

default Descriptor descriptor() { 
return named("helloservice").withCalls(...) 
    // here we declare the topic(s) this service will publish to 
    .publishing(
    topic("greetings", this::greetingsTopic) 
) 
    ....; 
} 

Und dieses Beispiel für die Umsetzung:

public Topic<GreetingMessage> greetingsTopic() { 
return TopicProducer.singleStreamWithOffset(offset -> { 
    return persistentEntityRegistry 
     .eventStream(HelloEventTag.INSTANCE, offset) 
     .map(this::convertEvent); 
    }); 
} 

Allerdings gibt es kein Beispiel dafür, was die Art Argument oder Rückgabetyp der convertEvent() Funktion ist und hier zeichne ich eine leere Stelle. Am anderen Ende der Teilnehmer an den Message, scheint es, dass es GreetingMessage Objekte ist aufwendig, aber wenn ich eine Funktion convertEvent erstellen GreetingMessage Objekte zurück, erhalte ich einen Übersetzungsfehler:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types; 
    required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 
    found: this::convertEvent 
    reason: cannot infer type-variable(s) T 
    (argument mismatch; invalid method reference 
    incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage) 

es weitere gründlichere Beispiele wie man das benutzt? Ich habe die Chirper-Beispiel-App bereits eingecheckt und es scheint kein Beispiel dafür zu geben.

Danke!

Antwort

3

Die Fehlermeldung, Sie klebte sagt genau das, was map erwartet:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 

Also, müssen Sie eine Funktion übergeben, die Pair<GreetingEvent, Offset> nimmt. Was sollte die Funktion zurückgeben? Nun, update es, um das zu nehmen, und dann wirst du den nächsten Fehler bekommen, der dir wieder sagen wird, was du erwartest, zurück zu kommen, und in diesem Fall wirst du es finden Pair<GreetingMessage, Offset>.

Um zu erklären, was diese Typen sind - Lagom muss verfolgen, welche Ereignisse in Kafka veröffentlicht wurden, so dass beim Neustart eines Dienstes nicht am Anfang des Ereignisprotokolls begonnen wird und alle Ereignisse erneut veröffentlicht werden Anfang der Zeit wieder. Dies geschieht durch Verwendung von Offsets. Daher erzeugt das Ereignisprotokoll Paare von Ereignissen und Offsets, und dann müssen Sie diese Ereignisse in die Nachrichten umwandeln, die in Kafka veröffentlicht werden, und wenn Sie die umgewandelte Nachricht an Lagom zurückgegeben haben, muss es ein Paar mit dem Offset sein das Sie aus dem Ereignisprotokoll erhalten haben, so dass Lagom nach der Veröffentlichung in Kafka den Offset beibehalten und diesen beim nächsten Neustart des Dienstes als Ausgangspunkt verwenden kann. https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91

+0

Dank;:

kann ein vollständiges Beispiel hier zu sehen Ich habe über die von Ihnen zur Verfügung gestellte Auktionsbeispiel-Anwendung geforscht und es war hilfreich (obwohl es die 'taggedStreamWithOffset' anstelle von' singleStreamWithOffset' verwendet) (die, um ehrlich zu sein, möglicherweise das ist was ich will) Der Teil I Es fehlte das Argument "convertEvent", aus Gründen, dass ich davon ausgehe, dass ich den Rückgabetyp vermasselt habe, auch wenn ich den Rückgabetyp korrekt hatte und das mich in die Irre führte. Nochmals vielen Dank für die Hilfe! –

Verwandte Themen