2016-09-13 4 views
0

Hallo mit (sage User Objects, die firstName, lastName etc ..) von meiner Java-Klasse zu scala Klasse und bis hier ist es fein ich bin in der Lage auf Benutzer-Objekt zugreifen und in der Lage, seinen InhaltSaving JavaList zu Cassandra Tabelle Funken Zusammenhang

2) Jetzt möchte ich diese Benutzerliste speichern in Cassandra Tabelle mit Funken Kontext, ich habe durch viele Beispiele gegangen, aber jeder, wo ich sehe Seq mit unserem caseClass und fest codierten Werte und dann auf cassandra Speichern, ich habe für mich das und arbeiten gut versucht, wie unten

import scala.collection.JavaConversions._ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

import com.datastax.spark.connector._ 
import java.util.ArrayList 

object SparkCassandra extends App { 
    val conf = new SparkConf() 
     .setMaster("local[*]") 
     .setAppName("SparkCassandra") 
     //set Cassandra host address as your local address 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
    val sc = new SparkContext(conf) 
    val usersList = Test.getUsers 
    usersList.foreach(x => print(x.getFirstName)) 
    val collection = sc.parallelize(Seq(userTable("testName1"), userTable("testName1"))) 
    collection.saveToCassandra("demo", "user", SomeColumns("name")) 
    sc.stop() 
} 

case class userTable(name: String) 

Aber hier meine Forderung ist dynamisch Werte von meinem usersList statt hardcoaded Werte, oder jede andere Art und Weise zu verwenden, um Dies erreichen.

+0

Wie viele Benutzer? Wo werden diese Werte gespeichert? –

+0

Es wird bis zu 20k User geben, actullay Ich bekomme diese Liste von einer anderen JavaClass und muss in Cassandra Tabelle speichern –

+0

Solange Sie parallelisieren, sollte es funktionieren. Wie wäre es mit einem Seq, der alle Case-Klassenobjekte von "userTable" aus "usersList" enthält und parallelisiert und speichert? – Sreekar

Antwort

0

Endlich habe ich Lösung für meine Anforderung getestet und funktioniert gut, wie unten:

Mein Scala Code:

import scala.collection.JavaConversions.asScalaBuffer 
import scala.reflect.runtime.universe 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import com.datastax.spark.connector.toNamedColumnRef 
import com.datastax.spark.connector.toRDDFunctions 

object JavaListInsert { 
    def randomStores(sc: SparkContext, users: List[User]): RDD[(String, String, String)] = { 
     sc.parallelize(users).map { x => 
     val fistName = x.getFirstName 
     val lastName = x.getLastName 
     val city = x.getCity 
     (fistName, lastName, city) 
    } 
    } 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("cassandraInsert") 
    val sc = new SparkContext(conf) 
    val usersList = Test.getUsers.toList 
    randomStores(sc, usersList). 
     saveToCassandra("test", "stores", SomeColumns("first_name", "last_name", "city")) 
    sc.stop 
    } 
} 

Java Pojo Objekt:

import java.io.Serializable; 
    public class User implements Serializable{ 
     private static final long serialVersionUID = -187292417543564400L; 
     private String firstName; 
     private String lastName; 
     private String city; 

     public String getFirstName() { 
      return firstName; 
     } 

     public void setFirstName(String firstName) { 
      this.firstName = firstName; 
     } 

     public String getLastName() { 
      return lastName; 
     } 

     public void setLastName(String lastName) { 
      this.lastName = lastName; 
     } 

     public String getCity() { 
      return city; 
     } 

     public void setCity(String city) { 
      this.city = city; 
     } 
} 

Java-Klasse Liste der Benutzer zurück:

import java.util.ArrayList; 
import java.util.List; 


public class Test { 
    public static List<User> getUsers() { 
     ArrayList<User> usersList = new ArrayList<User>(); 
     for(int i=1;i<=100;i++) { 
      User user = new User(); 
      user.setFirstName("firstName_+"+i); 
      user.setLastName("lastName_+"+i); 
      user.setCity("city_+"+i); 
      usersList.add(user); 
     } 
     return usersList; 
    } 
} 
0

Wenn Sie ein Objekt RDD von CassandraRow erstellen, können Sie das Ergebnis direkt speichern, ohne Spalten oder Fallklassen angeben zu müssen. Darüber hinaus hat CassandraRow die äußerst bequeme Funktion fromMap, so dass Sie Ihre Zeilen als Map Objekte definieren, konvertieren und speichern können.

Beispiel:

val myData = sc.parallelize(
    Seq(
    Map("name" -> "spiffman", "address" -> "127.0.0.1"), 
    Map("name" -> "Shabarinath", "address" -> "127.0.0.1") 
) 
) 

val cassandraRowData = myData.map(rowMap => CassandraRow.fromMap(rowMap)) 

cassandraRowData.saveToCassandra("keyspace", "table") 
+0

Danke für die Antwort, Hier ist meine Anforderung nicht hartcodierte Werte anstelle von "spiffman" und " shabarinath "Ich muss die Listenobjektwerte verwenden –

+0

Was ist der Typ des Listenobjekts? Kannst du es in eine Map umwandeln? – spiffman

+0

Liste der einfachen pojo Benutzerobjekt, das Vorname und Getter und Setter enthält und ich möchte diese Liste speichern –

Verwandte Themen