2016-04-01 2 views
5

Gegeben 2 riesige Liste von Werten, ich versuche jaccard similarity zwischen ihnen in Spark mit Scala zu berechnen.Spark Jaccard Ähnlichkeitsberechnung durch min Hashing langsam im Vergleich zu trivialen Ansatz

Angenommen colHashed1 enthält die erste Liste von Werten und colHashed2 enthält die zweite Liste.

Ansatz 1 (Trivial Ansatz):

val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble) 

Ansatz 2 (unter Verwendung von minHashing):

ich den Ansatz verwendet haben erklärt here.

import java.util.zip.CRC32 

def getCRC32 (s : String) : Int = 
{ 
    val crc=new CRC32 
    crc.update(s.getBytes) 
    return crc.getValue.toInt & 0xffffffff 
} 

val maxShingleID = Math.pow(2,32)-1 
def pickRandomCoeffs(kIn : Int) : Array[Int] = 
{ 
    var k = kIn 
    val randList = Array.fill(k){0} 

    while(k > 0) 
    { 
    // Get a random shingle ID. 

    var randIndex = (Math.random()*maxShingleID).toInt 

    // Ensure that each random number is unique. 
    while(randList.contains(randIndex)) 
    { 
     randIndex = (Math.random()*maxShingleID).toInt 
    } 

    // Add the random number to the list. 
    k = k - 1 
    randList(k) = randIndex 
    } 

    return randList 
} 

val colHashed1 = list1Values.map(a => getCRC32(a)) 
val colHashed2 = list2Values.map(a => getCRC32(a)) 

val nextPrime = 4294967311L 
val numHashes = 10 

val coeffA = pickRandomCoeffs(numHashes) 
val coeffB = pickRandomCoeffs(numHashes) 

var signature1 = Array.fill(numHashes){0} 
for (i <- 0 to numHashes-1) 
{ 
    // Evaluate the hash function. 
    val hashCodeRDD = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 

    // Track the lowest hash code seen. 
    signature1(i) = hashCodeRDD.min.toInt 
} 

var signature2 = Array.fill(numHashes){0} 
for (i <- 0 to numHashes-1) 
{ 
    // Evaluate the hash function. 
    val hashCodeRDD = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 

    // Track the lowest hash code seen. 
    signature2(i) = hashCodeRDD.min.toInt 
} 


var count = 0 
// Count the number of positions in the minhash signature which are equal. 
for(k <- 0 to numHashes-1) 
{ 
    if(signature1(k) == signature2(k)) 
    count = count + 1 
} 
val jSimilarity = count/numHashes.toDouble 

Ansatz 1 scheint immer Ansatz 2 in Bezug auf die Zeit zu übertreffen. Wenn ich den Code analysierte, benötigt der Funktionsaufruf min() auf dem RDD in Approach 2 erhebliche Zeit, und diese Funktion wird oft aufgerufen, je nachdem wie viele Hash-Funktionen verwendet werden.

Die in Ansatz 1 verwendeten Schnitt- und Vereinigungsoperationen scheinen im Vergleich zu den wiederholten min() - Funktionsaufrufen schneller zu funktionieren.

Ich verstehe nicht, warum minHashing hier nicht hilft. Ich habe erwartet, dass minHashing im Vergleich zu trivialem Ansatz schneller arbeitet. Was mache ich hier falsch?

Beispieldaten können here

+0

Können Sie Beispieldaten für Ihre hinzufügen col1 und col2 im Datensatz? – tuxdna

+0

@tuxdna Beispieldatenlink hinzugefügt am Ende der Frage – CRM

Antwort

0

JaccardSimilarity mit MinHash nicht geben konsistente Ergebnisse zu sehen:

import java.util.zip.CRC32 

object Jaccard { 
    def getCRC32(s: String): Int = { 
    val crc = new CRC32 
    crc.update(s.getBytes) 
    return crc.getValue.toInt & 0xffffffff 
    } 

    def pickRandomCoeffs(kIn: Int, maxShingleID: Double): Array[Int] = { 
    var k = kIn 
    val randList = Array.ofDim[Int](k) 

    while (k > 0) { 
     // Get a random shingle ID. 
     var randIndex = (Math.random() * maxShingleID).toInt 
     // Ensure that each random number is unique. 
     while (randList.contains(randIndex)) { 
     randIndex = (Math.random() * maxShingleID).toInt 
     } 
     // Add the random number to the list. 
     k = k - 1 
     randList(k) = randIndex 
    } 
    return randList 
    } 


    def approach2(list1Values: List[String], list2Values: List[String]) = { 

    val maxShingleID = Math.pow(2, 32) - 1 

    val colHashed1 = list1Values.map(a => getCRC32(a)) 
    val colHashed2 = list2Values.map(a => getCRC32(a)) 

    val nextPrime = 4294967311L 
    val numHashes = 10 

    val coeffA = pickRandomCoeffs(numHashes, maxShingleID) 
    val coeffB = pickRandomCoeffs(numHashes, maxShingleID) 

    val signature1 = for (i <- 0 until numHashes) yield { 
     val hashCodeRDD = colHashed1.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime) 
     hashCodeRDD.min.toInt // Track the lowest hash code seen. 
    } 

