2017-06-13 2 views
3

Ich bin mit Funken 2.11 Version und ich tue nur drei grundlegende Operationen in meiner Anwendung:Leistungsproblem bei den Funken java

  1. nehmen Datensätze aus Datenbank: 2,2 Millionen
  2. Überprüfung Datensätze aus einer Datei (5 000 Verwendung) in Datenbank (2.200.000) enthält
  3. Schreiben in eine Datei im cSV-Format Datensätze angepasst

Aber für diese drei Operationen dauert es fast 20 Minuten. Wenn ich dieselben Operationen in SQL mache, dauert es weniger als 1 Minuten.

Ich habe begonnen, Funken zu verwenden, weil es Ergebnisse sehr schnell ergibt, aber es dauert zu viel Zeit. Wie kann ich die Leistung verbessern?

Schritt 1: Datensätze aus der Datenbank aufnehmen.

 Properties connectionProperties = new Properties(); 
     connectionProperties.put("user", "test"); 
     connectionProperties.put("password", "test##"); 
     String query="(SELECT * from items) 
     dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties); 

Schritt 2: Aufzeichnungen der Datei A (5k), die in Datei B (2M) Prüfen, unter Verwendung enthält

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner"); 

Schritt 3: Schreiben angepassten Aufzeichnungen in einer Datei des CSV-Format

NewSet.repartition(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

Um die Leistung zu verbessern, habe ich verschiedene Einstellungen wie Cache, Daten Serialisierung

012 versucht

Shuffle Zeit

sqlContext.setConf("spark.sql.shuffle.partitions", "10"), 

Tuning Datenstruktur
-XX:+UseCompressedOops , 

keiner der Ansatz ergibt eine bessere Leistung nicht.

+0

Gibt es einen Grund, Funken für diesen Anwendungsfall zu verwenden? Es scheint mir, dass das Schreiben der 5k-Datensätze in die Datenbank und das Ausgeben eines SQL-Joins innerhalb der Datenbank der effizienteste Ansatz wäre. – maasg

+0

Ich meine, wie lange dauert es, diese Abfrage in Spark materialisieren: 'SELECT * von Elementen)'? – maasg

Antwort

4

Die Erhöhung der Leistung ist eher eine Verbesserung der Parallelität.

Parallelität hängt von der Anzahl der Partitionen in RDD ab.

Stellen Sie sicher, dass Dataset/Dataframe/RDD weder eine zu große Anzahl von Partitionen noch eine sehr geringe Anzahl an Partitionen hat.

Bitte überprüfen Sie die folgenden Vorschläge, in denen Sie Ihren Code verbessern können. Ich bin mit scala wohler, also gebe ich Vorschläge in Scala.

Schritt1: Stellen Sie sicher, dass Sie die Kontrolle über Verbindungen haben, die Sie mit der Datenbank herstellen, indem Sie numPartitions erwähnen.

Anzahl der Verbindungen = Anzahl der Partitionen.

Unten habe ich nur 10 zu num_partitions zugewiesen, das musst du tunen, um mehr Leistung zu bekommen.

int num_partitions; 
    num_partitions = 10; 
    Properties connectionProperties = new Properties(); 
    connectionProperties.put("user", "test"); 
    connectionProperties.put("password", "test##"); 
    connectionProperties.put("partitionColumn", "hash_code"); 
    String query = "(SELECT mod(A.id,num_partitions) as hash_code, A.* from items A)"; 
    dataFileContent = spark.read() 
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", 
     dbtable = query, 
     columnName = "hash_code", 
     lowerBound = 0, 
     upperBound = num_partitions, 
     numPartitions = num_partitions, 
     connectionProperties); 

You can check how numPartitions works

Schritt 2:

Dataset<Row> NewSet = source.join(target, 
    target.col("ItemIDTarget").contains(source.col("ItemIDSource")), 
    "inner"); 

Da man der Tabelle/Datenrahmen 5k Aufzeichnungen (kleine Datenmenge), die können Sie Broadcast verwenden, wie unten erwähnt beizutreten.

import org.apache.spark.sql.functions.broadcast 
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key") 

Schritt 3: Verwenden coalesce Anzahl der Partitionen zu verringern, so dass es voll Shuffle vermeidet.

NewSet.coalesce(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

Hoffnung meine Antwort hilft Ihnen.