Ich habe den Ignite-Cache-Speicher mit HBase als persistentem Back-End-Speicher implementiert. Der Code für den Cache-Speicher lautet wie folgt (Thema: Ignite Cache Store – Methode zum Freigeben von Ressourcen):
/**
 * Ignite {@code CacheStore} that persists {@code Long -> byte[]} cache entries in the HBase table
 * {@code TagDuplicateConstants.BIT_DATA_TABLE_NAME}.
 *
 * <p>Each cache entry maps to one HBase row. The row key is produced by
 * {@code TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key)} and the value is
 * stored in a single column (family {@code BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES}, qualifier
 * {@code BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES}). Row keys are always encoded with
 * {@code Bytes.toBytes(String)} so that reads, writes and deletes address the same row
 * regardless of the JVM default charset.
 *
 * <p>Every operation obtains a connection from {@code HBaseConnectionUtil.getHbaseConnection()}
 * and closes it when the operation finishes.
 * NOTE(review): if that helper creates a new HBase connection on every call, this is expensive;
 * a single shared connection opened and closed through Ignite's {@code LifecycleAware}
 * {@code start()}/{@code stop()} callbacks would be the usual alternative. Confirm against
 * {@code HBaseConnectionUtil} before changing.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {

    /** Ignite instance injected by the grid; kept for callers that rely on the injected field. */
    @IgniteInstanceResource
    Ignite gridReference;

    /** Name of the cache this store instance serves; injected by Ignite. */
    @CacheNameResource
    private String cacheName;

    /**
     * Reads the value stored for a single cache key.
     *
     * @param key cache key to look up
     * @return the stored bytes, or {@code null} when HBase has no row for the key
     * @throws ROCException with {@code CACHE_ENTRY_READ_ERROR} if the HBase read fails
     */
    @Override
    public byte[] load(Long key) {
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Result rowFetched = bitDataPersistentTable.get(new Get(Bytes.toBytes(hbaseKey)));
            if (rowFetched == null || rowFetched.isEmpty()) {
                // Must be null rather than an empty array: a non-null value would be loaded
                // into the cache as a real entry.
                return null;
            }
            return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Reads the values for several cache keys with one batched HBase get.
     *
     * <p>The loaded entries are returned to the caller instead of being put into the cache from
     * inside the store; keys without a stored value are left out of the result.
     *
     * @param keys cache keys to look up
     * @return map from key to stored bytes for every key that has a row in HBase; never {@code null}
     * @throws ROCException with {@code CACHE_ENTRY_READ_ERROR} if the HBase read fails
     */
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
        long startTime = System.currentTimeMillis();

        // Keep keys and Gets in parallel lists: the batched get returns results in request order.
        List<Long> requestedKeys = new ArrayList<>();
        List<Get> rowsToBeFetched = new ArrayList<>();
        for (Long key : keys) {
            String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
            requestedKeys.add(key);
            rowsToBeFetched.add(new Get(Bytes.toBytes(hbaseKey)));
        }

        Map<Long, byte[]> loadedEntries = new java.util.HashMap<>();
        if (rowsToBeFetched.isEmpty()) {
            // Nothing requested: skip the round trip to HBase entirely.
            return loadedEntries;
        }

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Result[] rowsFetched = bitDataPersistentTable.get(rowsToBeFetched);
            for (int i = 0; i < rowsFetched.length; i++) {
                Result rowFetched = rowsFetched[i];
                if (rowFetched == null || rowFetched.isEmpty()) {
                    continue; // no row stored for this key
                }
                byte[] value = rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
                if (value != null) {
                    loadedEntries.put(requestedKeys.get(i), value);
                }
            }
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }

        System.out.println("LoadAll for [ " + loadedEntries.size() + " ] keys of the cache [ " + cacheName
                + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");
        return loadedEntries;
    }

    /**
     * Writes a single cache entry to HBase.
     *
     * @param entry cache entry whose key and value are persisted
     * @throws ROCException with {@code CACHE_ENTRY_WRITE_ERROR} if the HBase write fails
     */
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString());
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey));
            rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());
            bitDataPersistentTable.put(rowToBeWritten);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Writes several cache entries to HBase with one batched put.
     *
     * @param entries cache entries to persist
     * @throws ROCException with {@code CACHE_ENTRY_WRITE_ERROR} if the HBase write fails
     */
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
        long startTime = System.currentTimeMillis();
        List<Put> rowsToBeWritten = new ArrayList<>();
        for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
            String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                    entryToBeInserted.getKey().toString());
            // Bytes.toBytes (not String.getBytes) so the row key matches the one load() reads.
            Put currentRowToBeWritten = new Put(Bytes.toBytes(hbaseKey));
            currentRowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES,
                    entryToBeInserted.getValue());
            rowsToBeWritten.add(currentRowToBeWritten);
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.put(rowsToBeWritten);
            System.out.println("Time taken to write [ " + entries.size() + " entries ] for the cache [ " + cacheName
                    + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
    }

    /**
     * Deletes the HBase row belonging to a single cache key.
     *
     * @param key cache key whose row is removed
     * @throws ROCException with {@code CACHE_ENTRY_REMOVAL_ERROR} if the HBase delete fails
     */
    @Override
    public void delete(Object key) {
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(new Delete(Bytes.toBytes(hbaseKey)));
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Deletes the HBase rows belonging to several cache keys with one batched delete.
     *
     * @param keys cache keys whose rows are removed
     * @throws ROCException with {@code CACHE_ENTRY_REMOVAL_ERROR} if the HBase delete fails
     */
    @Override
    public void deleteAll(Collection<?> keys) {
        List<Delete> rowsToBeDeleted = new ArrayList<>();
        for (Object keyToBeDeleted : keys) {
            String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                    keyToBeDeleted.toString());
            // Bytes.toBytes (not String.getBytes) so the row key matches the one write() created.
            rowsToBeDeleted.add(new Delete(Bytes.toBytes(hbaseKey)));
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(rowsToBeDeleted);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    }

    /**
     * Bulk pre-loading hook; intentionally a no-op in this store.
     *
     * @param clo closure that would receive loaded entries (unused)
     * @param args optional loader arguments (unused)
     */
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }

    /**
     * Session-end hook; intentionally a no-op in this store.
     *
     * @param commit whether the session is committing (unused)
     */
    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }
}
Der Cache-Modus ist PARTITIONED.
Der Cache-Atomizitätsmodus ist ATOMIC.
Aus der Store-Implementierung ist ersichtlich, dass ich in jeder einzelnen der implementierten Methoden eine neue Verbindung zu HBase aufbaue.
Ich wollte wissen, ob es eine Möglichkeit gibt, das Öffnen und Schließen meiner datenquellenspezifischen Ressourcen (in diesem Fall HBase-Verbindungen) auf einer übergeordneten Ebene zu steuern, anstatt dies bei jedem Methodenaufruf zu tun.
Schauen Sie sich vielleicht die Verbindungen an – GurV