2017-07-06 2 views
2

Ich habe ein Szenario, um zwei verschiedene Tabellen Quelle und Ziel von zwei separaten Remote-Hive-Server zu vergleichen, können wir zwei SparkSessions etwas wie ich habe versucht, unten: -Können wir mehrere Sparksessions verwenden, um auf zwei verschiedene Hive-Server zuzugreifen

val spark = SparkSession.builder().master("local") 
    .appName("spark remote") 
    .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false") 
    .config("javax.jdo.option.ConnectionUserName", "hiveroot") 
    .config("javax.jdo.option.ConnectionPassword", "hivepassword") 
    .config("hive.exec.scratchdir", "/tmp/hive/${user.name}") 
    .config("hive.metastore.uris", "thrift://192.168.175.160:9083") 
    .enableHiveSupport() 
    .getOrCreate() 

SparkSession.clearActiveSession() 
SparkSession.clearDefaultSession() 

val sparkdestination = SparkSession.builder() 
    .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false") 
    .config("javax.jdo.option.ConnectionUserName", "hiveroot") 
    .config("javax.jdo.option.ConnectionPassword", "hivepassword") 
    .config("hive.exec.scratchdir", "/tmp/hive/${user.name}") 
    .config("hive.metastore.uris", "thrift://192.168.175.42:9083") 
    .enableHiveSupport() 
    .getOrCreate() 

ich habe versucht, mit SparkSession.clearActiveSession() and SparkSession.clearDefaultSession() aber es funktioniert nicht, wirft den Fehler unten:

Hive: Failed to access metastore. This class should not accessed in runtime. 
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient 

gibt es eine andere Art und Weise wir erreichen können zwei hive Tabellen Zugriff auf mehrere mit SparkSessions oder SparkContext.

Dank

Antwort

1

Blick auf SparkSessiongetOrCreate method

die besagen, dass

bekommt ein existent [[SparkSession]] oder, wenn es keine vorhandenen ist, ein neues auf der Basis schafft Optionen in diesem Builder festgelegt.

Diese Methode prüft zuerst, ob es eine gültige Thread-lokale SparkSession gibt, und wenn ja, geben Sie diese zurück. Es prüft dann, ob eine gültige globale Standard-SparkSession vorhanden ist, und wenn ja, geben Sie zurück. Wenn kein gültiger globaler Standard SparkSession vorhanden ist, erstellt die Methode eine neue SparkSession und weist die neu erstellte SparkSession als globalen Standard zu. Wenn eine vorhandene SparkSession zurückgegeben wird, werden die in diesem Builder angegebenen Konfigurationsoptionen auf die vorhandene SparkSession angewendet.

Das ist der Grund, warum die erste Sitzung und ihre Konfigurationen zurückgegeben werden.

Bitte gehen Sie durch die docs auf alternative Wege zu finden, um Sitzung zu erstellen ..


Ich bin auf < 2 Funken Version arbeiten. So bin ich nicht sicher, wie neue Sitzung mit aus Kollision der Konfiguration genau erstellen ..

Aber hier ist nützlich Testfall also SparkSessionBuilderSuite.scala dass- DIY ..

Beispiel Methode zu tun in dass Testfall

test("use session from active thread session and propagate config options") { 
    val defaultSession = SparkSession.builder().getOrCreate() 
    val activeSession = defaultSession.newSession() 
    SparkSession.setActiveSession(activeSession) 
    val session = SparkSession.builder().config("spark-config2", "a").getOrCreate() 

    assert(activeSession != defaultSession) 
    assert(session == activeSession) 
    assert(session.conf.get("spark-config2") == "a") 
    assert(session.sessionState.conf == SQLConf.get) 
    assert(SQLConf.get.getConfString("spark-config2") == "a") 
    SparkSession.clearActiveSession() 

    assert(SparkSession.builder().getOrCreate() == defaultSession) 
    SparkSession.clearDefaultSession() 
    } 
+0

Ich kann die Methode newSession() in SparkSession nicht finden. Gibt es Beispiele? Bitte. – Vignesh

+0

Ich habe nach Code geforscht, der eine Funkensession erzeugt und oben einen Zeiger gegeben hat. Sie müssen die Dokumentationsmethoden überprüfen. In der Tat arbeite ich an <2 Version von Funken. bitte überprüfe Methoden wie 'setActiveSession' etc ... –

2

ich benutze diese Art und Weise und arbeitet völlig in Ordnung mit Funken 2,1

val sc = SparkSession.builder() 
      .config("hive.metastore.uris", "thrift://dbsyz1111:10000") 
      .enableHiveSupport() 
      .getOrCreate() 

// Createdataframe 1 from by reading the data from hive table of metstore 1 
val dataframe_1 = sc.sql("select * from <SourcetbaleofMetaStore_1>") 

// Resetting the existing Spark Contexts 
SparkSession.clearActiveSession() 
SparkSession.clearDefaultSession() 

//Initialize Spark session2 with Hive Metastore 2 
val spc2 = SparkSession.builder() 
       .config("hive.metastore.uris", "thrift://dbsyz2222:10004") 
       .enableHiveSupport() 
       .getOrCreate() 

// Load dataframe 2 of spark context 1 into a new dataframe of spark context2, By getting schema and data by converting to rdd API 
val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema) 

dataframe_2.write.mode("Append").saveAsTable(<targettableNameofMetastore_2>) 
Verwandte Themen