2016-07-07

HBase connection instance

I have the code below:

DStream.map {
    _.message()
}.foreachRDD { rdd =>
    rdd.foreachPartition { iter =>
        val conf = HBaseUtils.configureHBase("iemployee")
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("""iemployee"""))
        iter.foreach { elem =>
            /* loop through the records in the partition and push them out to the DB */
        }
    }
}

Can someone please tell me whether the connection object created here by val connection = ConnectionFactory.createConnection(conf) is the same connection object used in every partition (since I never close it), or whether a new connection object is created for each partition?

Answer

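A new connection instance is created for each partition. The function passed to foreachPartition runs once per partition, so ConnectionFactory.createConnection(conf) is called again each time.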

Please see the code and documentation of ConnectionFactory below. The documentation also states that the caller is responsible for closing the connection.

/**
 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 *
 * Typical usage:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
    throws IOException {
  if (user == null) {
    UserProvider provider = UserProvider.instantiate(conf);
    user = provider.getCurrent();
  }

  String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionImplementation.class.getName());
  Class<?> clazz;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor =
        clazz.getDeclaredConstructor(Configuration.class,
            ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (Connection) constructor.newInstance(conf, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
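
Since every partition gets its own connection and the caller has to close it, one way to handle this is a small try/finally helper around the per-partition work. The following is only a minimal sketch that runs without Spark or an HBase cluster: StubConnection, withResource and writeAll are hypothetical names introduced for illustration, and the Seq of iterators merely stands in for the partitions that foreachPartition would hand you.

```scala
object PerPartitionConnections {

  // Hypothetical stand-in for an HBase Connection (assumption: not the real class),
  // so the sketch can run without a cluster.
  final class StubConnection(val id: Int) extends AutoCloseable {
    private var open = true
    def isClosed: Boolean = !open
    // Stands in for writing one record through a table obtained from the connection.
    def put(record: String): Unit = require(open, "connection already closed")
    override def close(): Unit = { open = false }
  }

  // Loan pattern: create the resource, run the body, always close the resource.
  def withResource[R <: AutoCloseable, A](create: => R)(body: R => A): A = {
    val resource = create
    try body(resource)
    finally resource.close()
  }

  // Mimics rdd.foreachPartition: the function body runs once per partition,
  // so one connection is created, used and closed per partition.
  def writeAll(partitions: Seq[Iterator[String]]): Seq[StubConnection] = {
    var nextId = 0
    partitions.map { iter =>
      withResource { nextId += 1; new StubConnection(nextId) } { connection =>
        iter.foreach(connection.put)
        connection // returned only so callers can inspect it afterwards
      }
    }
  }
}
```

In the job from the question, the same shape would presumably go inside rdd.foreachPartition, with ConnectionFactory.createConnection(conf) in place of the stub and table.close() called before connection.close(), as in the "Typical usage" part of the Javadoc above.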

Hope this helps!

Thank you! – CapturedTree