2017-01-28 1 views
1

Ich versuche, einen einfachen Prozess mit Spark Streaming einzurichten, Apache Bahir verwenden, um eine Verbindung zu Akka herzustellen. Ich habe versucht, their example zusammen mit dieser older one zu folgen. Ich habe eine einfache Spediteur SchauspielerApache Bahir, sende Zeug an ActorReceiver

class ForwarderActor extends ActorReceiver { 
    def receive = { 
    case data: MyData => store(data) 
    } 
} 

und erstelle ich einen Stream mit

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName) 

die Konfiguration sieht wie folgt aus:

akka { 
    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 7777 
    } 
    } 
} 

und mein Problem ist: Wie verschicke ich Nachrichten an der Forwarder Schauspieler? Vielleicht verstehe ich nicht, wie Akka Remote in diesem Fall verwendet wird. Wenn die App gestartet wird, sehe ich ein Protokoll

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:7777] 

und später sehe ich

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:52369] 

, die auf die Beschreibung in der ScalaDoc zu erinnern scheint:

/** 
    * A default ActorSystem creator. It will use a unique system name 
    * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote 
    * communication. 
    */ 

Alles in allem I Ich bin mir nicht sicher, wie ich dem Forwarder-Schauspieler Nachrichten schicken soll. Danke für jede Hilfe!

Antwort

0

Akka-Akteure können Nachrichten an andere Akka-Akteure senden, die auf einer Remote-JVM laufen. Also ... wenn der Sender-Aktor die Adresse des beabsichtigten Empfänger-Aktors kennen muss.

AkkaUtil (Bahir) ermöglicht es Ihnen, aus den Nachrichten, die ein ReceiverActor empfängt, einen Funkenstrom zu erstellen. Aber wo wird Nachrichten empfangen? Nun ... ein entfernter Schauspieler. Und um Nachrichten zu senden, benötigt dieser Remote-Actor die Adresse Ihrer ReceiverActor, die in Ihrer Spark-Anwendung läuft.

Im Allgemeinen können Sie nicht zu sicher über die IP sein, die Ihre Spark-Anwendung ausführen wird. Also werden wir es so machen, dass der Schauspieler, der mit dem Funken läuft, dem Produzentenschauspieler seine Referenz erzählt und sie auffordert, seine Sachen zu senden.

Stellen Sie nur sicher, dass beide Anwendungen mit der gleichen Version von Scala geschrieben sind und dieselbe JRE ausführen.

Nun ... lässt zunächst den Schauspieler schreiben, die die Datenquelle sein,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class MyActor extends Actor with ActorLogging { 

    val theListOfMyStrings = List("one", "two", "three") 

    override def receive: Receive = { 
    case SendMeYourStringsRequest(requesterRef) => { 
     theListOfMyStrings.foreach(s => { 
     requesterRef ! RequestedString(s) 
     }) 
    } 
    } 
} 

object MyApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="my-ip-address" 
     |  port=18000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("my-actor-system", config) 

    var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 

} 

Jetzt ... können unsere einfache Funken App schreiben,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class YourStringRequesterActor extends ActorReceiver { 
    def receive = { 
    case RequestedString(s) => store(s) 
    } 

    override def preStart(): Unit = { 
    val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 
    val myActorSelection = context.actorSelection(myActorPath) 

    myActorSelection ! SendMeYourStringsRequest(self) 
    } 
} 

object YourSparkApp { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("ActorWordCount") 

    if (!sparkConf.contains("spark.master")) { 
     sparkConf.setMaster("local[2]") 
    } 

    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    val stringStream = AkkaUtils.createStream[String](
     ssc, 
     Props(classOf[YourStringRequesterActor]), 
     "your-string-requester-actor" 
    ) 

    stringStream.foreach(println) 

    } 
} 

Hinweis :: Nehmen Sie einfach Pflege von my-ip-address. Wenn es irgendwelche anderen Probleme gibt, lass es mich in Kommentaren wissen.

+0

@SaSarvesh Kumar Singh Ich bin froh, dass ich das gefunden habe. Ich habe versucht, das gleiche Problem zu beheben. Hier habe ich deinen Code ausprobiert. Wenn Sie 'actorSselection' verwenden, sollten Sie sich dieses Problem ansehen, wenn Sie' actorSclection' verwenden: http://stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-wor Sie könnten dieses Problem überprüfen? – Mahesh