2016-03-28 14 views
1

Kann mir jemand sagen, wie Abfragen mit Spark-Shell für CSV-Datei geschrieben werden?Schreiben von Abfragen für CSV-Datei in Spark-Shell

Was habe ich erreicht war eine CSV-Datei mit databricks Bibliothek zu lesen und ein Datenrahmen erstellen, wie unten dargestellt:

./spark-shell --packages.com.databricks:spark-csv_2.10:1.4.0 
import org.apache.spark.sql.SQLContext 
val sqlContect = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv") .option("header", "true").load("mylocalpath.csv") 

Dann kann ich df.printSchema tun() und andere datafram Operationen ohne Probleme. Aber ich habe mich gefragt, wie kann ich ein paar Fragen schreiben?

ich die Anweisung auf http://spark.apache.org/docs/latest/sql-programming-guide.html sah und es erwähnt etwas über das Schema Programmatically angeben, folgte ich ihr Verfahren und nur CSV-Datei insteading von Text-Datei zu lesen, aber wenn ich val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) tat, bekam ich einen Fehlerwert Split sagen, ist nicht ein Mitglied von org.apache.spark.sql.Row. Wie kann ich dieses Problem beheben?

Und wenn es eine einfachere Methode zum Schreiben von SQL-Abfragen gibt, lass es mich wissen. Was ich letztendlich tun möchte, ist so etwas wie zwei Spalten auszuwählen, eine für ID, eine für Preis und returen den höchsten Preis so einfach.

df.printSchema() sieht wie folgt aus:

|-- TAXROLL_NUMBER: string (nullable = true)
|-- BUILDING_NAME: string (nullable = true)

|-- ASSESSED_VALUE: string (nullable = true)

|-- STREET_NAME: string (nullable = true)

|-- POSTAL_CODE: string (nullable = true)

|-- CITY: string (nullable = true)

|-- BUILD_YEAR: string (nullable = true)

|-- Lon: string (nullable = true)

|-- Lat: string (nullable = true)

+0

Können Sie die Frage bearbeiten und zeigen, was die Ausgabe von 'printSchema' ist? Sobald Sie einen gültigen 'DataFrame' mit einem gültigen Schema haben, können Sie mit der Abfrage beginnen. Wenn Sie das Schema drucken, zeige ich Ihnen wie. –

+0

Ja, ich habe meinen Post bearbeitet – teddy

Antwort

2

Ich bin mir nicht sicher, ob ich Sie vollständig folgen, aber vielleicht alles, was Sie brauchen, ist

df.registerTempTable("TblName") //temp table registration 

oder

df.saveAsTable("TblName") //actual physical table registration 

und Abfrage mit

sqlContext.sql("select * from TblName limit 100").take(100).foreach(println) 

oder einer anderen Funken SQL-Abfrage.

Ich denke, Ihr Problem resultiert aus dem Versuch, RDD Arbeit nach dem Lesen der CSV mit Spark-CSV-Paket zu tun. Der Typ, den dieses Paket zurückgibt, ist wie angegeben - org.apache.spark.sql.Row. Sie können die RDD-Methode einfach verwenden - lesen Sie einfach die CSV-Datei mit Textdatei. Beispiel:

case class tmpSchema(TAXROLL_NUMBER: String, BUILDING_NAME: String, ASSESSED_VALUE: String, STREET_NAME: String, CITY: String) // etc. 
val toTable = sc.textFile(pathString).map(_.split(",")).map(p => tmpSchema(p(0), p(1) ,p(2), p(3), p(4)). toDF.registerTempTable("tblName2") 

Diese Methode erfordert nicht die Verwendung des Databricks csv-Pakets. Auf der anderen Seite, wenn Ihre Daten einige Verkapselungen und das Entweichen von Zeichen haben, verwenden Sie besser das CSV-Paket.

0

Java-Code Spark 2.0.0

package com.example.SparkReadFile; 

    import org.apache.spark.sql.Dataset; 
    import org.apache.spark.sql.Row; 
    import org.apache.spark.sql.SparkSession; 
    import org.apache.spark.sql.functions; 


    public class Driver 
    { 
     public static void main(String[] args) throws Exception { 
      SparkSession spark = SparkSession 
        .builder() 
        .appName("Csv reader") 
        .master("local") 
        // .enableHiveSupport() 
        .getOrCreate(); 
      Dataset<Row> df = spark.read() 
        .format("csv") 
        .option("header", "true") 
        .option("nullValue", "") 
        .csv("file:///Users/karuturi/Desktop/sample.csv"); 

      df.registerTempTable("people"); //temp table registration 

      Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); 
      sqlDF.show(); 
      } 
    } 
Verwandte Themen