5

Ich habe eine schwere Aufgabe, von einer Cassandra-Tabelle Millionen von Zeilen zu lesen. Eigentlich enthält diese Tabelle 40 bis 50 Millionen Zeilen. Die Daten sind eigentlich interne URLs für unser System und wir müssen sie alle feuern. Um es zu feuern, benutzen wir Akka-Streams und es hat ziemlich gut funktioniert. Aber wir haben noch immer keinen Weg gefunden, alles effektiv zu lesen.Wie effektiv lesen Millionen von Zeilen von Cassandra?

Was wir bisher versucht haben:

  • die Daten als Strom mit Akka-Stream lesen. Wir verwenden phantom-dsl, das einen Publisher für eine bestimmte Tabelle bereitstellt. Aber es liest nicht alles, nur eine kleine Portion. Eigentlich hört es auf, nach der ersten Million zu lesen.

  • Lesen mit Spark zu einem bestimmten Datum. Unser Tisch ist wie eine Zeitreihentabelle mit Spalten für Jahr, Monat, Tag, Minuten ... modelliert. Gerade jetzt wählen wir bei Tag aus, also wird Spark nicht viele Dinge zur Verarbeitung holen, aber es ist ein Schmerz, all diese Tage auszuwählen.

Der Code ist der folgende:

val cassandraRdd = 
     sc 
     .cassandraTable("keyspace", "my_table") 
     .select("id", "url") 
     .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth) 

Leider kann ich nicht über die Trennwände durchlaufen weniger Daten zu bekommen, ich habe eine collect zu verwenden, weil es der Schauspieler beschwert ist nicht serialisierbar.

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async 

val source = 
    Source 
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail) 
    .map(row => makeUrl(row.getString("id"), row.getString("url"))) 
    .map(url => HttpRequest(uri = url) -> url) 

val ref = Flow[(HttpRequest, String)] 
    .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider))) 
    .to(Sink.actorRef(httpHandlerActor, IsDone)) 
    .runWith(source) 

cassandraRdd.collect().foreach { row => 
    ref ! row 
} 

Ich mag würde wissen, ob jemand von euch eine solche Erfahrung haben auf für Millionen von Zeilen zu lesen etwas zu tun, die sich von Aggregation und so weiter.

Auch ich habe gedacht, alles zu lesen und zu einem Kafka-Thema zu senden, wo ich mit Streaming (Funken oder Akka) empfangen würde, aber das Problem wäre das gleiche, wie alle diese Daten effektiv zu laden?

EDIT

Vorerst bin ich mit einer angemessenen Menge an Speicher von 100 GB auf einem Cluster ausgeführt wird und ein R-und iterieren es zu tun.

Auch ist dies ganz anders bekommen bigdata mit Funken und analysieren es Dinge wie reduceByKey, aggregateByKey mit, etc, etc.

Ich brauche alles, was zu holen und senden über HTTP =/

Bisher Es funktioniert so, wie ich es gemacht habe, aber ich befürchte, dass diese Daten immer größer werden und dass es keinen Sinn macht, alles in den Speicher zu holen.

Streaming dieser Daten wäre die beste Lösung, holen in Stücke, aber ich habe noch keinen guten Ansatz dafür gefunden.

Am Ende denke ich daran, Spark zu verwenden, um alle diese Daten zu erhalten, eine CSV-Datei zu generieren und Akka Stream IO zu verarbeiten, auf diese Weise würde ich vertreiben, um eine Menge Dinge im Speicher zu halten, da es dauert Stunden, um jede Million zu verarbeiten.

+0

Dies ist eine ziemlich große Frage zu Stackoverflow zu beantworten und es ist nicht besonders mit Phantom von dem, was ich sagen kann, verwandt. Mein starkes Gefühl ist, dass der ganze Täter über HTTP gehen muss. Gemeinsame Nutzung eines HDFS oder einer anderen Methode, z. B. das Abrufen der HTTP-Bits und so weiter. Es wäre interessant, mehr Einblick zu bekommen, wie Phantom-Streaming Ihnen nicht alles zurückgibt, ich vermute, dass es entweder überläuft oder vielleicht überzeichnet, so dass es denkt, dass es fertig ist, bevor es tatsächlich ist. Klingt wirklich seltsam. – flavian

