1

Könnte jemand ein Beispiel mit pyspark angeben, wie eine benutzerdefinierte Apache Phoenix SQL-Abfrage ausgeführt und das Ergebnis dieser Abfrage in einer RDD oder DF gespeichert wird. Hinweis: Ich suche nach einer benutzerdefinierten Abfrage und nicht nach einer vollständigen Tabelle, die in eine RDD gelesen werden soll.Ausführen einer benutzerdefinierten Apache Phoenix SQL-Abfrage in PySpark

Von Phoenix-Dokumentation, eine ganze Tabelle laden ich diese verwenden:

table = sqlContext.read \ 
     .format("org.apache.phoenix.spark") \ 
     .option("table", "<TABLENAME>") \ 
     .option("zkUrl", "<hostname>:<port>") \ 
     .load() 

ich wissen will, was für die Verwendung einer benutzerdefinierten SQL

sqlResult = sqlContext.read \ 
      .format("org.apache.phoenix.spark") \ 
      .option("sql", "select * from <TABLENAME> where <CONDITION>") \ 
      .option("zkUrl", "<HOSTNAME>:<PORT>") \ 
      .load() 

Dank der entsprechenden entspricht.

Antwort

1

Dies kann getan werden, Phoenix als JDBC-Datenquelle verwendet, wie unten angegeben:

sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE' 

df = sqlContext.read.format('jdbc')\ 
     .options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:<HOSTNAME>:<PORT>', dbtable=sql).load() 

df.show() 

jedoch beachtet werden, dass sollte, wenn es Spaltenaliasnamen in der SQL-Anweisung dann dem. show() - Anweisung würde eine Ausnahme auslösen (Es funktioniert, wenn Sie .select() verwenden, um die Spalten auszuwählen, die keinen Alias ​​enthalten). Dies ist ein möglicher Fehler in Phoenix.

+0

Ist dies eine Antwort oder ein Teil der Frage? – YOU

+0

Beide. Es benutzte JDBC, um das zu erreichen, was ich machen wollte, aber die Phoenix Spark-Option wäre besser, daher mein Versuch und die entsprechende Fehlermeldung. –

+0

Frage sollte im ersten Beitrag bearbeitet werden, weil dies der Antwortabschnitt ist. stackoverflow ist nicht wie normale Foren. – YOU

0

Hier müssen Sie .SQL verwenden, um mit benutzerdefinierten Abfragen zu arbeiten. Hier ist Syntax

dataframe = sqlContext.sql("select * from <table> where <condition>") 
dataframe.show() 
+0

Das würde nicht funktionieren, weil wir Spark nicht sagen, Phoenix überall zu benutzen. Dies ist ein Link zu Phoenix Dokumentation https://phoenix.apache.org/phoenix_spark.html –

Verwandte Themen