2016-06-04 11 views
0

EINLEITUNGMerging Eingänge in verteilten Anwendung

Ich muss verteilte Anwendung schreiben, die für 3 Datensätze maximale Anzahl von eindeutigen Werten zählt. Ich habe keine Erfahrung in diesem Bereich und kenne keine Rahmenbedingungen. Meine Eingabe konnte sieht wie folgt:

u1: u2,u3,u4,u5,u6 
u2: u1,u4,u6,u7,u8 
u3: u1,u4,u5,u9 
u4: u1,u2,u3,u6 
... 

Dann beginnt der Ergebnisse sein sollte:

(u1,u2,u3), u4,u5,u6,u7,u8,u9 => count=6 
(u1,u2,u4), u3,u5,u6,u7,u8 => count=5 
(u1,u3,u4), u2,u5,u6,u9  => count=4 
(u2,u3,u4), u1,u5,u6,u7,u8,u9 => count=6 
... 

Also mein Ansatz ist es, zunächst jede zwei Datensätze zu verschmelzen, und dann verschmelzen jedes fusionierte Paar mit jedem einzelnen Aufzeichnung.

FRAGE

Kann ich eine solche Operation zu tun, wie (merge) auf mehr als einer Eingabereihe auf der gleichen Zeit in framewors wie hadoop/Funkenarbeits? Oder ist mein Ansatz falsch und ich sollte das anders machen?

Jeder Rat wird geschätzt.

+0

'Also mein Ansatz ist es, zunächst jede zwei Datensätze zu verschmelzen, implementieren und dann mit jedem einzelnen record.' jedes fusionierte Paar verschmelzen Dies ist genau das, was eine Reduzierung tut in einem typischen Map Reduce-Framework (z. B. Spark oder Hadoop MR) durch Anwenden einer Zusammenführungsfunktion auf die Werte. –

+0

@Hawknight Von MapReduce Tutorial auf Hadoop-Seite: "Reducer reduziert eine Reihe von Zwischenwerten, die einen Schlüssel zu einer kleineren Gruppe von Werten teilen." Es ist in meinem Fall nicht abgedeckt, weil ich Datensätze nicht wirklich zusammenführen möchte, sondern sie mehr miteinander kombiniere. Habe ich recht? –

+0

Ich bin mir nicht sicher, ob ich richtig verstanden habe, was Sie erreichen möchten, aber letztendlich liegt es an Ihnen zu definieren, wie Ihre Daten verarbeitet werden sollen. Wenn Sie möchten, dass mehrere Zeilen in Ihrer Eingabe gleichzeitig verarbeitet werden, müssen Sie ein eigenes InputFormat definieren, das eine bestimmte Anzahl von Zeilen liest und in Ihre Map-Operation einfügt.Ein häufigerer Anwendungsfall wäre jedoch, die Daten vorzuverarbeiten, um Ihre Zeilen zu kombinieren, und dann eine Stapelreduzierung für Stapel auszuführen. Oder um eine erste Karte zu erstellen, reduzieren Sie den Stapel, um Ihre Daten wie gewünscht zu sortieren und reduzieren Sie dann die Ergebnisse nach Schlüssel. –

Antwort

0

Kann ich einen solchen Vorgang wie das Arbeiten (Zusammenführen) von mehr als einer Eingangszeile gleichzeitig in Framewors wie hadoop/spark ausführen?

Ja, Sie können.

Oder vielleicht ist mein Ansatz falsch und ich sollte dies anders machen?

Es hängt von der Größe der Daten ab. Wenn Ihre Daten klein sind, ist es schneller und einfacher, dies lokal zu tun. Wenn Ihre Daten riesig sind, mindestens Hunderte von GBs, besteht die allgemeine Strategie darin, die Daten in HDFS (verteiltes Dateisystem) zu speichern und eine Analyse mit Mapreduce/Spark durchzuführen.

Ein Beispiel Funken Anwendung in scala geschrieben:

object MyCounter { 
     val sparkConf = new SparkConf().setAppName("My Counter") 
     val sc = new SparkContext(sparkConf) 

     def main(args: Array[String]) { 
     val inputFile = sc.textFile("hdfs:///inputfile.txt") 
     val keys = inputFile.map(line => line.substring(0, 2)) // get "u1" from "u1: u2,u3,u4,u5,u6" 

     val triplets = keys.cartesian(keys).cartesian(keys) 
      .map(z => (z._1._1, z._1._2, z._2)) 
      .filter(z => !z._1.equals(z._2) && !z._1.equals(z._3) && !z._2.equals(z._3)) // get "(u1,u2,u3)" triplets 

     // If you have small numbers of (u1,u2,u3) triplets, it's better prepare them locally. 

     val res = triplets.cartesian(inputFile).filter(z => { 
      z._2.startsWith(z._1._1) || z._2.startsWith(z._1._2) || z._2.startsWith(z._1._3) 
     }) // (u1,u2,u3) only matches line starts with u1,u2,u3, for example "u1: u2,u3,u4,u5,u6" 
      .reduceByKey((a, b) => a + b) // merge three lines 
      .map(z => { 
      val line = z._2 
      val values = line.split(",") 
      //count unique values using set 
      val set = new util.HashSet[String]() 
      for (value <- values) { 
      set.add(value) 
      } 
      "key=" + z._1 + ", count=" + set.size() // the result from one mapper is a string 
     }).collect() 

     for (line <- res) { 
      println(line) 
     } 
     } 
    } 
  1. Der Code ist nicht getestet. Und ist nicht effizient. Es kann einige Optimierung haben (zum Beispiel, entfernen Sie unnötige Map-Reduce-Schritte.)
  2. Sie können die gleiche Version mit Python/Java neu schreiben.
  3. Sie können die gleiche Logik Hadoop/MapReduce