+0

Ja, in der Tat, das ist keine reine Phantomfrage, aber ich habe hinzugefügt, dass dies meine erste Idee war, aber es leider nach 1 Million oder so aufhören. Auch das HTTP ist hier ein Flaschenhals und es dauert Stunden, alles zu verarbeiten, und vielleicht ist das reaktive Phantom in Schwierigkeiten geraten, weil es klar ist, dass der HTTP-Rückstau den Phantom-Stream freigibt, so dass ich nach einigen Stunden nicht lesen konnte mehr und dachte, es hat schon getan. –

Antwort

5

Nun, nach einiger Zeit zu verbringen Lesen, Gespräche mit anderen Jungs und tun Tests das Ergebnis durch die folgende Codebeispiel erreichen könnte:

val sc = new SparkContext(sparkConf) 

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable") 
    .select("key", "value") 
    .as((key: String, value: String) => (key, value)) 
    .partitionBy(new HashPartitioner(2 * sc.defaultParallelism)) 
    .cache() 

cassandraRdd 
    .groupByKey() 
    .foreachPartition { partition => 
    partition.foreach { row => 

     implicit val system = ActorSystem() 
     implicit val materializer = ActorMaterializer() 

     val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor") 

     val source = Source.fromIterator {() => row._2.toIterator } 
     source 
     .map { str => 
      myActor ! Count 
      str 
     } 
     .to(Sink.actorRef(myActor, Finish)) 
     .run() 
    } 
    } 

sc.stop() 


class MyActor(system: ActorSystem) extends Actor { 

    var count = 0 

    def receive = { 

    case Count => 
     count = count + 1 

    case Finish => 
     println(s"total: $count") 
     system.shutdown() 

    } 
} 

case object Count 
case object Finish 

Was ich tue, ist die folgende:

  • Versuchen Sie, mit den Methoden partitionBy und groupBy eine gute Anzahl an Partitionen und Partitionen zu erreichen.
  • Verwenden Sie Cache, um Datenmigration zu verhindern, sodass Ihr Spark große Daten über Knoten hinweg mit hoher E/A usw. verschieben kann.
  • Erstellen Sie das gesamte Akteursystem mit seinen Abhängigkeiten sowie den Stream innerhalb der foreachPartition-Methode. Hier ist eine Abwägung, Sie können nur ein ActorSystem haben, aber Sie müssen eine schlechte Verwendung von .collect machen, wie ich in der Frage geschrieben habe. Wenn Sie jedoch alles im Inneren erstellen, haben Sie immer noch die Möglichkeit, Dinge innerhalb des Funkens auszuführen, die über Ihren Cluster verteilt sind.
  • Beende Schauspieler System am Ende des Iterators der Sink.actorRef mit einer Nachricht unter Verwendung Ich bin zu töten (Fertig)

Vielleicht könnte dieser Code noch weiter verbessert, aber bisher glücklich zu tun benutze .collect nicht mehr und arbeite nur in Spark.

+0

Das ist interessant, ich bin sehr neugierig auf Ihre Leistung im Allgemeinen und wenn Sie in der Lage, das zu benchmarken. Tatsächlich finde ich, dass die Erstellung eines Actor-Systems pro Partition eher eine schwere Operation ist. Abhängig von der Anzahl der Partitionen können Sie sich mit mehr als 8 Actor-Systemen auf demselben Computer auf derselben VM befinden. Wie ist das schneller als simple synchrone Aufrufe, wenn auch parallel? – MaatDeamon

+0

@MaatDeamon Ich erinnere mich, dass es ziemlich schnell war, aber leider nicht gemessen haben. Wenn ich es heute noch einmal machen müsste, würde ich mich für den Alpakka Cassandra Connector entscheiden. –

+0

Ich sehe. Sie würden die Verteilung verschenken? – MaatDeamon

Verwandte Themen