2016-05-19 7 views
0

Ich versuche eine Anwendung zur Protokollverarbeitung mit Scala (2.11), Spark Streaming (1.5.0) und Cassandra (3.5) zu erstellen. Im Moment auf den ersten Satz von RDD Empfangen der Einzelteile und Lauf foreachRDD (...), unten gezeigt,rdd.saveAsCassandraTable() erstellt Tabelle, schreibt aber keine rdd-Einträge in Cassandra

  1. Das erste Element in der Menge ohne Probleme gedruckt wird
  2. Die saveAsCassandraTable() Methode richtig Erstellt das erforderliche Tabellenschema in Cassandra, fügt jedoch keine der RDD-Einträge in die Tabelle ein.

    logitems.foreachRDD(items => { 
        if (items.count() == 0) 
        println("No log item received") 
        else{ 
        val first = items.first() 
        println(first.timestamp) // WORKS: Shows the timestamp in the first rdd element 
    
        items.saveAsCassandraTable("analytics", "test_logs", SomeColumns("timestamp", "c_ip", "c_referrer", "c_user_agent")) 
        //table schema is created but the rdd items are not written 
        } 
    }) 
    
    
    
    16/05/19 16:15:06 INFO Cluster: New Cassandra host /192.168.1.95:9042 added 
    16/05/19 16:15:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster 
    16/05/19 16:15:07 INFO SparkContext: Starting job: foreachRDD at StreamingApp.scala:27 
    16/05/19 16:15:07 INFO DAGScheduler: Got job 8 (foreachRDD at StreamingApp.scala:27) with 8 output partitions 
    16/05/19 16:15:07 INFO DAGScheduler: Final stage: ResultStage 6(foreachRDD at StreamingApp.scala:27) 
    16/05/19 16:15:07 INFO DAGScheduler: Parents of final stage: List() 
    16/05/19 16:15:07 INFO DAGScheduler: Missing parents: List() 
    16/05/19 16:15:07 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[9] at map at StreamingApp.scala:24), which has no missing parents 
    16/05/19 16:15:07 INFO MemoryStore: ensureFreeSpace(13272) called with curMem=122031, maxMem=1538166620 
    16/05/19 16:15:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 13.0 KB, free 1466.8 MB) 
    16/05/19 16:15:07 INFO MemoryStore: ensureFreeSpace(5909) called with curMem=135303, maxMem=1538166620 
    16/05/19 16:15:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.8 KB, free 1466.8 MB) 
    16/05/19 16:15:07 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:63323 (size: 5.8 KB, free: 1466.8 MB) 
    16/05/19 16:15:07 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:861 
    16/05/19 16:15:07 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 6 (MapPartitionsRDD[9] at map at StreamingApp.scala:24) 
    16/05/19 16:15:07 INFO TaskSchedulerImpl: Adding task set 6.0 with 8 tasks 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 34, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 35, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 36, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID 37, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 4.0 in stage 6.0 (TID 38, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 5.0 in stage 6.0 (TID 39, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO TaskSetManager: Starting task 6.0 in stage 6.0 (TID 40, localhost, PROCESS_LOCAL, 1943 bytes) 
    16/05/19 16:15:07 INFO Executor: Running task 0.0 in stage 6.0 (TID 34) 
    16/05/19 16:15:07 INFO Executor: Running task 1.0 in stage 6.0 (TID 35) 
    16/05/19 16:15:07 INFO Executor: Running task 4.0 in stage 6.0 (TID 38) 
    16/05/19 16:15:07 INFO Executor: Running task 5.0 in stage 6.0 (TID 39) 
    16/05/19 16:15:07 INFO Executor: Running task 3.0 in stage 6.0 (TID 37) 
    16/05/19 16:15:07 INFO Executor: Running task 2.0 in stage 6.0 (TID 36) 
    16/05/19 16:15:07 INFO Executor: Running task 6.0 in stage 6.0 (TID 40) 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_6 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_3 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_4 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_2 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_1 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_5 locally 
    16/05/19 16:15:07 INFO BlockManager: Found block rdd_9_0 locally 
    16/05/19 16:15:10 INFO JobScheduler: Added jobs for time 1463667310000 ms 
    16/05/19 16:15:15 INFO JobScheduler: Added jobs for time 1463667315000 ms 
    16/05/19 16:15:20 INFO JobScheduler: Added jobs for time 1463667320000 ms 
    16/05/19 16:15:25 INFO JobScheduler: Added jobs for time 1463667325000 ms 
    16/05/19 16:15:30 INFO JobScheduler: Added jobs for time 1463667330000 ms 
    16/05/19 16:15:35 INFO JobScheduler: Added jobs for time 1463667335000 ms 
    16/05/19 16:15:40 INFO JobScheduler: Added jobs for time 1463667340000 ms 
    16/05/19 16:15:45 INFO JobScheduler: Added jobs for time 1463667345000 ms 
    
    .... continues until program is manually terminated 
    

Ich bin froh, in Bezug auf alle Hinweise zu haben, wo dieses Problem beheben zu suchen.

spark ui

Ich habe einen Screenshot der Funken ui angebracht.

+0

Sehen Sie weitere Fehler? – RussS

+0

None ... Nur wiederholte Nachrichten sagen (16/05/19 16:15:45 INFO JobScheduler: Jobs für Zeit hinzugefügt 1463667345000 ms), bis ich den Job töten muss – Akinwale

+0

Auf etwa 20 Minuten in die Aufgabe warten, habe ich die Folgefehler '16/05/19 23:36:04 ERROR QueryExecutor: Konnte nicht ausgeführt werden: com.datastax.spark.connector.writer.RichBoundStatement @ 2c166223' ' com.datastax.driver.core.exceptions.UnavailableException: Nicht genug Replikate verfügbar für Abfrage bei Konsistenz LOCAL_QUORUM (2 erforderlich, aber nur 1 am Leben) ' \t bei' com.datastax.driver.core.exceptions.UnavailableException.copy (UnavailableException.java:128) ' – Akinwale

Antwort

0

Ich habe den Verdacht, dass die nachfolgenden Aufrufe an saveAsCassandraTable fehlschlagen, weil die Tabelle bereits existiert. Sie sollten die Tabelle wahrscheinlich außerhalb der Streaming-Schleife erstellen.

Ich würde überprüfen, ob die Umstellung auf saveToCassandra das Problem behebt. Wenn dies nicht der Fall ist, kann es hilfreich sein, Executor-Logs oder einen Screenshot der Streaming-UI zu erhalten.

+0

Hallo RussS, ich begann eigentlich mit saveToCassandra(), hat aber das gleiche Verhalten. Dann wechselte ich zu saveAsCassandraTable(), um sicherzugehen, dass Spark Cassandra tatsächlich erreichte. Aber Sie haben mit der Verwendung von saveAsCassandraTable im Rahmen des Foreach-Aufrufs definitiv Recht - ich werde auf das Erste zurückgreifen. – Akinwale

+0

Kann einfügen, was Sie in der Streaming-Benutzeroberfläche sehen? – RussS

+0

Ich habe den Screenshot des Funkens hinzugefügt. Normalerweise dauert die Verarbeitung meines kleinen Datensatzes nur wenige Millisekunden ohne den Schritt ** saveToCassandra() **. Aber mit diesem Schritt geht es minutenlang weiter, bis ich die Aufgabe erledige. – Akinwale

Verwandte Themen