5

Ausführen des Funkenhalbierenden kmmeans-Algorithmus in Zeppelin.Spark throws java.util.NoSuchElementException: Schlüssel nicht gefunden: 67

//I transform my data using the TF-IDF algorithm 

val idf = new IDF(minFreq).fit(data) 
val hashIDF_features = idf.transform(dbTF)  

//and parse the transformed data to the clustering algorithm. 

val bkm = new BisectingKMeans().setK(100).setMaxIterations(2) 
val model = bkm.run(hashIDF_features) 
val cluster_rdd = model.predict(hashIDF_features) 

ich diesen Fehler immer wenn:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 270.0 failed 4 times, most recent failure: Lost task 0.3 in stage 270.0 (TID 126885, IP): java.util.NoSuchElementException: key not found: 67 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:58) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125) 
    at scala.collection.immutable.List.reduceLeft(List.scala:84) 
    at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231) 
    at scala.collection.AbstractTraversable.minBy(Traversable.scala:105) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:261) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$run$1.apply$mcVI$sp(BisectingKMeans.scala:194) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:189) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101) 

Im Funken 1.6.1 verwenden. Interessanterweise, wenn ich diesen Algorithmus auf einer eigenständigen Anwendung ausführen, trifft es keine Fehler, aber ich bekomme das in Zeppelin. Außerdem wurde die Eingabe von einem externen Algorithmus berechnet, so dass ich glaube, dass es sich nicht um ein Formatierungsproblem handelt. Irgendwelche Ideen?

Edit: Ich habe das System erneut getestet, indem Sie eine kleinere Anzahl von Clustern verwenden, und der Fehler tritt nicht auf. Warum würde der Algorithmus für große Clusterwerte brechen?

Antwort

1

Ich glaube, das Problem ist wegen closure. Wenn Sie Ihre Anwendung lokal ausführen, wird möglicherweise alles im selben Speicher/Prozess ausgeführt. Stellen Sie daher sicher, dass Sie nicht versuchen, auf eine lokale Variable von einer Clousre zuzugreifen, die möglicherweise in einem anderen Speicher/Prozess ausgeführt wird. This wird hilfreich sein, um Ihr Problem zu lösen.

+1

Ich habe die Befehle in Funktionen eingepackt, um den Abschluss sicherzustellen, aber das Problem besteht immer noch. – Mnemosyne

1

Ich habe auch das gleiche Problem, ich habe dieses Problem auch an Spark JIRA gemeldet, aber habe keine Antwort. https://issues.apache.org/jira/browse/SPARK-16473

+0

Hat jemand irgendeine Lösung dieses Problems? Oder die Ursache davon –

+0

Ich denke, das Problem ist Speicher bezogen, obwohl ich nicht genau weiß, wie. Ich ging zu einem stärkeren Cluster über und die Obergrenze wurde erhöht. Ich könnte zu 150-200 Clustern gehen und dann würde ich den gleichen Fehler erneut treffen. Wahrscheinlich ein Fehler im Algorithmus. Es ist schade, denn BKM ist so viel schneller als KM. – Mnemosyne

+0

Jeder weiß, wann diese bereits summierte Änderung in der Klasse "BisectingKmeans.scala" in "sbt-Repositories" aufgenommen wird. Ich brauche die aktualisierte Funktion, aber in der letzten sbt-Version von mllib (2.11) fehlt die Änderung. – eifersucht

Verwandte Themen