2017-02-09 5 views
0

Ich versuche Java Spark-Streaming mit Cassandra zu tun. Ich habe dasselbe mit Scala gemacht, aber ich habe keine Ahnung, wie ich in Java vorgehen soll. Das Web hat mir keine Beispiele mit Java-Spark-Streaming und Cassandra gegeben.Java Spark Streaming mit Cassandra

Könnte jemand zeigen Sie mir bitte, wie die unten Scala Code in Java haben:

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 

Jede Hilfe sehr geschätzt wird. Danke

Antwort

1

In Ihrer ForeachRDD-Transformation können Sie Ihre Daten nach Cassandra-Tabellenformat umwandeln.

JavaRDD<TestBean> cassandraRDD = testRDD 
       .flatMap(new FlatMapFunction<Tuple2<String, List<Map<String, Object>>>, TestBean>() { 

        private static final long serialVersionUID = 1L; 

        @Override 
        public Iterable<TestBean> call(Tuple2<String, List<Map<String, Object>>> tuple) throws Exception { 

         return rawData; 
        } 
       }); 

      javaFunctions(jsonRDD).writerBuilder(CASSANDRA_KEYSPACE,CASSANDRA_TABLE, mapToRow(TestBean.class)).saveToCassandra(); 
+0

Das sind meine Import-Anweisungen import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; Import statisch com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; –