Ich habe eine schwere Aufgabe, von einer Cassandra-Tabelle Millionen von Zeilen zu lesen. Eigentlich enthält diese Tabelle 40 bis 50 Millionen Zeilen. Die Daten sind eigentlich interne URLs für unser System und wir müssen sie alle feuern. Um es zu feuern, benutzen wir Akka-Streams und es hat ziemlich gut funktioniert. Aber wir haben noch immer keinen Weg gefunden, alles effektiv zu lesen.Wie effektiv lesen Millionen von Zeilen von Cassandra?
Was wir bisher versucht haben:
die Daten als Strom mit Akka-Stream lesen. Wir verwenden phantom-dsl, das einen Publisher für eine bestimmte Tabelle bereitstellt. Aber es liest nicht alles, nur eine kleine Portion. Eigentlich hört es auf, nach der ersten Million zu lesen.
Lesen mit Spark zu einem bestimmten Datum. Unser Tisch ist wie eine Zeitreihentabelle mit Spalten für Jahr, Monat, Tag, Minuten ... modelliert. Gerade jetzt wählen wir bei Tag aus, also wird Spark nicht viele Dinge zur Verarbeitung holen, aber es ist ein Schmerz, all diese Tage auszuwählen.
Der Code ist der folgende:
val cassandraRdd =
sc
.cassandraTable("keyspace", "my_table")
.select("id", "url")
.where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
Leider kann ich nicht über die Trennwände durchlaufen weniger Daten zu bekommen, ich habe eine collect zu verwenden, weil es der Schauspieler beschwert ist nicht serialisierbar.
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
val source =
Source
.actorRef[CassandraRow](10000000, OverflowStrategy.fail)
.map(row => makeUrl(row.getString("id"), row.getString("url")))
.map(url => HttpRequest(uri = url) -> url)
val ref = Flow[(HttpRequest, String)]
.via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
.to(Sink.actorRef(httpHandlerActor, IsDone))
.runWith(source)
cassandraRdd.collect().foreach { row =>
ref ! row
}
Ich mag würde wissen, ob jemand von euch eine solche Erfahrung haben auf für Millionen von Zeilen zu lesen etwas zu tun, die sich von Aggregation und so weiter.
Auch ich habe gedacht, alles zu lesen und zu einem Kafka-Thema zu senden, wo ich mit Streaming (Funken oder Akka) empfangen würde, aber das Problem wäre das gleiche, wie alle diese Daten effektiv zu laden?
EDIT
Vorerst bin ich mit einer angemessenen Menge an Speicher von 100 GB auf einem Cluster ausgeführt wird und ein R-und iterieren es zu tun.
Auch ist dies ganz anders bekommen bigdata mit Funken und analysieren es Dinge wie reduceByKey, aggregateByKey mit, etc, etc.
Ich brauche alles, was zu holen und senden über HTTP =/
Bisher Es funktioniert so, wie ich es gemacht habe, aber ich befürchte, dass diese Daten immer größer werden und dass es keinen Sinn macht, alles in den Speicher zu holen.
Streaming dieser Daten wäre die beste Lösung, holen in Stücke, aber ich habe noch keinen guten Ansatz dafür gefunden.
Am Ende denke ich daran, Spark zu verwenden, um alle diese Daten zu erhalten, eine CSV-Datei zu generieren und Akka Stream IO zu verarbeiten, auf diese Weise würde ich vertreiben, um eine Menge Dinge im Speicher zu halten, da es dauert Stunden, um jede Million zu verarbeiten.
Dies ist eine ziemlich große Frage zu Stackoverflow zu beantworten und es ist nicht besonders mit Phantom von dem, was ich sagen kann, verwandt. Mein starkes Gefühl ist, dass der ganze Täter über HTTP gehen muss. Gemeinsame Nutzung eines HDFS oder einer anderen Methode, z. B. das Abrufen der HTTP-Bits und so weiter. Es wäre interessant, mehr Einblick zu bekommen, wie Phantom-Streaming Ihnen nicht alles zurückgibt, ich vermute, dass es entweder überläuft oder vielleicht überzeichnet, so dass es denkt, dass es fertig ist, bevor es tatsächlich ist. Klingt wirklich seltsam. – flavian
Ja, in der Tat, das ist keine reine Phantomfrage, aber ich habe hinzugefügt, dass dies meine erste Idee war, aber es leider nach 1 Million oder so aufhören. Auch das HTTP ist hier ein Flaschenhals und es dauert Stunden, alles zu verarbeiten, und vielleicht ist das reaktive Phantom in Schwierigkeiten geraten, weil es klar ist, dass der HTTP-Rückstau den Phantom-Stream freigibt, so dass ich nach einigen Stunden nicht lesen konnte mehr und dachte, es hat schon getan. –