-2

Ich bin neu bei Spark-Cassandra und Scala. Ich habe eine bestehende RDD. sagen wir mal:Scala Spark Filter RDD mit Cassandra

((url_hash, url, created_timestamp)).

Ich möchte diese RDD basierend auf URL_hash filtern. Wenn url_hash in der Cassandra-Tabelle vorhanden ist, möchte ich es aus der RDD herausfiltern, damit ich nur die neuen URLs verarbeiten kann.

Cassandra Tabelle sieht wie folgt aus:

url_hash| url | created_timestamp | updated_timestamp 

Alle Zeiger wird groß sein.

habe ich versucht, so etwas wie dies folgt aus:

case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date) 
    def timestamp = new java.utils.Date() 
    val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
    val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
    val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
    newUrlsRDD = rdd1.subtractByKey(rdd3) 

I cassandra Fehler

java.lang.NullPointerException: Unexpected null value of column full_url in  keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper 

Es bin immer keine Nullwerte in cassandra Tabelle

+1

Was haben Sie versucht? Verwandle die Cassandra-Tabelle in eine andere RDD, "Map", so dass sie 'url_hash' als Schlüssel haben, dann benutze' subtractByKey'? –

+0

Danke für den Zeiger. Ich habe die Frage mit dem, was ich versucht habe, aktualisiert. Jetzt bekomme ich eine Nullzeiger-Ausnahme – Abhishek

Antwort

1

Dank der archetypische Paul!

Ich hoffe, jemand findet das nützlich. Musste Option zur Fallklasse hinzufügen.

Wir freuen uns auf bessere Lösungen

case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date]) 

def timestamp = new java.utils.Date() 
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
newUrlsRDD = rdd1.subtractByKey(rdd3)