Ich habe einige Hierarchie Kafka-Kanal, den ich in meinem Projekt bin mit:Scala Trait Typenkonflikt
Mein Base Merkmal ist:
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
Jetzt habe ich eine gemeinsame kafka Kanal
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
wie
senden
Jetzt gibt es 2 Varianten von CommanKafkaSendChannel, man ist mit Rückruf und man ist mit Zukunft:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
KafkaSendChannelWithCallback
Definition:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
nun auf dem Konfigurationswert basierend ich die richtige Art des Kanals auf der Laufzeit wie unten auswählen. Ich schaffe Schauspieler mit richtigen Art von Kanal, der die Daten an kafka senden:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
Schauspieler Definition:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
Das Problem ist, wenn ich KafkaSendChannelWithCallback
meinen Code verwenden kompiliert jedoch richtig, wenn ich KafkaSendChannelWithFuture
verwenden es gibt mir unten Fehler auf actor =
Erklärung:
[Fehler] IngestionActor.scala: 32: Mustertyp ist mit der erwarteten Art unvereinbar; [Fehler] gefunden: KafkaSendChannelWithFuture [String, V] [Fehler] benötigt: SendChannel [V, Einheit]
sowohl als die Kanaldefinitionen von SendChannel
ausgefahren sind, sollte dieser Code ohne Fehler kompiliert. Ich bin mir nicht sicher, warum es nicht kompiliert wird. Danke
Hallo @chunjef, vielen Dank für Ihre Antwort. Du hast 'da Any ist ein Supertyp von sowohl Unit als auch Future' erwähnt, der versucht,' Any' ist Supertyp, aber warum 'weder SendChannel [V, Unit] noch SendChannel [V, Future [RecordMetadata]] vom Typ SendChannel [V, Irgendein] 'das ist falsch? – Explorer
Ich habe die Änderung, die Sie vorgeschlagen haben, vorgenommen, ist eine Änderung in den 'Requisiten' erforderlich, die immer noch 'SendChannel [V, Unit]' haben, weil ich immer noch denselben Fehler erhalte. – Explorer
@Explorer: Bitte lesen Sie meine Antwort genauer. Ich adressiere beide Ihrer Kommentare in der Antwort. – chunjef