
Spark Cassandra Connector not working in a standalone Spark cluster

I have a Maven Scala application that submits a Spark job to a standalone single-node Spark cluster. When the job is submitted, the Spark application tries to access Cassandra, which is hosted on an Amazon EC2 instance, via the spark-cassandra-connector. The connection is established, but no results come back, and after a while the connector disconnects. It works fine when I run Spark in local mode. I tried to build a simple application, and my code looks like this:

import com.datastax.spark.connector._    // provides the cassandraTable implicit on SparkContext

val sc = SparkContextLoader.getSC

def runSparkJob(): Unit = {
    val table = sc.cassandraTable("prosolo_logs_zj", "logevents")
    println(table.collect().mkString("\n"))
}

SparkContext.scala

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextLoader {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("spark://127.0.1.1:7077")

    sparkConf.set("spark.cores.max", "2")    // the property name is spark.cores.max ("spark.cores_max" is silently ignored)
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.setAppName("Test application")
    sparkConf.set("spark.cassandra.connection.host", "xxx.xxx.xxx.xxx")
    sparkConf.set("spark.cassandra.connection.port", "9042")
    sparkConf.set("spark.ui.port", "4041")

    val oneJar = "/samplesparkmaven/target/samplesparkmaven-jar.jar"
    sparkConf.setJars(List(oneJar))

    @transient val sc = new SparkContext(sparkConf)

    def getSC: SparkContext = sc    // accessor used by the job code above
}

The console output looks like this:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
17/02/14 23:11:25 INFO SparkContext: Running Spark version 2.1.0 
17/02/14 23:11:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
17/02/14 23:11:27 WARN Utils: Your hostname, zoran-Latitude-E5420 resolves to a loopback address: 127.0.1.1; using 192.168.2.68 instead (on interface wlp2s0) 
17/02/14 23:11:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
17/02/14 23:11:27 INFO SecurityManager: Changing view acls to: zoran 
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls to: zoran 
17/02/14 23:11:27 INFO SecurityManager: Changing view acls groups to: 
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls groups to: 
17/02/14 23:11:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(zoran); groups with view permissions: Set(); users with modify permissions: Set(zoran); groups with modify permissions: Set() 
17/02/14 23:11:28 INFO Utils: Successfully started service 'sparkDriver' on port 33995. 
17/02/14 23:11:28 INFO SparkEnv: Registering MapOutputTracker 
17/02/14 23:11:28 INFO SparkEnv: Registering BlockManagerMaster 
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
17/02/14 23:11:28 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7b25a4cc-cb37-4332-a59b-e36fa45511cd 
17/02/14 23:11:28 INFO MemoryStore: MemoryStore started with capacity 870.9 MB 
17/02/14 23:11:28 INFO SparkEnv: Registering OutputCommitCoordinator 
17/02/14 23:11:28 INFO Utils: Successfully started service 'SparkUI' on port 4041. 
17/02/14 23:11:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.2.68:4041 
17/02/14 23:11:28 INFO SparkContext: Added JAR /samplesparkmaven/target/samplesparkmaven-jar.jar at spark://192.168.2.68:33995/jars/samplesparkmaven-jar.jar with timestamp 1487142688817 
17/02/14 23:11:28 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://127.0.1.1:7077... 
17/02/14 23:11:28 INFO TransportClientFactory: Successfully created connection to /127.0.1.1:7077 after 62 ms (0 ms spent in bootstraps) 
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170214231129-0016 
17/02/14 23:11:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36901. 
17/02/14 23:11:29 INFO NettyBlockTransferService: Server created on 192.168.2.68:36901 
17/02/14 23:11:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
17/02/14 23:11:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.68:36901 with 870.9 MB RAM, BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 
17/02/14 23:11:29 INFO NettyUtil: Found Netty's native epoll transport in the classpath, using it 
17/02/14 23:11:31 INFO Cluster: New Cassandra host /xxx.xxx.xxx.xxx:9042 added 
17/02/14 23:11:31 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster 
17/02/14 23:11:32 INFO SparkContext: Starting job: collect at SparkConnector.scala:28 
17/02/14 23:11:32 INFO DAGScheduler: Got job 0 (collect at SparkConnector.scala:28) with 6 output partitions 
17/02/14 23:11:32 INFO DAGScheduler: Final stage: ResultStage 0 (collect at SparkConnector.scala:28) 
17/02/14 23:11:32 INFO DAGScheduler: Parents of final stage: List() 
17/02/14 23:11:32 INFO DAGScheduler: Missing parents: List() 
17/02/14 23:11:32 INFO DAGScheduler: Submitting ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18), which has no missing parents 
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.4 KB, free 870.9 MB) 
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 870.9 MB) 
17/02/14 23:11:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.68:36901 (size: 4.4 KB, free: 870.9 MB) 
17/02/14 23:11:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996 
17/02/14 23:11:32 INFO DAGScheduler: Submitting 6 missing tasks from ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18) 
17/02/14 23:11:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 6 tasks 
17/02/14 23:11:39 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 

I am using:
  • Scala 2.11.6
  • Spark 2.1.0 (both for the standalone Spark installation and for the dependency in the application)
  • spark-cassandra-connector 2.0.0-M3
  • Cassandra Java driver 3.0.0
  • Apache Cassandra 3.9

The version compatibility table for the Cassandra connector shows no problem with this combination, but I can't figure out what else could be causing the problem.
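For reference, a minimal sketch of the same dependency set in sbt notation (the project itself uses Maven, so this is only illustrative; the coordinates are the standard Maven Central ones, and the provided scope is an assumption about the cluster supplying Spark at runtime):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",  // assumption: Spark is supplied by the standalone cluster
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
)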

Answer


I finally solved the problem I was having. It turned out to be a path issue. I was using a local path to the jar, but I had missed adding "." at the beginning, so it was treated as an absolute path. Unfortunately, there was no exception in the application indicating that the file did not exist at the given path; the only exception I got came from the worker, which could not find the JAR file in the Spark cluster.
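A minimal sketch of the corrected lines in SparkContextLoader, assuming the driver is launched from the directory containing samplesparkmaven; the require check is my addition and would have surfaced the mistake immediately:

import java.io.File

// Relative path (note the leading "."): resolved against the driver's
// working directory instead of being treated as an absolute filesystem path.
val oneJar = "./samplesparkmaven/target/samplesparkmaven-jar.jar"

// Fail fast if the JAR is missing, rather than letting a worker in the
// cluster report later that it cannot find the file.
require(new File(oneJar).exists(), s"Application JAR not found at $oneJar")
sparkConf.setJars(List(oneJar))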