
I had a requirement to load streaming data into a DynamoDB table. I tried the code below, but the DynamoDB client cannot be created on the Spark executors.

import org.apache.spark.sql.SparkSession
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}

object UnResolvedLoad {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
    val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
    // Write each partition to DynamoDB from the executors
    tokensDf.foreachPartition { x => loadFunc(x) }
  }

  def loadFunc(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
    // Build one client per partition, on the executor
    val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard().build()
    val dynamoDB: DynamoDB = new DynamoDB(client)
    val table: Table = dynamoDB.getTable("UnResolvedTokens")

    while (iter.hasNext) {
      val cur = iter.next()
      val item: Item = new Item()
        .withString("receiverId", cur.get(2).asInstanceOf[String])
        .withString("payload_id", cur.get(0).asInstanceOf[String])
        .withString("payload_confirmation_code", cur.get(1).asInstanceOf[String])
        .withString("token", cur.get(3).asInstanceOf[String])

      table.putItem(item)
    }
  }
}

When I run it with spark-submit, it is not able to initialize the class. The error message is below; it says the class could not be initialized. Is there a way around this? Help is appreciated.

, executor 5): java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder 
     at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) 

17/07/19 17:35:15 INFO TaskSetManager: Lost task 26.0 in stage 0.0 (TID 26) on ip-10-176-225-151.us-west-2.compute.internal, executor 5: java.lang.NoClassDefFoundError (Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder) [duplicate 1] 
17/07/19 17:35:15 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-176-225-151.us-west-2.compute.internal, executor 5): java.lang.IllegalAccessError: tried to access class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientConfigurationFactory from class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder 
     at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder.<clinit>(AmazonDynamoDBClientBuilder.java:30) 
     at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) 

Answer


I was finally able to save the Spark Dataset to Amazon DynamoDB by dropping down to a lower version of the DynamoDB API: EMR 5.7 only supports AWS SDK 1.10.75.1, so the newer SDK bundled in my jar was clashing with the older one already on the cluster's classpath (which is what the IllegalAccessError above points to). Below is the code that works for me.

import org.apache.spark.sql.SparkSession
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import scala.collection.JavaConverters._

object UnResolvedLoad {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
    val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
    tokensDf.foreachPartition { x => loadFunc(x) }
  }

  def loadFunc(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
    // The 1.10.x SDK has no client builder; construct the client directly
    // and set the region explicitly
    val client: AmazonDynamoDBClient = new AmazonDynamoDBClient()
    client.setRegion(Region.getRegion(Regions.US_WEST_2))

    while (iter.hasNext) {
      val cur = iter.next()

      // PutItemRequest takes a Java map of attribute values
      val putMap = Map(
        "receiverId" -> new AttributeValue(cur.get(2).asInstanceOf[String]),
        "payload_id" -> new AttributeValue(cur.get(0).asInstanceOf[String]),
        "payload_confirmation_code" -> new AttributeValue(cur.get(1).asInstanceOf[String]),
        "token" -> new AttributeValue(cur.get(3).asInstanceOf[String])).asJava

      val putItemRequest: PutItemRequest = new PutItemRequest("UnResolvedTokens", putMap)
      client.putItem(putItemRequest)
    }
  }
}
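
If you build with sbt, the fix amounts to compiling against the SDK version that EMR 5.7 actually ships instead of bundling a newer one. A minimal sketch, assuming an sbt build; the per-service artifact name and the "provided" scope are my assumptions, and 1.10.75.1 is simply the version quoted above (if that exact build is not published to Maven Central, the nearest 1.10.x release should behave the same):

    // build.sbt (sketch): compile against the EMR-provided AWS SDK.
    // "provided" keeps the cluster's own SDK jars authoritative at runtime,
    // so the fat jar does not carry a second, newer copy of the SDK.
    libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.10.75.1" % "provided"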

Saved me a lot of time! Thanks, man. How did you think of that? It's really hard to think in that direction – KAs


I got this answer from the AWS Support forums –
