2014-05-21 2 views
9

Dies ist eine Frage, die ich already asked auf der Spark Benutzer Mailing-Liste habe und ich hoffe, hier mehr Erfolg zu bekommen .Umgehen org.apache.hadoop.mapred.InvalidInputException: Eingabe Muster s3n: // [...] entspricht 0 Dateien

Ich bin nicht sicher, dass es direkt mit Funken zusammenhängt, obwohl Funken etwas mit der Tatsache zu tun hat, dass ich dieses Problem nicht leicht lösen kann.

Ich versuche, einige Dateien von S3 mit verschiedenen Mustern zu bekommen. Mein Problem ist, dass einige dieser Muster nichts zurückgeben können, und wenn sie dies tun, erhalte ich die folgende Ausnahme:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58) 
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335) 
    ... 2 more 

Ich würde einen Weg wie fehlende Dateien zu ignorieren und einfach nichts tun, in diesem Fall. Das Problem hier IMO ist, dass ich nicht weiß, ob ein Muster etwas zurückgibt, bis es tatsächlich ausgeführt wird und Funke beginnt Daten nur zu verarbeiten, wenn eine Aktion auftritt (hier der reduceByKey Teil). Ich kann also nicht irgendwo einen Fehler entdecken und die Dinge weitermachen lassen.

Eine Lösung wäre, Funken zu zwingen, jeden Pfad einzeln zu verarbeiten, aber das wird wahrscheinlich Kosten in Bezug auf Geschwindigkeit und/oder Speicher zuteilen, so dass ich nach einer anderen Option suche, die effizient wäre.

Ich benutze funke 0.9.1. Dank

Antwort

4

Ok, ein bisschen in Funken und dank jemand graben mich auf die Funkenbenutzerliste Führung glaube, ich habe ich es:

sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration()) 
    .map(new Function<Tuple2<LongWritable, Text>, String>() { 
     @Override 
     public String call(Tuple2<LongWritable, Text> arg0) throws Exception { 
      return arg0._2.toString(); 
     } 
    }) 
    .count(); 

Und die EmptiableTextInputFormat, die die Magie tut:

import java.io.IOException; 
import java.util.Collections; 
import java.util.List; 

import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

public class EmptiableTextInputFormat extends TextInputFormat { 
    @Override 
    public List<InputSplit> getSplits(JobContext arg0) throws IOException { 
     try { 
      return super.getSplits(arg0); 
     } catch (InvalidInputException e) { 
      return Collections.<InputSplit> emptyList(); 
     } 
    } 
} 

Man könnte schließlich die Nachricht des InvalidInputException für ein wenig mehr Präzision überprüfen.

+0

Jede Art und Weise die gleiche Logik auf 'SparkContext.sequenceFile()' verwenden zu implementieren? –

2

Für jedermann einen schnellen Hack will, hier ist ein Beispiel sc.wholeTextFiles

def wholeTextFilesIgnoreErrors(path: String, sc: SparkContext): RDD[(String, String)] = { 
    // TODO This is a bit hacky, probabally ought to work out a better way using lower level hadoop api 

    sc.wholeTextFiles(path.split(",").filter(subPath => Try(sc.textFile(subPath).take(1)).isSuccess).mkString(",")) 
    } 
Verwandte Themen