2016-06-21 9 views
0

Ich versuchte Cassandra und multiprocessing zu verwenden, um Zeilen (Dummy-Daten) gleichzeitig auf die Beispiele basieren einzufügenCassandra Multiprozessing kann _thread.lock nicht beizen Objekte

http://www.datastax.com/dev/blog/datastax-python-driver-multiprocessing-example-for-improved-bulk-data-throughput

Dies ist mein Code

class QueryManager(object): 

concurrency = 100 # chosen to match the default in execute_concurrent_with_args 

def __init__(self, session, process_count=None): 
    self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) 

@classmethod 
def _setup(cls, session): 
    cls.session = session 
    cls.prepared = cls.session.prepare(""" 
INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?) 
""") 

def close_pool(self): 
    self.pool.close() 
    self.pool.join() 

def get_results(self, params): 
    results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency))) 
    return list(itertools.chain(*results)) 

@classmethod 
def _results_from_concurrent(cls, params): 
    return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)] 


def _multiprocess_write(params): 
    return QueryManager._results_from_concurrent(params) 


if __name__ == '__main__': 

    processes = 2 

    # connect cluster 
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042) 
    session = cluster.connect() 

    # database name is a concatenation of client_id and system_id 
    keyspace_name = 'unit_test_0' 

    # drop keyspace if it already exists in a cluster 
    try: 
     session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name) 
    except: 
     pass 

    create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \ 
         + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};" 
    session.execute(create_keyspace_query) 

    # use a session's keyspace 
    session.set_keyspace(keyspace_name) 

    # drop table if it already exists in the keyspace 
    try: 
     session.execute("DROP TABLE IF EXISTS " + "test_table") 
    except: 
     pass 

    # create a table for invoices in the keyspace 
    create_test_table = "CREATE TABLE test_table(" 

    keys = "key1 text,\n" \ 
      "key2 text,\n" \ 
      "key3 text,\n" \ 
      "key4 text,\n" \ 
      "key5 text,\n" 

    create_invoice_table_query += keys 
    create_invoice_table_query += "PRIMARY KEY (key1))" 
    session.execute(create_test_table) 

    qm = QueryManager(session, processes) 

    params = list() 
    for row in range(100000): 
     key = 'test' + str(row) 
     params.append([key, 'test', 'test', 'test', 'test']) 

    start = time.time() 
    rows = qm.get_results(params) 
    delta = time.time() - start 
    log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs')) 

, wenn ich den Code ausgeführt wird, bekam ich folgende Fehlermeldung

TypeError: can't pickle _thread.lock objects 

, die bei

wies
self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) 
+0

Mögliche Hilfe für andere, die hier ankommen: https://stackoverflow.com/questions/44005212/picklingerror-when-copying-a-y-large-cassandra-table-using-cqlsh/45698179#45698179 – bpgriner

Antwort

1

dass Sie versuchen, eine Sperre über IPC Grenzen würde vorschlagen, serialisiert werden. Ich denke, das liegt möglicherweise daran, dass Sie ein Session-Objekt als Argument für die Worker-Initialisierungsfunktion bereitstellen. Lassen Sie die init-Funktion eine neue Sitzung in jedem Arbeitsprozess erstellen (siehe den Abschnitt "Sitzung pro Prozess" in der von Ihnen zitierten blog post).