Ich bin relativ neu in Cassandra, ich versuche, Daten mithilfe vorbereiteter Anweisung abgerufen über einen Executor Pool abrufen. Die Daten, die ich erhalte, sind nicht konsistent.Cassandra Abrufen von Daten über einen Thread-Pool
Ich habe diese user_connections Tabelle, wobei user_id der Zeilenschlüssel ist, friends_id Liste als Set-Spalte. Ich habe diese eine andere Tabelle, Friends_info Tabelle, wo Friend_id der Zeilenschlüssel und alle anderen Informationen als Spalten ist.
Wenn ich versuche, die Freundesliste für den Benutzer AA wiederherzustellen, erhalte ich die Freundesliste BBB, CCC, DDD. Was vollkommen in Ordnung ist.
Wenn versucht wird, BBB, CCC, DDD über einen Executor-Pool mithilfe vorbereiteter Anweisung abzurufen. Daten sind inkonsistent. Einige Male alle drei Datensätze sind BBB, Einige Male alle drei Datensätze sind, Einige Mal sind zwei Datensätze BBB und eins ist CCC etc ...
Ich habe die Methoden und relevanten Klassen, die ich benutze, können Sie bitte helfen ich damit. Ich weiß, dass eine vorbereitete Anweisung threadsicher ist und erwartungsgemäß wie erwartet funktioniert.
public Set<User> listUserConnections(String userId) {
Session session = client.getSession();
Set<String> listUserConnectionIds = listUserConnections(userId, session);
if (listUserConnectionIds.size() == 0)
return new HashSet<User>();
Set<User> listConnectionUserDetails = retrieveUserConnectionProfileInfo(
listUserConnectionIds, session);
return listConnectionUserDetails;
}
private Set<User> retrieveUserConnectionProfileInfo(Set<String> listUserConnectionIds,
Session session) {
Set<Callable<User>> callables = new HashSet<Callable<User>>();
for(String key:listUserConnectionIds){
logger.info("about to add callable" + key);
Callable<User> callable = new QueryTask(key);
callables.add(callable);
}
// invoke in parallel
List<Future<User>> futures;
Set<User> users = new HashSet<User>();
// TODO Revisit this
ExecutorService executorPool = Executors.newFixedThreadPool(100);
try {
futures = executorPool.invokeAll(callables);
for (Future<User> f : futures) {
User user = f.get();
users.add(user);
logger.info("User from future"+user.getUserId());
}
} catch (ExecutionException e) {
logger.error("Error in retrieving the stores in parallel ", e);
} catch (InterruptedException e) {
logger.error("Error in retrieving the stores in parallel as it was interrupted ", e);
} finally {
executorPool.shutdown();
}
return users;
}
// Executor Pool Klasse
class QueryTask implements Callable<User> {
private String userName;
// final PreparedStatement statement =
// client.getSession().prepare(QueryConstants.GET_ALL_STORE_BRANDS);
QueryTask(String name) {
this.userName = name;
}
@Override
public User call() throws Exception {
// -------------I am seeing the userName is correct------------- for example BBB
logger.info("inside call processing queries for " + userName);
//------------This is a prepared statement, userPreparedStatement.getbStUserInfo()
BoundStatement bStUserInfo = userPreparedStatement.getbStUserInfo();
bStUserInfo.bind(userName);
Session session = client.getSession();
ResultSet rs = session.execute(bStUserInfo);
User user = new User();
Iterator<Row> rowIterator = rs.iterator();
while (rowIterator.hasNext())
{
Row row = rowIterator.next();
//-------This user id is not right
logger.info("Inside the callable after retrieval"+row.getString(TableConstants.Users.COLUMN_NAME_USER_ID));
user.setUserId(row.getString(TableConstants.Users.COLUMN_NAME_USER_ID));
return user;
}
logger.info("outside the while loop");
return user;
}
}
Ich sehe nicht, wo Sie Ihre 'BoundStatement' Instanzen" neu ". – Ralf
Als eine Nebenbemerkung gibt Ihnen der Treiber bereits eine asynchrone API, so dass Sie fast alle QueryTask/Executor-Code loswerden und auf Ihren Test konzentrieren können. –