
Ignite Cache Store - a method for releasing resources

I have implemented an Ignite cache store with HBase as the persistent back-end store. The code for the cache store is as follows:

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.Map; 

import javax.cache.Cache.Entry; 

import org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.Get; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.Table; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.cache.store.CacheStore; 
import org.apache.ignite.lang.IgniteBiInClosure; 
import org.apache.ignite.resources.CacheNameResource; 
import org.apache.ignite.resources.IgniteInstanceResource; 

// HBaseConnectionUtil, TagDuplicateConstants, TagDuplicateUtil and ROCException 
// are project-specific classes; their imports are omitted here. 

public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> { 

@IgniteInstanceResource 
Ignite gridReference; 

@CacheNameResource 
private String cacheName; 

@Override 
public byte[] load(Long key) { 

    String hbaseKey; 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

      Get rowToBeFetched = new Get(Bytes.toBytes(hbaseKey)); 

      Result rowFetched = bitDataPersistentTable.get(rowToBeFetched); 

      if (rowFetched == null || rowFetched.isEmpty()) { 
       // Return null rather than an empty array, otherwise Ignite 
       // would load the (empty) entry into the cache 
       return null; 
      } 

      return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES, 
        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES); 

     } 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e, 
       "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName 
         + " ] "); 
    } 

} 

@Override 
public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) { 

    String hbaseKey; 

    long startTime = System.currentTimeMillis(); 

    long numberOfKeysLoaded = 0L; 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      IgniteCache<Long, byte[]> cacheToBeLoaded = gridReference.cache(cacheName); 

      Get rowToBeFetched; 

      Result rowFetched; 

      for (Long key : keys) { 

       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

       rowToBeFetched = new Get(Bytes.toBytes(hbaseKey)); 

       rowFetched = bitDataPersistentTable.get(rowToBeFetched); 

       // Skip missing rows - Ignite does not allow null cache values 
       if (rowFetched == null || rowFetched.isEmpty()) { 
        continue; 
       } 

       cacheToBeLoaded.put(key, 
         rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES, 
           TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES)); 

       numberOfKeysLoaded++; 

      } 

      System.out.println("LoadAll for [ " + numberOfKeysLoaded + " ] keys of the cache [ " + cacheName 
        + " ] took [ " + ((System.currentTimeMillis() - startTime)/1000.0) + " seconds ] "); 

      // Nothing to hand back to Ignite - the entries were already put into the cache above 
      return null; 

     } 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e, 
       "Error while reading multiple keys for the cache [ " + cacheName + " ] "); 
    } 

} 

@Override 
public void write(Entry<? extends Long, ? extends byte[]> entry) { 

    String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString()); 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey)); 

      rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES, 
        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue()); 

      bitDataPersistentTable.put(rowToBeWritten); 

     } 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e, 
       "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName 
         + " ] "); 
    } 

} 

@Override 
public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) { 

    long startTime = System.currentTimeMillis(); 

    String hbaseKey; 

    List<Put> rowsToBeWritten = new ArrayList<>(); 

    Put currentRowToBeWritten; 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) { 

       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, 
         entryToBeInserted.getKey().toString()); 

       currentRowToBeWritten = new Put(Bytes.toBytes(hbaseKey)); 

       currentRowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES, 
         TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, 
         entryToBeInserted.getValue()); 

       rowsToBeWritten.add(currentRowToBeWritten); 

      } 

      bitDataPersistentTable.put(rowsToBeWritten); 

     } 

     System.out.println("Time taken to load [ " + entries.size() + " entries ] for the cache [ " + cacheName 
       + " ] is " + ((System.currentTimeMillis() - startTime)/1000.0) + " seconds"); 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e, 
       "Error while writing multiple keys for the cache [ " + cacheName + " ] "); 
    } 

} 

@Override 
public void delete(Object key) { 

    String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()); 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      Delete rowToBeDeleted = new Delete(Bytes.toBytes(hbaseKey)); 

      bitDataPersistentTable.delete(rowToBeDeleted); 

     } 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e, 
       "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName 
         + " ] "); 
    } 

} 

@Override 
public void deleteAll(Collection<?> keys) { 

    String hbaseKey; 

    List<Delete> rowsToBeDeleted = new ArrayList<>(); 

    try (Connection con = HBaseConnectionUtil.getHbaseConnection()) { 

     try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) { 

      for (Object keyToBeDeleted : keys) { 

       hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, 
         keyToBeDeleted.toString()); 

       rowsToBeDeleted.add(new Delete(Bytes.toBytes(hbaseKey))); 

      } 

      bitDataPersistentTable.delete(rowsToBeDeleted); 

     } 

    } catch (IOException e) { 
     throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e, 
       "Error while deleting entries for the cache [ " + cacheName + " ] "); 
    } 

} 

@Override 
public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) { 
    // No implementation provided 
} 

@Override 
public void sessionEnd(boolean commit) { 
    // No implementation provided 
} 

} 

The cache mode is PARTITIONED.

The cache atomicity mode is ATOMIC.
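
For reference, these two modes together with the store wiring correspond to a cache configuration along the following lines (standard Ignite/JCache API; the cache name "bitDataCache" is a hypothetical stand-in):

import javax.cache.configuration.FactoryBuilder;

import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;

// "bitDataCache" is a hypothetical name; substitute the real one
CacheConfiguration<Long, byte[]> cacheConfig = new CacheConfiguration<>("bitDataCache");

cacheConfig.setCacheMode(CacheMode.PARTITIONED);
cacheConfig.setAtomicityMode(CacheAtomicityMode.ATOMIC);

// Wire in the store and let Ignite call it on misses and updates
cacheConfig.setCacheStoreFactory(FactoryBuilder.factoryOf(BitDataCachePersistentStore.class));
cacheConfig.setReadThrough(true);
cacheConfig.setWriteThrough(true);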

As is evident from the store implementation, I open a new connection to HBase in every single one of the implemented methods.

I would like to know whether there is a method or mechanism that gives me more control over opening and closing my data-source-specific resources (HBase connections in this case) at a macro level, instead of doing it on every method call.
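
Concretely, the kind of control I have in mind would look something like the following sketch. HBase's Connection is heavyweight but thread-safe and designed to be created once and shared, with per-operation work done by cheap Table handles; and Ignite calls start() and stop() on cache stores that implement LifecycleAware. Under those assumptions the store can open one shared connection when the cache starts and close it when the cache stops (CacheStoreAdapter keeps the sketch short; the table and column names are placeholders for the TagDuplicateConstants values):

import java.io.IOException;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;

public class SharedConnectionBitDataStore extends CacheStoreAdapter<Long, byte[]> implements LifecycleAware {

    // Placeholders for the TagDuplicateConstants values used in the original store
    private static final TableName TABLE = TableName.valueOf("bit_data");
    private static final byte[] FAMILY = Bytes.toBytes("d");
    private static final byte[] QUALIFIER = Bytes.toBytes("bits");

    /** One heavyweight, thread-safe connection shared by all store methods. */
    private Connection sharedConnection;

    /** Called by Ignite when the cache (and its store) starts. */
    @Override public void start() throws IgniteException {
        try {
            sharedConnection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        } catch (IOException e) {
            throw new IgniteException("Failed to open the shared HBase connection", e);
        }
    }

    /** Called by Ignite when the cache stops; releases the connection exactly once. */
    @Override public void stop() throws IgniteException {
        try {
            if (sharedConnection != null)
                sharedConnection.close();
        } catch (IOException e) {
            throw new IgniteException("Failed to close the shared HBase connection", e);
        }
    }

    @Override public byte[] load(Long key) {
        // Table handles are lightweight: open one per call, reuse the connection
        try (Table table = sharedConnection.getTable(TABLE)) {
            // Row key derivation simplified; the real store uses TagDuplicateUtil
            Result row = table.get(new Get(Bytes.toBytes(key.toString())));
            return row.isEmpty() ? null : row.getValue(FAMILY, QUALIFIER);
        } catch (IOException e) {
            throw new CacheLoaderException("Read failed for key [" + key + "]", e);
        }
    }

    @Override public void write(Entry<? extends Long, ? extends byte[]> entry) {
        try (Table table = sharedConnection.getTable(TABLE)) {
            Put put = new Put(Bytes.toBytes(entry.getKey().toString()));
            put.addColumn(FAMILY, QUALIFIER, entry.getValue());
            table.put(put);
        } catch (IOException e) {
            throw new CacheWriterException("Write failed for key [" + entry.getKey() + "]", e);
        }
    }

    @Override public void delete(Object key) {
        try (Table table = sharedConnection.getTable(TABLE)) {
            table.delete(new Delete(Bytes.toBytes(key.toString())));
        } catch (IOException e) {
            throw new CacheWriterException("Delete failed for key [" + key + "]", e);
        }
    }
}

With this in place, every load/write/delete only pays for a cheap Table handle, while the expensive ZooKeeper and RPC setup behind the Connection happens once per node; CacheStoreAdapter supplies the bulk loadAll/writeAll/deleteAll variants by delegating to the methods above.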


Maybe take a look at the connections – GurV

Answer


You need to use a connection pool in your store. Take a look at c3p0.
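
Note that c3p0 pools JDBC connections, so it does not apply to HBase Connection objects directly; a generic object pool such as Apache Commons Pool 2 plays the same role. A minimal sketch of such a pool (class and method names are hypothetical) that the store methods could borrow from instead of calling HBaseConnectionUtil.getHbaseConnection() each time:

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public final class HBaseConnectionPool {

    /** Tells the pool how to create, wrap and destroy pooled connections. */
    private static final class ConnectionPoolFactory extends BasePooledObjectFactory<Connection> {
        @Override public Connection create() throws Exception {
            return ConnectionFactory.createConnection(HBaseConfiguration.create());
        }

        @Override public PooledObject<Connection> wrap(Connection con) {
            return new DefaultPooledObject<>(con);
        }

        @Override public void destroyObject(PooledObject<Connection> p) throws Exception {
            p.getObject().close(); // really close the connection on eviction/shutdown
        }
    }

    private final GenericObjectPool<Connection> pool;

    public HBaseConnectionPool(int maxConnections) {
        pool = new GenericObjectPool<>(new ConnectionPoolFactory());
        pool.setMaxTotal(maxConnections);
    }

    /** Borrow a connection; the caller must return it in a finally block. */
    public Connection borrow() throws Exception {
        return pool.borrowObject();
    }

    public void giveBack(Connection con) {
        pool.returnObject(con);
    }

    /** Closes the pool and all idle connections, e.g. on node shutdown. */
    public void shutdown() {
        pool.close();
    }
}

Because a single HBase Connection already multiplexes requests across threads, a pool of size one (or the LifecycleAware approach sketched in the question) is usually enough; a larger pool only helps if one connection becomes a bottleneck.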
