2016-03-09 18 views
16

hat Wenn ich einen Datenrahmen aus einer JSON-Datei in Funken SQL erstellen, wie kann ich, wenn eine bestimmte Spalte sagen existiert, bevor .selectWie erkenne ich, ob ein Funke Datenrahmen eine Spalte

Beispiel zum Beispiel json Schema

Aufruf
{ 
    "a": { 
    "b": 1, 
    "c": 2 
    } 
} 

Dies ist, was ich tun möchte:

potential_columns = Seq("b", "c", "d") 
df = sqlContext.read.json(filename) 
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column")) 

aber ich kann eine gute Funktion für hasColumn nicht finden. Der nächstgelegene ich bekommen habe, ist zu prüfen, ob die Spalte in diesem etwas umständlich Array ist:

scala> df.select("a.*").columns 
res17: Array[String] = Array(b, c) 

Antwort

7

Eigentlich müssen Sie nicht einmal, um Spalten zu verwenden, wählen Sie anrufen, können Sie es nur fordern die Datenrahmen

selbst
// define test data 
case class Test(a: Int, b: Int) 
val testList = List(Test(1,2), Test(3,4)) 
val testDF = sqlContext.createDataFrame(testList) 

// define the hasColumn function 
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) 

// then you can just use it on the DF with a given column name 
hasColumn(testDF, "a") // <-- true 
hasColumn(testDF, "c") // <-- false 

Alternativ können Sie eine implizite Klasse mit der Zuhälter meine Bibliothek Muster definieren, so dass die hasColumn Methode auf Ihrem Datenrahmen verfügbar ist direkt

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { 
    def hasColumn(colName: String) = df.columns.contains(colName) 
} 

Dann sind Sie c eine Verwendung als:

testDF.hasColumn("a") // <-- true 
testDF.hasColumn("c") // <-- false 
+4

kehrt nicht mit der Arbeit verschachtelte Spalten. von json '{" a ": {" b ": 1," c ": 0}}' – ben

2

Ihre andere Möglichkeit hierfür wäre eine Array-Manipulation zu tun (in diesem Fall ein intersect) auf den df.columns und Ihrem potential_columns.

// Loading some data (so you can just copy & paste right into spark-shell) 
case class Document(a: String, b: String, c: String) 
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF 

// The columns we want to extract 
val potential_columns = Seq("b", "c", "d") 

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show 

Leider wird dies nicht für Ihr inneres Objekt Szenario oben funktionieren. Sie müssen das Schema dafür betrachten.

Ich werde Ihre potential_columns voll qualifizierte Spaltennamen

val potential_columns = Seq("a.b", "a.c", "a.d") 

// Our object model 
case class Document(a: String, b: String, c: String) 
case class Document2(a: Document, b: String, c: String) 

// And some data... 
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF 

// We go through each of the fields in the schema. 
// For StructTypes we return an array of parentName.fieldName 
// For everything else we return an array containing just the field name 
// We then flatten the complete list of field names 
// Then we intersect that with our potential_columns leaving us just a list of column we want 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show 

Dies nur eine Ebene geht tief zu ändern, so macht es generic Sie mehr arbeiten müßten.

38

Gehen Sie einfach davon aus, dass es existiert und lassen Sie es mit Try. Schlicht und einfach und unterstützt eine beliebige Verschachtelung:

import scala.util.Try 
import org.apache.spark.sql.DataFrame 

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess 

val df = sqlContext.read.json(sc.parallelize(
    """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) 

hasColumn(df, "foobar") 
// Boolean = false 

hasColumn(df, "foo") 
// Boolean = true 

hasColumn(df, "foo.bar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobaz") 
// Boolean = false 

Oder noch einfacher:

val columns = Seq(
    "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") 

columns.flatMap(c => Try(df(c)).toOption) 
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13) 

Python-Äquivalent:

from pyspark.sql.utils import AnalysisException 
from pyspark.sql import Row 


def has_column(df, col): 
    try: 
     df[col] 
     return True 
    except AnalysisException: 
     return False 

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() 

has_column(df, "foobar") 
## False 

has_column(df, "foo") 
## True 

has_column(df, "foo.bar") 
## True 

has_column(df, "foo.bar.foobar") 
## True 

has_column(df, "foo.bar.foobaz") 
## False 
+1

Dies funktioniert auch mit strukturierten Feldern. Die Lösungen, die die 'contains'-Funktion verwenden, tun dies nicht! +1 –

+1

Danke, ich hätte diese Antwort akzeptiert! – sparker

1

Try ist nicht optimal, da die es den Ausdruck in Try auswertet bevor es die Entscheidung trifft.

Für große Datensätze, verwenden Sie die unten in Scala:

df.schema.fieldNames.contains("column_name") 
+0

Funktioniert nicht mit verschachtelten Daten. – user8371915

2

Eine weitere Option, die ich normalerweise verwenden

df.columns.contains("column-name-to-check") 

Dies ist ein boolean Dies gilt

+2

funktioniert nicht mit verschachtelten Spalten. – Sindhu

Verwandte Themen