
Answer


You can read items from and write items to DynamoDB tables using Apache Spark together with the emr-dynamodb-connector library. To read data you can use javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class); and to write data back to DynamoDB you can use javaPairRDD.saveAsHadoopDataset(jobConf);. Below is an example (it works in both EMR and non-EMR environments):

import java.util.Map; 

import org.apache.hadoop.dynamodb.DynamoDBItemWritable; 
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 

import com.amazonaws.services.dynamodbv2.model.AttributeValue; 

import scala.Tuple2; 

public static void main(String[] args) throws Exception { 
    SparkConf conf = new SparkConf() 
      .setAppName("DynamoDBApplication") 
      .setMaster("local[4]") 
      // use Kryo so that the Hadoop Writable classes below can actually be serialized with it 
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
      .registerKryoClasses(new Class<?>[]{ 
        Class.forName("org.apache.hadoop.io.Text"), 
        Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable") 
      }); 

    JavaSparkContext sc = new JavaSparkContext(conf); 

    JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite"); 

    // read all items from DynamoDB table with name TableNameForRead 
    JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd = sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class); 
    System.out.println("count: " + javaPairRdd.count()); 

    // process data in any way, below is just a simple example 
    JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> { 
     DynamoDBItemWritable item = t._2(); 
     Map<String, AttributeValue> attrs = item.getItem(); 
     String hashKey = attrs.get("key").getS(); 
     Long result = Long.valueOf(attrs.get("resultAttribute").getN()); 
     System.out.println(String.format("hashKey=%s, result=%d", hashKey, result)); 
     return attrs; 
    }); 
    System.out.println("count: " + javaRDD.count()); 

    // update JavaPairRdd in order to store it to DynamoDB, below is just a simple example with updating hashKey 
    JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> { 
     DynamoDBItemWritable item = t._2(); 
     Map<String, AttributeValue> attrs = item.getItem(); 
     String hashKey = attrs.get("key").getS(); 
     String updatedHashKey = hashKey + "_new"; 
     attrs.get("key").setS(updatedHashKey); 
     return new Tuple2<>(t._1(), item); 
    }); 

    // write items to DynamoDB table with name TableNameForWrite 
    updatedJavaPairRDD.saveAsHadoopDataset(jobConf); 

    sc.stop(); 
} 


private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) { 
    final JobConf jobConf = new JobConf(sc.hadoopConfiguration()); 
    jobConf.set("dynamodb.servicename", "dynamodb"); 

    jobConf.set("dynamodb.input.tableName", tableNameForRead); 
    jobConf.set("dynamodb.output.tableName", tableNameForWrite); 

    jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY"); 
    jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY"); 
    jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com"); 
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat"); 
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"); 

    return jobConf; 
} 
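
If the job consumes too much of the table's provisioned capacity, the connector also exposes throughput-related settings that can be added to the same JobConf. A minimal sketch, assuming the standard connector property names; the 0.5 values are just illustrative, not recommendations:

    // optionally limit how much of the table's provisioned throughput the job may use 
    jobConf.set("dynamodb.throughput.read.percent", "0.5"); 
    jobConf.set("dynamodb.throughput.write.percent", "0.5"); 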

To run this code, the following Maven dependencies are required:

<dependencies> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-core</artifactId> 
     <version>2.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-annotations</artifactId> 
     <version>2.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_2.10</artifactId> 
     <version>2.6.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>aws-java-sdk-emr</artifactId> 
     <version>1.11.113</version> 
    </dependency> 
    <dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>aws-java-sdk-dynamodb</artifactId> 
     <version>1.11.113</version> 
    </dependency> 

    <!-- https://github.com/awslabs/emr-dynamodb-connector --> 
    <dependency> 
     <groupId>com.amazon.emr</groupId> 
     <artifactId>emr-dynamodb-hadoop</artifactId> 
     <version>4.2.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-aws</artifactId> 
     <version>2.8.0</version> 
    </dependency> 

</dependencies>
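
Note that ${spark.version} is a Maven property that you have to define yourself in the pom.xml. As a sketch, assuming a Spark release that still publishes Scala 2.10 (_2.10) artifacts, e.g.:

<properties> 
    <spark.version>1.6.3</spark.version> 
</properties>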