2016-07-28 15 views
0

Ich Abfrage Oracle-Datenbank mit Flink DataSet API. Dafür habe ich Flink JDBCInputFormat angepasst, um java.sql.Resultset zurückzugeben. Da ich die Ergebnismenge mit Flink Operatoren weiter bearbeiten muss.Apache Flink JDBC InputFormat werfen java.net.SocketException: Socket geschlossen

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); 
    environment.setParallelism(1); 
    @SuppressWarnings("unchecked") 
    DataSource<ResultSet> source 
      = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() 
        .setUsername("username") 
        .setPassword("password") 
        .setDrivername("driver_name") 
        .setDBUrl("jdbcUrl") 
        .setQuery("query") 
        .finish(),  
        new GenericTypeInfo<ResultSet>(ResultSet.class) 
      ); 
    source.print(); 

    environment.execute(); 

} 

Folgende ist die angepasst JDBCInputFormat:

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable { 

@Override 
public void open(InputSplit inputSplit) throws IOException { 
       Class.forName(drivername); 
        dbConn = DriverManager.getConnection(dbURL, username, password); 
       statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); 
       resultSet = statement.executeQuery(); 
} 

@Override 
public void close() throws IOException { 
      if(statement != null) { 
        statement.close(); 
       } 
       if(resultSet != null) 
        resultSet.close(); 
       if(dbConn != null) { 
        dbConn.close(); 
       } 
} 

@Override 
public boolean reachedEnd() throws IOException { 
     isLastRecord = resultSet.isLast(); 
    return isLastRecord; 
} 

@Override 
public ResultSet nextRecord(ResultSet row) throws IOException{ 
     if(!isLastRecord){    
      resultSet.next(); 
     } 
     return resultSet; 
} 

}

Dies funktioniert mit folgenden Abfrage mit Begrenzung in der Zeile abgerufen: select a, b, c von xyz wo rownum < = 10; aber wenn ich alle Zeilen zu holen versuchen ca. 1 Million von Daten mit, ich bin die unten Ausnahme immer nach Abrufen zufällige Anzahl von Zeilen:

java.sql.SQLRecoverableException: Io exception: Socket closed 
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101) 
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521) 
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024) 
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314) 
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228) 
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839) 
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823) 
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349) 
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98) 
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
at java.lang.Thread.run(Thread.java:745) 

Verursacht durch: java.net.SocketException : Socket geschlossen bei java.net.SocketOutputStream.socketWrite0 (Native Methode)

Also für meinen Fall, wie kann ich dieses Problem lösen?

Antwort

1

Ich glaube nicht, dass es möglich ist, eine ResultSet wie eine normale Aufzeichnung zu versenden. Dies ist ein statusbehaftetes Objekt, das intern eine Verbindung zum Datenbankserver unterhält. Die Verwendung eines ResultSet als Datensatz, der zwischen Flink-Operatoren übertragen wird, bedeutet, dass er serialisiert, über das Netzwerk an eine andere Maschine versendet, deserialisiert und an einen anderen Thread in einem anderen JVM-Prozess übergeben werden kann. Das funktioniert nicht.

Abhängig von der Verbindung könnte ein ResultSet auch auf der gleichen Maschine im selben Thread bleiben, was der Fall sein könnte, der für Sie arbeitete. Wenn Sie eine Datenbank innerhalb eines Operators abfragen möchten, können Sie die Funktion als RichMapPartitionFunction implementieren. Ansonsten würde ich die ResultSet in der Datenquelle lesen und die resultierenden Zeilen weiterleiten.

Verwandte Themen