2016-04-19 3 views
0

Wir haben diese massive Legacy SQL-Tabelle, die wir brauchen, um Daten daraus zu extrahieren und es auf s3 schieben. Im Folgenden stelle ich dar, wie ich einen Teil der Daten abfrage und die Ausgabe schreibe.Spark ClosedChannelException Ausnahme während Parkett schreiben

def writeTableInParts(tableName: String, numIdsPerParquet: Long, numPartitionsAtATime: Int, startFrom : Long = -1, endTo : Long = -1, filePrefix : String = s3Prefix) = { 
    val minId : Long = if (startFrom > 0) startFrom else findMinCol(tableName, "id") 
    val maxId : Long = if (endTo > 0) endTo else findMaxCol(tableName, "id") 

    (minId until maxId by numIdsPerParquet).toList.sliding(numPartitionsAtATime, numPartitionsAtATime).toList.foreach(list => { 
     list.map(start => { 
      val end = math.min(start + numIdsPerParquet, maxId) 

      sqlContext.read.jdbc(mysqlConStr, 
      s"(SELECT * FROM $tableName WHERE id >= ${start} AND id < ${end}) as tmpTable", 
      Map[String, String]()) 
     }).reduce((left, right) => { 
      left.unionAll(right) 
     }) 
     .write 
     .parquet(s"${filePrefix}/$tableName/${list.head}-${list.last + numIdsPerParquet}") 
    }) 
    } 

Dies hat auch für viele verschiedene Tabellen gearbeitet, aber aus irgendeinem Grund eine Tabelle java.nio.channels.ClosedChannelException egal wie viel ich das Scanfenster oder Größe erhalten weiter reduzieren.

basierend auf this Antwort Ich denke, ich habe eine Ausnahme irgendwo in meinem Code, aber ich bin mir nicht sicher, wo es sein würde, da es ein ziemlich einfacher Code ist. Wie kann ich diese Ausnahme weiter debuggen? Logs hatten nichts sehr heilpul und enthüllen die Ursache nicht.

Antwort