2017-09-25 4 views
1

Ich habe einige Hierarchie Kafka-Kanal, den ich in meinem Projekt bin mit:Scala Trait Typenkonflikt

Mein Base Merkmal ist:

trait SendChannel[A, B] extends CommunicationChannel { 
    def send(data:A): B 
} 

Jetzt habe ich eine gemeinsame kafka Kanal

trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] { 
val channelProps: KafkaSendChannelProperties 
val kafkaProducer: Producer[String, B] 
override def close(): Unit = kafkaProducer.close() 
} 
wie

senden

Jetzt gibt es 2 Varianten von CommanKafkaSendChannel, man ist mit Rückruf und man ist mit Zukunft:

trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] { 
override def send(data: A): Future[RecordMetadata] = Future { 
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get 
} 
} 

KafkaSendChannelWithCallback Definition:

object KafkaSendChannelWithCallback { 

def apply[A, B](oChannelProps: KafkaSendChannelProperties, 
       oKafkaProducer: Producer[String, B], 
       oCallback: Callback): KafkaSendChannelWithCallback[A, B] = 
new KafkaSendChannelWithCallback[A,B] { 
    override val channelProps: KafkaSendChannelProperties = oChannelProps 
    override val kafkaProducer: Producer[String, B] = oKafkaProducer 
    override val callback: Callback = oCallback 
} 
} 

trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] { 

    val callback: Callback 

override def send(data: A): Unit = 
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback) 
} 

nun auf dem Konfigurationswert basierend ich die richtige Art des Kanals auf der Laufzeit wie unten auswählen. Ich schaffe Schauspieler mit richtigen Art von Kanal, der die Daten an kafka senden:

val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
    error => { 
     logger.error("Exception while instantiating the KafkaSendChannel") 
     throw error 
    }, 
    success => success 
) 

actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME) 

Schauspieler Definition:

object IngestionRouterActor { 
    def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props = 
Props(classOf[IngestionActor[V]], config, sendChannel) 
} 

Das Problem ist, wenn ich KafkaSendChannelWithCallback meinen Code verwenden kompiliert jedoch richtig, wenn ich KafkaSendChannelWithFuture verwenden es gibt mir unten Fehler auf actor = Erklärung:

[Fehler] IngestionActor.scala: 32: Mustertyp ist mit der erwarteten Art unvereinbar; [Fehler] gefunden: KafkaSendChannelWithFuture [String, V] [Fehler] benötigt: SendChannel [V, Einheit]

sowohl als die Kanaldefinitionen von SendChannel ausgefahren sind, sollte dieser Code ohne Fehler kompiliert. Ich bin mir nicht sicher, warum es nicht kompiliert wird. Danke

Antwort

1

Die Props für IngestionActor dauert SendChannel[V, Unit]. Passing in einem KafkaSendChannelWithCallback zu diesem Argument funktioniert, weil es ein SendChannel[V, Unit] ist.

Auf der anderen Seite ist KafkaSendChannelWithFuture ein SendChannel[V, Future[RecordMetadata]]. A SendChannel[V, Future[RecordMetadata]] ist nicht ein SendChannel[V, Unit].

Eine Möglichkeit ist es, die Props zu nehmen SendChannel[V, Any], neu zu definieren, da Any ein übergeordneter Typ von beiden Unit und Future ist:

def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ??? 

An diesem Punkt ist der Compiler noch unglücklich, weil SendChannel, ein generischer Typ ist, , ist standardmäßig invariant. Mit anderen Worten, weder SendChannel[V, Unit] noch SendChannel[V, Future[RecordMetadata]] sind vom Typ SendChannel[V, Any].

, das zu ändern, machen SendChannel covariant auf den zweiten Typ-Parametern (durch ein + vor den B Zugabe):

trait SendChannel[A, +B] extends CommunicationChannel { 
    def send(data: A): B 
} 
+0

Hallo @chunjef, vielen Dank für Ihre Antwort. Du hast 'da Any ist ein Supertyp von sowohl Unit als auch Future' erwähnt, der versucht,' Any' ist Supertyp, aber warum 'weder SendChannel [V, Unit] noch SendChannel [V, Future [RecordMetadata]] vom Typ SendChannel [V, Irgendein] 'das ist falsch? – Explorer

+0

Ich habe die Änderung, die Sie vorgeschlagen haben, vorgenommen, ist eine Änderung in den 'Requisiten' erforderlich, die immer noch 'SendChannel [V, Unit]' haben, weil ich immer noch denselben Fehler erhalte. – Explorer

+0

@Explorer: Bitte lesen Sie meine Antwort genauer. Ich adressiere beide Ihrer Kommentare in der Antwort. – chunjef

Verwandte Themen