2017-03-22 4 views
0

iam Entwicklung reaktive Kafka in unserem Spiel Scala-Projekt, in dem Projekt, das wir 5 Thema erstellt, die von Verbrauchergruppe abonniert sind und gut funktioniert, jetzt ist das Problem, ich schuf ein neues Thema, wie kann ich dieses Thema in die bestehenden Verbrauchergruppe hinzufügen (ist es möglich) mein Code:Hinzufügen von neu erstellen Thema zum Abonnement in reaktiven kafka

val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(bootStrapServer) 
     .withGroupId(groupId).withPollInterval(100 millis) 

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList)) 
      .groupedWithin(10, 15 seconds) 
      .map({ 
       group => 
       var offSetBatch = CommittableOffsetBatch.empty 
       val sessionList = group.toList.map { eachItem => 
        offSetBatch = offSetBatch.updated(eachItem.committableOffset) 
        Json.parse(eachItem.record.value()).as[cityModel] 
       } 
       processRecords(cityList) 
       offSetBatch 
      }).mapAsync(1)(_.commitScaladsl()) 
      .toMat(Sink.ignore)(Keep.both) 
      .run() 

ist es eine Möglichkeit, dass ich

Antwort

0

in Verbraucher Schaffung Thema Verbraucher hinzufügen können wir Thema Muster geben kann i: e Subscriptions.topicPattern ("* .in"), das das Problem löst

Verwandte Themen