0

Ich benutze Spark Structured Streaming für Machine Learning in Echtzeit, und ich möchte Vorhersagen in meinem Cassandra-Cluster gespeichert.Spark (SQL/Structured Streaming) Cassandra - PreparedStatement

Da ich in einem Streaming-Kontext bin, mehrere Male pro Sekunde die gleiche Anfrage ausführen, ist eine obligatorische Optimierung PreparedStatement zu verwenden.

Im cassandra Funken Treiber (https://github.com/datastax/spark-cassandra-connector) gibt es keine Möglichkeit PreparedStatement zu verwenden (in scala oder Python, überlege ich mich nicht Java als Option)

i ein scala verwenden soll (https://github.com/outworkers/phantom)/Python (https://github.com/datastax/python-driver) Cassandra Treiber? Wie funktioniert es, dann muss mein Verbindungsobjekt serialisierbar sein, um an die Arbeiter weitergeleitet zu werden?

Wenn mir jemand helfen kann!

Thanks :)

Antwort

1

Um eine vorbereitete Erklärung zu tun, und dann werden die Daten in Cassandra registrieren während der Verarbeitung mit strukturierten Funken Streaming Streaming, benötigen Sie:

  • Import com.datastax.driver.core. Session
  • Import com.datastax.spark.connector.cql.CassandraConnector

Dann bauen Sie Ihre Stecker:

val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 

die beide Sitzung und Anschluss, können Sie jetzt rufen Sie die vorbereitete Anweisung Funktion in der Anweisung scala Klasse

connector.withSessionDo { session => 
Statements.PreparedStatement() 

schrieb}

Sie können endlich Beenden Sie das Schreiben der Daten in Cassandra mit der folgenden Funktion: cql ist die Funktion, die die Variablen auf die vorbereitete Erklärung bindet und führen Sie es aus:

private def processRow(value: Commons.UserEvent) = { 
    connector.withSessionDo { session => 
    session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream)) 
} 

}

Natürlich werden Sie diese Funktion nennen (processRow) in der foreach Schriftsteller

 // This Foreach sink writer writes the output to cassandra. 
import org.apache.spark.sql.ForeachWriter 
val writer = new ForeachWriter[Commons.UserEvent] { 
    override def open(partitionId: Long, version: Long) = true 
    override def process(value: Commons.UserEvent) = { 
    processRow(value) 
    } 
    override def close(errorOrNull: Throwable) = {} 
} 

val query = 
    ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start