2017-02-15 5 views
1

Ich lerne Apache Spark mit Scala und möchte es verwenden, um eine DNA-Datensatz zu verarbeiten, die mehrere Zeilen wie diese erstreckt sich über:mehrzeilige Spark-Schiebefenster

ATGTAT 
ACATAT 
ATATAT 

ich diese in Gruppen von einer festen zuordnen möchten Größe k und zählen Sie die Gruppen. Also für k = 3, würden wir Gruppen der einzelnen Zeichen mit den nächsten zwei Zeichen erhalten:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT 

... dann die Gruppen zählen (wie Wortanzahl):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1) 

Das Problem ist, dass Die "Wörter" umfassen mehrere Zeilen, wie im obigen Beispiel TAC. Es umfasst den Zeilenumbruch. Ich möchte nicht nur die Gruppen in jeder Zeile zählen, sondern in der ganzen Datei Zeilenendungen ignorieren.

Mit anderen Worten, ich möchte die gesamte Sequenz als ein gleitendes Fenster der Breite k über die gesamte Datei verarbeiten, als ob es keine Zeilenumbrüche gab. Das Problem ist, in die nächste RDD-Zeile zu schauen (oder zurück), um ein Fenster zu vervollständigen, wenn ich am Ende einer Zeile angelangt bin.

Zwei Ideen, die ich hatte, waren:

  1. Anfügen k-1 Zeichen aus der nächsten Zeile:
ATATATAC 
ACATATAT 
ATATAT 

Ich habe versucht, dies mit der Spark-SQL Leitung() Funktion , aber als ich versucht habe, eine flatMap auszuführen, habe ich eine NotSerializableException für WindowSpec bekommen. Gibt es eine andere Möglichkeit, auf die nächste Zeile zu verweisen? Müsste ich ein benutzerdefiniertes Eingabeformat schreiben?

  1. die gesamte Sequenz Lesen in einer einzigen Zeile (oder Verbindungslinien nach dem Lesen):
ATATATACATATATATAT 

Gibt es eine Möglichkeit, mehrere zu lesen Linien, damit sie als eine verarbeitet werden können? Wenn ja, müsste alles in den Speicher einer einzelnen Maschine passen?

Ich realisiere, dass einer von diesen könnte als ein Vorverarbeitungsschritt getan werden. Ich habe mich gefragt, wie es am besten ist, es in Spark zu machen. Sobald ich es in einem dieser Formate habe, weiß ich, wie ich den Rest mache, aber ich stecke hier fest.

Antwort

1

Sie können eine rdd einzelner Zeichenfolge machen, da statt sie sie als eine Zeile, dass das Ergebnis eine Zeichenfolge machen, die nicht verteilt werden können:

val rdd = sc.textFile("gene.txt") 
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24 

So einfach flatMap verwenden, um die Linien zu spalten in Liste der Charaktere:

rdd.flatMap(_.split("")).collect 
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T) 

Eine vollständigere Lösung von this answer entlehnt:

val rdd = sc.textFile("gene.txt") 

// create the sliding 3 grams for each partition and record the edges 
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => { 
    val slideList = iter.toList.sliding(3).toList 
    Iterator((slideList, (slideList.head, slideList.last))) 
}) 

// collect the edge values, concatenate edges from adjacent partitions and broadcast it 
val edgeValues = rdd1.values.collect 

val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => { 
    (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList 
}} 

val sewedEdgesMap = sc.broadcast(
    (0 until rdd1.partitions.size) zip sewedEdges toMap 
) 

// sew the edge values back to the result 
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))). 
    flatMap(_.map(_ mkString "")).collect 

// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT) 
+0

Ich denke, die Frage bleibt: wie kann ich auf Elemente zwei Positionen vor dem aktuellen Element zugreifen. Also, wenn ich auf dem ersten Element 'A' bin, wie kann ich nach vorne schauen, um eine Gruppe der nächsten zwei zu machen: 'ATG'? Ich weiß, wenn es in einer Zeichenfolge oder einem Array ist, kann ich basierend auf dem Index nach vorne schauen und verketten, aber was ist mit RDD-Zeilen? – jcadcell

+0

Sie können auf [diese Antwort] verweisen (http://stackoverflow.com/questions/35154267/how-to-compute-cumulative-sum-using-spark) – Psidom

+0

Danke, das funktioniert. Ich muss darüber nachdenken, um zu verstehen, was vor sich geht. – jcadcell