    val signature2 = for (i <- 0 until numHashes) yield { 
     val hashCodeRDD = colHashed2.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime) 
     hashCodeRDD.min.toInt // Track the lowest hash code seen 
    } 

    val count = (0 until numHashes) 
     .map(k => if (signature1(k) == signature2(k)) 1 else 0) 
     .fold(0)(_ + _) 


    val jSimilarity = count/numHashes.toDouble 
    jSimilarity 
    } 


    // def approach1(list1Values: List[String], list2Values: List[String]) = { 
    // val colHashed1 = list1Values.toSet 
    // val colHashed2 = list2Values.toSet 
    // 
    // val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble) 
    // jSimilarity 
    // } 


    def approach1(list1Values: List[String], list2Values: List[String]) = { 
    val colHashed1 = list1Values.toSet 
    val colHashed2 = list2Values.toSet 

    val jSimilarity = (colHashed1 & colHashed2).size/(colHashed1 ++ colHashed2).size.toDouble 
    jSimilarity 
    } 

    def main(args: Array[String]) { 

    val list1Values = List("a", "b", "c") 
    val list2Values = List("a", "b", "d") 

    for (i <- 0 until 5) { 
     println(s"Iteration ${i}") 
     println(s" - Approach 1: ${approach1(list1Values, list2Values)}") 
     println(s" - Approach 2: ${approach2(list1Values, list2Values)}") 
    } 

    } 
} 

OUTPUT:

Iteration 0 
- Approach 1: 0.5 
- Approach 2: 0.5 
Iteration 1 
- Approach 1: 0.5 
- Approach 2: 0.5 
Iteration 2 
- Approach 1: 0.5 
- Approach 2: 0.8 
Iteration 3 
- Approach 1: 0.5 
- Approach 2: 0.8 
Iteration 4 
- Approach 1: 0.5 
- Approach 2: 0.4 

Warum sind Sie es verwenden?

+0

Ansatz 1 gibt den genauen Wert der Jaccard Ähnlichkeit. Ansatz 2 ist eine Annäherung. Es wird erwartet, dass Ansatz 2 für die Berechnung viel einfacher ist als Ansatz 1. Wenn wir den "numHashes" -Wert in Ansatz 2 abstimmen, können wir die Approximation näher an die tatsächliche jaccard-Ähnlichkeit annähern, die durch Ansatz 1 gegeben ist. Das Problem ist Näherung scheint rechenintensiv zu sein als genau – CRM

0

Es scheint mir, dass die Gemeinkosten für minHashing Ansatz überwiegt nur seine Funktionalität in Spark. Zumal sich numHashes erhöht. Hier sind einige Beobachtungen, die ich in Ihrem Code gefunden habe:

Zuerst wird while (randList.contains(randIndex)) dieser Teil wird sicherlich verlangsamen Sie Ihren Prozess als numHashes (die übrigens der Größe von randList entspricht) erhöht.

Zweitens können Sie Zeit sparen, wenn Sie diesen Code neu schreiben:

var signature1 = Array.fill(numHashes){0} 
for (i <- 0 to numHashes-1) 
{ 
    // Evaluate the hash function. 
    val hashCodeRDD = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 

    // Track the lowest hash code seen. 
    signature1(i) = hashCodeRDD.min.toInt 
} 

var signature2 = Array.fill(numHashes){0} 
for (i <- 0 to numHashes-1) 
{ 
    // Evaluate the hash function. 
    val hashCodeRDD = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 

    // Track the lowest hash code seen. 
    signature2(i) = hashCodeRDD.min.toInt 
} 


var count = 0 
// Count the number of positions in the minhash signature which are equal. 
for(k <- 0 to numHashes-1) 
{ 
    if(signature1(k) == signature2(k)) 
    count = count + 1 
} 

in

var count = 0 
for (i <- 0 to numHashes - 1) 
{ 
    val hashCodeRDD1 = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 
    val hashCodeRDD2 = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime)) 

    val sig1 = hashCodeRDD1.min.toInt 
    val sig2 = hashCodeRDD2.min.toInt 

    if (sig1 == sig2) { count = count + 1 } 
} 

Dieses Verfahren die drei Schleifen in eine vereinfacht. Ich bin mir jedoch nicht sicher, ob dies einen enormen Schub für die Rechenzeit bedeuten würde.

val colHashed1_dist = colHashed1.distinct 
val colHashed2_dist = colHashed2.distinct 
val intersect_cnt = colHashed1_dist.intersection(colHashed2_dist).distinct.count 

val jSimilarity = intersect_cnt/(colHashed1_dist.count + colHashed2_dist.count - intersect_cnt).toDouble 

mit, dass, anstatt sich die Vereinigung: noch erweist sich als viel schneller ist die Eigenschaft von Sätzen zu verwenden, um den ersten Ansatz zu ändern

Ein weiterer Vorschlag, den ich habe, dass der erste Ansatz unter der Annahme, , Sie können einfach den Wert der Kreuzung wiederverwenden.

+0

Ich stimme Ihren Punkten zu. Aber wie Sie sagten, hatten die Änderungen in der Code-Organisation keine wesentlichen Auswirkungen. Ich frage mich immer noch, ob es eine bessere minHash-Implementierung in Spark gibt – CRM

0

Eigentlich würden Sie in LSH procrach minHash nur einmal für jedes Ihrer Dokumente berechnen und dann zwei minHases für jedes mögliche Paar von Dokumenten vergleichen. Und im Falle eines trivialen Ansatzes würden Sie einen vollständigen Vergleich der Dokumente für jedes mögliche Paar von Dokumenten durchführen. Das ist ungefähr N^2/2 Anzahl der Vergleiche. Daher sind zusätzliche Kosten für die Berechnung von minHashes für eine ausreichend große Anzahl von Dokumenten vernachlässigbar.

Sie sollten eigentlich die Leistung des triviale Ansatz vergleichen:

val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble) 

und die Leistung der Berechnung Jaccard Abstand (letzte Zeile im Code):

var count = 0 
// Count the number of positions in the minhash signature which are equal. 
for(k <- 0 to numHashes-1) 
{ 
    if(signature1(k) == signature2(k)) 
    count = count + 1 
} 
val jSimilarity = count/numHashes.toDouble 
Verwandte Themen