2016-04-23 8 views
13

ich diesen Funken Programm haben, und ich werde versuchen, es nur die relevanten Teile funktioniert sehr gut auf meinem lokalen RechnerSpark-Programm gibt ungerade Ergebnisse, wenn auf Standalone-Cluster lief

# Split by delimiter , 
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number 
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist() 

# start with K randomly selected points from the dataset 
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid. 
centroids = points.takeSample(False, K, 34) 
#print centroids 
# Initialize our new centroids 
newCentroids = [[] for k in range(K)] 
tempCentroids = [] 
for centroid in centroids: 
    tempCentroids.append([centroid[N] + 0.5]) 
#centroids = sc.broadcast(tempCentroids) 

convergence = False 

ncm = NCM() 

while(not convergence): 
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m))) 
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value))) 
    # Memberships 
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c))) 
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0])) 
    # Components of new centroids 
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t)))) 
    #print "wTm = " + str(wTm.collect()) 
    print "at first reduce" 
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2)) 
    #print "sumwTm = " + str(sumwTm.collect()) 
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t)))) 
    print "adding to cnumerator list" 
    #print wTmx.collect() 
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values() 
    print "collected cnumerator, now printing"  
    #print "cnumerator = " + str(cnumerator.collect()) 
    #print str(sumwTm.collect()) 
    # Calculate the new centroids 
    sumwTmCollection = sumwTm.collect()[0][1] 
    cnumeratorCollection = cnumerator.collect() 
    #print "sumwTmCollection = " + str(sumwTmCollection) 
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items 
    print "cnumeratorCollection = " + str(cnumeratorCollection) 
    for i in range(len(newCentroids)): 
     newCentroids[i] = scalarMult(1/sumwTmCollection[i], [cnumeratorCollection[i]]) 
    centroids = newCentroids 
    # Test for convergence 
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon) 

    #convergence = True 
    # Replace our old centroids with the newly found centroids and repeat if convergence not met 
    # Clear out space for a new set of centroids 
    newCentroids = [[] for k in range(K)] 

Dieses Programm zu begrenzen, jedoch Es verhält sich nicht wie erwartet, wenn es auf einem eigenständigen Cluster ausgeführt wird. Es wirft nicht notwendigerweise einen Fehler auf, aber was es tut, ergibt eine andere Ausgabe als die, die ich beim Ausführen auf meinem lokalen Rechner erhalte. Der Cluster und die 3 Knoten scheinen gut zu funktionieren. Ich habe das Gefühl, das Problem ist, dass ich aktualisieren centroids, die eine Python-Liste ist, und es ändert sich jedes Mal durch die while-loop. Ist es möglich, dass nicht jeder Knoten die neueste Kopie dieser Liste hat? Ich denke so, also habe ich versucht, eine broadcast variable zu verwenden, aber diese kann nicht aktualisiert werden (nur lesen). Ich habe auch versucht, eine accumulator, aber das sind nur für Ansammlungen. Ich habe auch versucht, die Python-Listen als eine Datei auf hdfs für jeden Knoten zu speichern, um Zugriff zu haben, aber das hat nicht gut funktioniert. Glaubst du, ich verstehe das Problem richtig? Ist hier noch etwas los? Wie kann ich Code erhalten, der auf meinem lokalen Computer funktioniert, aber nicht auf einem Cluster?

+0

Entschuldigung, ich kann nicht herausfinden, wo Sie Ihre Centroids in Ihrem Code aktualisieren .. können Sie dies bitte zu mir markieren? Danke – mgaido

+0

Danke für das Betrachten. Es ist in Richtung der Unterseite. 'centroids = newCentroids'. –

+0

Ich würde mehr motiviert sein, Ihre Frage zu beantworten, wenn Sie Ihren Code bereinigt/verkleinert haben, und, noch wichtiger, ein Beispiel der Daten bereitgestellt haben, mit denen Ihr Skript unterschiedliche Ausgaben im Cluster liefert. –

Antwort

4

Vielen Dank für die ganze Zeit und Aufmerksamkeit für dieses Problem, vor allem da es klingt, als hätte ich mehr Informationen gepostet, um Ihre Arbeit einfacher zu machen. Das Problem hier ist in

centroids = points.takeSample(False, K, 34) 

ich nicht wusste, aber nach einem kurzen Versuch, diese Funktion liefert die gleiche Leistung jedes Mal, obwohl sie, was ich dachte, eine Stichprobe war. Solange Sie den gleichen Seed (34 in diesem Fall) verwenden, erhalten Sie die gleiche RDD als Gegenleistung. Die RDD in meinem Cluster war aus irgendeinem Grund anders als die auf meinem lokalen Rechner zurückgegeben. In jedem Fall, da es jedes Mal dieselbe RDD war, hat sich meine Ausgabe nie geändert. Das Problem mit den "zufälligen" Zentroiden, die mir zurückgegeben werden, ist, dass diese besonderen zu etwas wie einem Sattelpunkt in der Mathematik führten, wo keine Konvergenz der Zentroide gefunden werden würde. Dieser Teil der Antwort ist mathematisch und ein Programmieren, also werde ich es nicht weiter erwähnen. Meine wirkliche Hoffnung an dieser Stelle ist, dass andere von der Vorstellung geholfen werden, dass, wenn Sie

wollen
centroids = points.takeSample(False, K, 34) 

produzieren verschiedene Proben jedes Mal aufgerufen wird, dass Sie Ihre Samen jedes Mal zu einem gewissen Zufallszahl ändern.

Ich hoffe, das alles hilft. Ich habe noch nie so viel Zeit mit einer Lösung für mein Gedächtnis verbracht.

Nochmals vielen Dank.

Verwandte Themen