0

Ich habe einen RDD [PersonType] = [PID, CID, Vorname, Nachname, Alter, Quelle, sourceType, Nachricht] Wert als RDD = [1000,100, Vikash, Singh, 33 , Quelle, sourceType, Nachricht]Update-Spalte des Settyps in Cassandra Zeile

und ich habe csaandra Zeile als [PID, CID, Vorname, Nachname, Alter, Abteilung, MRIDs] hier MRids gesetzt ist. angenommen Wert in Cassandra ist [1000,100, vikash, singh, 33, Bank, {sourceold.sourceTypeold.messageold}

Ich möchte die Cassandra-Spalte MRIDs mit alten und neuen Wert aktualisieren. Mein neuer aktualisierter Wert in cassandra sollte so sein [1000.100, vikash, singh, 33, Bank, {sourceold.sourceTypeold.messageold, source.sourceType.message}

Bitte sagen Sie mir, wie mrids Spalte zu aktualisieren.

val rdd[personType] = rdd1 
val rdd2 = sc.cassandraTable(keyspace,tablename) 
       .select("p_id","c_id", "mrids") 

welchen Code soll ich als nächstes schreiben, um dies zu erreichen?

Antwort

0

Dies sollte Ihnen den Anfang machen.

Es zeigt Ihnen, wie Join von RDD basierend auf Schlüssel zu tun, und fügen Sie Daten in Satz einer anderen RDD.

val temp = List((1, 4, Set(1)), 
        (2, 5, Set(2)), 
        (3, 6, Set(3)) 
        ) 
val temp2 = List((1, 11, 11), 
        (2, 11, 22), 
        (3, 11, 33) 
       ) 
val temp_rdd = sc.parallelize(temp) 

val temp2_rdd = sc.parallelize(temp2) 

val test = temp_rdd.map{case(key, data, set)=>((key),(data, set))} 
         .join(temp2_rdd.map{case(key, data, set_new_value)=>((key),(data, set_new_value))}) 
         .map{case(key, ((data1, set),(data2, set_new_value)))=>(key, set.toSet + set_new_value)} 


test.collect().foreach(println) 

am Ende können Sie rdd.saveToCassandra verwenden, um die rdd resultset zu speichern.