iam Entwicklung reaktive Kafka in unserem Spiel Scala-Projekt, in dem Projekt, das wir 5 Thema erstellt, die von Verbrauchergruppe abonniert sind und gut funktioniert, jetzt ist das Problem, ich schuf ein neues Thema, wie kann ich dieses Thema in die bestehenden Verbrauchergruppe hinzufügen (ist es möglich) mein Code:Hinzufügen von neu erstellen Thema zum Abonnement in reaktiven kafka
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootStrapServer)
.withGroupId(groupId).withPollInterval(100 millis)
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicList))
.groupedWithin(10, 15 seconds)
.map({
group =>
var offSetBatch = CommittableOffsetBatch.empty
val sessionList = group.toList.map { eachItem =>
offSetBatch = offSetBatch.updated(eachItem.committableOffset)
Json.parse(eachItem.record.value()).as[cityModel]
}
processRecords(cityList)
offSetBatch
}).mapAsync(1)(_.commitScaladsl())
.toMat(Sink.ignore)(Keep.both)
.run()
ist es eine Möglichkeit, dass ich