2016-10-14 3 views
0

Ich habe eine CSV-Datei mit Staat, Alter, Geschlecht, Gehalt usw. als unabhängige Variablen.Funken Umwandlung CSV in libsvm Format

Abhängige Variable ist Churn.

In Spark müssen wir den Datenrahmen in libsvm-Format konvertieren. Kannst du mir sagen, wie es geht?

LIBSVM Format lautet: 0 128: 51

als Feature VALUE HIER bedeutet, dass es WERT 51 IN DEN 128.

+0

Beschreiben Sie Ihr Problem genauer. Ich folge deiner Idee nicht. – Jerry

Antwort

0

Ich war hadoop für die Verwendung derselben aber Logik sollte gleich sein. Ich habe ein Beispiel für Ihren Anwendungsfall erstellt. Hier erstelle ich zuerst einen Datenrahmen und entferne dann alle Zeilen, die entweder Null- oder Leerwerte haben. Danach erstellen RDD und konvertieren Row in libsvm-Format. "repartition (1)" bedeutet, dass alles nur in eine Datei geht. Es wird eine resultierende Spalte geben, z. Im Falle einer CTR-Vorhersage wird es nur 1 oder 0 sein.

Beispieldatei Eingang:

"zip","city","state","latitude","longitude","timezone","dst" 
"00210","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00211","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00212","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00213","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00214","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00215","Portsmouth","NH","43.005895","-71.013202","-5","1" 
"00501","Holtsville","NY","40.922326","-72.637078","-5","1" 
"00544","Holtsville","NY","40.922326","-72.637078","-5","1" 

public class LibSvmConvertJob { 

    private static final String SPACE = " "; 
    private static final String COLON = ":"; 

    public static void main(String[] args) { 

     SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("Libsvm Convertor"); 

     JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 

     SQLContext sqlContext = new SQLContext(javaSparkContext); 

     DataFrame inputDF = sqlContext.read().format("com.databricks.spark.csv").option("header", "true") 
       .load("/home/raghunandangupta/inputfiles/zipcode.csv"); 

     inputDF.printSchema(); 

     sqlContext.udf().register("convertToNull", (String v1) -> (v1.trim().length() > 0 ? v1.trim() : null), DataTypes.StringType); 

     inputDF = inputDF.selectExpr("convertToNull(zip)","convertToNull(city)","convertToNull(state)","convertToNull(latitude)","convertToNull(longitude)","convertToNull(timezone)","convertToNull(dst)").na().drop(); 

     inputDF.javaRDD().map(new Function<Row, String>() { 
      private static final long serialVersionUID = 1L; 
      @Override 
      public String call(Row v1) throws Exception { 
       StringBuilder sb = new StringBuilder(); 
       sb.append(hashCode(v1.getString(0))).append("\t") //Resultant column 
       .append("1"+COLON+hashCode(v1.getString(1))).append(SPACE) 
       .append("2"+COLON+hashCode(v1.getString(2))).append(SPACE) 
       .append("3"+COLON+hashCode(v1.getString(3))).append(SPACE) 
       .append("4"+COLON+hashCode(v1.getString(4))).append(SPACE) 
       .append("5"+COLON+hashCode(v1.getString(5))).append(SPACE) 
       .append("6"+COLON+hashCode(v1.getString(6))); 
       return sb.toString(); 
      } 
      private String hashCode(String value) { 
       return Math.abs(Hashing.murmur3_32().hashString(value, StandardCharsets.UTF_8).hashCode()) + ""; 
      } 
     }).repartition(1).saveAsTextFile("/home/raghunandangupta/inputfiles/zipcode"); 

    } 
} 
+0

Vielen Dank :) – Nirmal

+0

Bitte korrekt und positiv bewerten. TY – cody123

0
/* 
/Users/mac/matrix.txt 
1 0.5 2.4 3.0 
1 99 34 6454 
2 0.8 3.0 4.5 
*/ 
def concat(a:Array[String]):String ={ 
    var result=a(0)+" " 
    for(i<-1 to a.size.toInt-1) 
    result=result+i+":"+a(i)(0)+" " 
    return result 
} 
val rfile=sc.textFile("file:///Users/mac/matrix.txt") 
val f=rfile.map(line => line.split(' ')).map(i=>concat(i)) 

Ich glaube, ich eine viel einfachere Lösung.