2017-10-15 5 views
0

Ich habe einen Datensatz, der Arbeiter mit ihren demographischen Informationen wie Alter Geschlecht, Adresse usw. und ihren Arbeitsorten enthält. Ich habe eine RDD aus dem Dataset erstellt und in einen DataFrame konvertiert.Berechnung der Kosinusähnlichkeit zwischen allen Zeilen eines Datenrahmens in pyspark

Es gibt mehrere Einträge für jede ID. Daher habe ich einen DataFrame erstellt, der nur die ID des Mitarbeiters und die verschiedenen Bürostandorte enthält, an denen er/sie gearbeitet hat.

|----------|----------------| 
    | **ID** **Office_Loc** | 
    |----------|----------------| 
    | 1  |Delhi, Mumbai, | 
    |   | Gandhinagar | 
    |---------------------------| 
    | 2  | Delhi, Mandi | 
    |---------------------------| 
    | 3  |Hyderbad, Jaipur| 
    ----------------------------- 

Ich möchte die Kosinusähnlichkeit zwischen jedem Arbeiter mit jedem anderen Arbeiter basierend auf ihren Bürostandorten 'berechnen.

So, iteriert I durch die Zeilen der Datenrahmen, um eine einzelne Zeile aus dem Datenrahmen abzurufen:

myIndex = 1 
values = (ID_place_df.rdd.zipWithIndex() 
      .filter(lambda ((l, v), i): i == myIndex) 
      .map(lambda ((l,v), i): (l, v)) 
      .collect()) 

und dann map

cos_weight = ID_place_df.select("ID","office_location").rdd\ 
    .map(lambda x: get_cosine(values,x[0],x[1])) 

zu berechnet, um die Cosinus-Ähnlichkeit zwischen der extrahierten Reihe unter Verwendung und der ganze DataFrame.

Ich glaube nicht, dass mein Ansatz gut ist, da ich durch die Zeilen des DataFrame iteriere, es vereitelt den ganzen Zweck der Verwendung von Funken. Gibt es einen besseren Weg, es in Pyspark zu tun? Bitte beraten.

+0

Ich thibk es ein bisschen lon g Frage. Normalerweise ist es eine gute Übung, die Frage mit dem einfachsten Fall zu stellen, wenn Sie das gleiche Problem bekommen. – ChaosPredictor

Antwort

1

Sie können das mllib Paket verwenden, um die L2 Norm der TF-IDF jeder Zeile zu berechnen. Dann multiplizieren Sie die Tabelle mit sich selbst die Kosinusähnlichkeit als Punktprodukt von zwei durch zwei L2 Normen zu erhalten:

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]]) 
  • Compute TF-IDF:

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(",")) 
    
    from pyspark.mllib.feature import HashingTF, IDF 
    hashingTF = HashingTF() 
    tf = hashingTF.transform(documents) 
    

Y Sie können die Anzahl der Features in HashingTF angeben, um die Feature-Matrix kleiner zu machen (weniger Spalten).

tf.cache() 
    idf = IDF().fit(tf) 
    tfidf = idf.transform(tf) 
  • Compute L2 norm:

    from pyspark.mllib.feature import Normalizer 
    labels = rdd.map(lambda l: l[0]) 
    features = tfidf 
    
    normalizer = Normalizer() 
    data = labels.zip(normalizer.transform(features)) 
    
  • Compute Kosinusähnlichkeit durch die Matrix mit sich selbst multipliziert wird:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix 
    mat = IndexedRowMatrix(data).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    
        array([[ 0.  , 0.  , 0.  , 0.  ], 
          [ 0.  , 1.  , 0.10794634, 0.  ], 
          [ 0.  , 0.10794634, 1.  , 0.  ], 
          [ 0.  , 0.  , 0.  , 1.  ]]) 
    

    OR: ein kartesisches Produkt und der Funktion dot auf numpy Arrays:

    data.cartesian(data)\ 
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\ 
        .sortByKey()\ 
        .collect() 
    
        [((1, 1), 1.0), 
        ((1, 2), 0.10794633570596117), 
        ((1, 3), 0.0), 
        ((2, 1), 0.10794633570596117), 
        ((2, 2), 1.0), 
        ((2, 3), 0.0), 
        ((3, 1), 0.0), 
        ((3, 2), 0.0), 
        ((3, 3), 1.0)] 
    

2.Datenrahmen

Da Sie scheinen Datenrahmen zu verwenden, können Sie das spark ml Paket verwenden statt:

import pyspark.sql.functions as psf 
df = rdd.toDF(["ID", "Office_Loc"])\ 
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ',')) 
  • Compute TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF 
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf") 
    tf = hashingTF.transform(df) 
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf) 
    tfidf = idf.transform(tf) 
    
  • Compute L2 Norm:

    from pyspark.ml.feature import Normalizer 
    normalizer = Normalizer(inputCol="feature", outputCol="norm") 
    data = normalizer.transform(tfidf) 
    
  • Compute Matrixprodukt:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\ 
         .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    

    OR: eine Verknüpfung und eine UDF für Funktion dot:

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType()) 
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\ 
        .select(
         psf.col("i.ID").alias("i"), 
         psf.col("j.ID").alias("j"), 
         dot_udf("i.norm", "j.norm").alias("dot"))\ 
        .sort("i", "j")\ 
        .show() 
    
        +---+---+-------------------+ 
        | i| j|    dot| 
        +---+---+-------------------+ 
        | 1| 2|0.10794633570596117| 
        | 1| 3|    0.0| 
        | 2| 3|    0.0| 
        +---+---+-------------------+ 
    

Dieses Tutorial verschiedene Methoden auflistet großen Maßstab Matrizen zu multiplizieren: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

+0

Vielen Dank für die Antwort. Ich schätze die Hilfe sehr. Aber der Code gibt mir eine Fehlermeldung "Anforderung fehlgeschlagen: Die Eingabespalte muss ArrayType sein, aber StringType.'". während der HashingTF-Transformation während der Verwendung des Datenrahmens. –

+0

Sie müssen die String-Liste zuerst in eine Wortliste aufteilen. Ich habe den Teil hinzugefügt, wie man 'df' erstellt: – MaFF

+0

Hallo, es funktioniert, wenn ich' data.cartesian (data) \ .map (lambda l: ((0 [0] [0], l [1] [0 ]), l [0] [1] .dot (l [1] [1]))) \ .sortByKey() \ .nehmen (5) '. Aber wenn ich den MLLIB-Code benutze und die blockMatrix in eine LocalMatrix umwandele, ist die Anforderung nicht erfüllt: Die Länge des Array-Wertes muss kleiner sein als Int.MaxValue. Momentan numRows * numCols: 1006095879729669481'' was ich nicht verstehe, da ich eine kleine Teilmenge der Daten nehme (etwa 10 IDs) also numRows * numCols: 100. –

Verwandte Themen