2016-10-17 6 views
0

Ich möchte Spark Daten Daten auf Couchbase schreiben. Dazu versuche ich, es zu tun, wie folgt: -Sparen Sie ein Spark-Datenframe zu couchbase

double[] val=new double[3]; 
SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local").set("com.couchbase.nodes", "url_of_couchbase").set("com.couchbase.bucket.bucket_name", "password")); 
SQLContext sql = new SQLContext(sc); 
DataFrame df = sql.read().json("sample.json"); 
df.registerTempTable("sample"); 

DataFrame men=sql.sql("select mean(imp_recall_interval) from sample"); 
Row[] r=men.collect(); 
val[0]=Double.parseDouble(r[0].toString().replace("[", "").replace("]", "").trim()); 
JsonDocument doc1=JsonDocument.create("docId", JsonObject.create().put("mean", val[0])); 
System.out.println("Data Saved"); 
JsonArrayDocument jrd=JsonArrayDocument.create("imp_recall_timeinterval_mean_median_sd", JsonArray.from("more", "content", "in", "here")); 

Aber wenn ich versuche, diese zu parrallelize, ich bin nicht in der Lage, das zu tun.

sc.parrallelize(Seq(doc1,jrd)); 

Bitte sagen Sie mir, wie kann ich diese Daten in couchbase speichern. Oder bitte geben Sie auch andere Methoden an, mit denen ich ein Create erstellen und das Dokument in Couchbase speichern kann

Antwort

0

Versuchen Sie dies.

import java.util.ArrayList; 
import java.util.List; 
import com.couchbase.spark.japi.CouchbaseDocumentRDD; 
import com.couchbase.client.java.document.AbstractDocument; 


JavaSparkContext jsc = new JavaSparkContext(sc); 
SQLContext sql = new SQLContext(jsc); 

JsonDocument doc1; 
JsonArrayDocument jrd; 

List<AbstractDocument> list = new ArrayList<AbstractDocument>(); 
list.add(doc1); 
list.add(jrd); 

JavaRDD<AbstractDocument> jRDD = jsc.parallelize(list); 
CouchbaseDocumentRDD<AbstractDocument> cbRDD = CouchbaseDocumentRDD.couchbaseDocumentRDD(jRDD); 
cbRDD.saveToCouchbase(); 
+0

was ist das impot für Liste. –

+0

weil list.add(); Funktion funktioniert nicht. –

+0

Hinzugefügt Importe in Antwort oben. – abaghel