2016-04-10 5 views
1

Im Gegensatz zu Standardbeispielen, wo wir FireHose (s) für den Import von Zeilen aus CSV, TSV usw. haben, haben wir eins, so dass wir Datensätze aus der Datenbank importieren und in Druide einfügen können? Irgendwelche Gedanken?Druid: Firehose zum Importieren von Datensätzen aus der Datenbank

Hier ist, was ich dachte -

"firehose": { 
    "type" : "database", 
     "datasource" : { 
       "connectURI" : "jdbc:mysql://localhost:3306/test", 
       "user" : "druid", 
       "password" : "xyz123" 
     }, 
     "query" : "select * from table" 
     "frequency" : "P1M" 
} 

Wir möglicherweise es verlängern Verbindung über jndi Datenquelle und einige andere zu bekommen. Hat diese Art von Implementierung irgendwelche Probleme?

+0

Vielleicht ist es einfacher wäre, aus der Datenbank in CSV-Datei zu exportieren und dann die übliche Art und Weise benutzen? – Nikem

+0

Sicher, aber die Größe der Daten ist das Problem. – jagamot

Antwort

0
 
How about this idea? It's custom firehose for jdbc ingestion. 
In this case, only supports one time query ingestion. 
https://github.com/sirpkt/druid/tree/jdbc_firehose/extensions-contrib/jdbc-firehose 
This is code snippet. Using DBI library try to get result set from existing database server. 
public Firehose connect(final MapInputRowParser parser) throws IOException, ParseException, IllegalArgumentException 
    { 
    if (columns != null) { 
     verifyParserSpec(parser.getParseSpec(), columns); 
    } 

    final Handle handle = new DBI(
     connectorConfig.getConnectURI(), 
     connectorConfig.getUser(), 
     connectorConfig.getPassword() 
    ).open(); 

    final String query = makeQuery(columns); 

    final ResultIterator<InputRow> rowIterator = handle 
     .createQuery(query) 
     .map(
      new ResultSetMapper<InputRow>() 
      { 
       List<String> queryColumns = (columns == null) ? Lists.<String>newArrayList(): columns; 

       @Override 
       public InputRow map(
        final int index, 
        final ResultSet r, 
        final StatementContext ctx 
      ) throws SQLException 
       { 
       try { 
        if (queryColumns.size() == 0) 
        { 
        ResultSetMetaData metadata = r.getMetaData(); 
        for (int idx = 1; idx <= metadata.getColumnCount(); idx++) 
        { 
         queryColumns.add(metadata.getColumnName(idx)); 
        } 
        Preconditions.checkArgument(queryColumns.size() > 0, 
         String.format("No column in table [%s]", table)); 
        verifyParserSpec(parser.getParseSpec(), queryColumns); 
        } 
        ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder(); 
        for (String column: queryColumns) { 
        builder.put(column, r.getObject(column)); 
        } 
        return parser.parse(builder.build()); 

       } catch(IllegalArgumentException e) { 
        throw new SQLException(e); 
       } 
       } 
      } 
     ).iterator(); 
Verwandte Themen