0

Ich bin relativ neu in Cassandra, ich versuche, Daten mithilfe vorbereiteter Anweisung abgerufen über einen Executor Pool abrufen. Die Daten, die ich erhalte, sind nicht konsistent.Cassandra Abrufen von Daten über einen Thread-Pool

Ich habe diese user_connections Tabelle, wobei user_id der Zeilenschlüssel ist, friends_id Liste als Set-Spalte. Ich habe diese eine andere Tabelle, Friends_info Tabelle, wo Friend_id der Zeilenschlüssel und alle anderen Informationen als Spalten ist.

Wenn ich versuche, die Freundesliste für den Benutzer AA wiederherzustellen, erhalte ich die Freundesliste BBB, CCC, DDD. Was vollkommen in Ordnung ist.

Wenn versucht wird, BBB, CCC, DDD über einen Executor-Pool mithilfe vorbereiteter Anweisung abzurufen. Daten sind inkonsistent. Einige Male alle drei Datensätze sind BBB, Einige Male alle drei Datensätze sind, Einige Mal sind zwei Datensätze BBB und eins ist CCC etc ...

Ich habe die Methoden und relevanten Klassen, die ich benutze, können Sie bitte helfen ich damit. Ich weiß, dass eine vorbereitete Anweisung threadsicher ist und erwartungsgemäß wie erwartet funktioniert.

public Set<User> listUserConnections(String userId) { 
    Session session = client.getSession(); 

    Set<String> listUserConnectionIds = listUserConnections(userId, session); 

    if (listUserConnectionIds.size() == 0) 
     return new HashSet<User>(); 



    Set<User> listConnectionUserDetails = retrieveUserConnectionProfileInfo(
      listUserConnectionIds, session); 
    return listConnectionUserDetails; 

} 



private Set<User> retrieveUserConnectionProfileInfo(Set<String> listUserConnectionIds, 
     Session session) { 


    Set<Callable<User>> callables = new HashSet<Callable<User>>(); 


    for(String key:listUserConnectionIds){ 



     logger.info("about to add callable" + key); 

     Callable<User> callable = new QueryTask(key); 

     callables.add(callable); 

    } 
    // invoke in parallel 
    List<Future<User>> futures; 
    Set<User> users = new HashSet<User>(); 

    // TODO Revisit this 
    ExecutorService executorPool = Executors.newFixedThreadPool(100); 

    try { 
     futures = executorPool.invokeAll(callables); 
     for (Future<User> f : futures) { 

      User user = f.get(); 
      users.add(user); 

      logger.info("User from future"+user.getUserId()); 

     } 
    } catch (ExecutionException e) { 
     logger.error("Error in retrieving the stores in parallel ", e); 
    } catch (InterruptedException e) { 
     logger.error("Error in retrieving the stores in parallel as it was interrupted ", e); 
    } finally { 
     executorPool.shutdown(); 
    } 

    return users; 
} 

// Executor Pool Klasse

class QueryTask implements Callable<User> { 

    private String userName; 

    // final PreparedStatement statement = 
    // client.getSession().prepare(QueryConstants.GET_ALL_STORE_BRANDS); 

    QueryTask(String name) { 
     this.userName = name; 

    } 

    @Override 
    public User call() throws Exception { 
     // -------------I am seeing the userName is correct------------- for example BBB 
     logger.info("inside call processing queries for " + userName); 

     //------------This is a prepared statement, userPreparedStatement.getbStUserInfo() 
     BoundStatement bStUserInfo = userPreparedStatement.getbStUserInfo(); 
     bStUserInfo.bind(userName); 
     Session session = client.getSession(); 
     ResultSet rs = session.execute(bStUserInfo); 

     User user = new User(); 

     Iterator<Row> rowIterator = rs.iterator(); 
     while (rowIterator.hasNext()) 
     { 
     Row row = rowIterator.next(); 

     //-------This user id is not right 
     logger.info("Inside the callable after retrieval"+row.getString(TableConstants.Users.COLUMN_NAME_USER_ID)); 
     user.setUserId(row.getString(TableConstants.Users.COLUMN_NAME_USER_ID)); 



     return user; 
     } 

     logger.info("outside the while loop"); 
     return user; 

    } 

} 
+0

Ich sehe nicht, wo Sie Ihre 'BoundStatement' Instanzen" neu ". – Ralf

+2

Als eine Nebenbemerkung gibt Ihnen der Treiber bereits eine asynchrone API, so dass Sie fast alle QueryTask/Executor-Code loswerden und auf Ihren Test konzentrieren können. –

Antwort

0

Vielen Dank @Ralf und Alex Popescu für auf diese zurück zu mir zu bekommen. Datastax verfügte über eine Dokumentation, in der ausführlich beschrieben wird, wie die Aync-Aufrufe funktionieren.

@ Alex Popescu. Danke ich versuchte ihre Aync Anrufe und es hat gut für mich funktioniert.