2017-05-09 2 views
0

Ich versuche pyspark ausgeschüttete-kmodes Beispiel auszuführen:pysparkDistributedKmodes lib Fehler

import numpy as np 
data = np.random.choice(["a", "b", "c"], (50000, 10)) 
data2 = np.random.choice(["e", "f", "g"], (50000, 10)) 
data = list(data) + list(data2) 

from random import shuffle 
shuffle(data) 

# Create a Spark RDD from our sample data and decrease partitions to max_partions 
max_partitions = 32 

rdd = sc.parallelize(data) 
rdd = rdd.coalesce(max_partitions) 

for x in rdd.take(10): 
    print x 

method = EnsembleKModes(n_clusters, max_iter) 
model = method.fit(df.rdd) 

print(model.clusters) 
print(method.mean_cost) 

predictions = method.predictions 
datapoints = method.indexed_rdd 
combined = datapoints.zip(predictions) 
print(combined.take(10)) 

model.predict(rdd).take(5) 

Ich verwende Python 2.7, Apache Zeppelin 0.7.1 und Apache Spark 2.1.0.

Dies ist der Ausgangsfehler:

('Iteration ', 0) 

Traceback (most recent call last): 
     File "/tmp/zeppelin_pyspark-1298251609305129154.py", line 349, in <module> 
     raise Exception(traceback.format_exc()) 
    Exception: Traceback (most recent call last): 
     File "/tmp/zeppelin_pyspark-1298251609305129154.py", line 337, in <module> 
     exec(code) 
     File "<stdin>", line 13, in <module> 
     File "/usr/local/lib/python2.7/dist-packages/pyspark_kmodes/pyspark_kmodes.py", line 430, in fit 
     self.n_clusters,self.max_dist_iter) 
     File "/usr/local/lib/python2.7/dist-packages/pyspark_kmodes/pyspark_kmodes.py", line 271, in k_modes_partitioned 
     clusters = check_for_empty_cluster(clusters, rdd) 
     File "/usr/local/lib/python2.7/dist-packages/pyspark_kmodes/pyspark_kmodes.py", line 317, in check_for_empty_cluster 
     random_element = random.choice(clusters[biggest_cluster].members) 
     File "/usr/lib/python2.7/random.py", line 275, in choice 
     return seq[int(self.random() * len(seq))] # raises IndexError if seq is empty 
    IndexError: list index out of range 

die RDD verwendet, um das Modell zu passen nicht leer ist, ich habe es überprüft. Ich denke, das ist ein Versionsinkompatibilitätsproblem zwischen pyspark-verteilten-kmodes und spark, aber ich kann Spark nicht downgraden.

Irgendeine Idee, wie man es repariert?

Antwort

0

Was ist df? Sieht nicht wie ein Funkenfehler aus. Der Code von https://github.com/ThinkBigAnalytics/pyspark-distributed-kmodes funktioniert bei mir unter Spark 2.1.0. Auch wenn ich diese Codezeile von dir ändere funktioniert es auch:

method = EnsembleKModes(n_clusters, max_iter) 
model = method.fit(rdd) 
+0

Sie sind richtig, df ist ein Fehler. Jetzt funktioniert es gut. Vielen Dank –

Verwandte Themen