2017-11-01 1 views
9

Angenommen, ich habe eine sehr große Textdatei, die aus vielen Zeilen besteht, die ich gerne umkehren würde. Und mir ist die endgültige Bestellung egal. Die Eingabedatei enthält kyrillische Symbole. Ich verwende multiprocessing, um auf mehreren Kernen zu verarbeiten.Warum blockiert multiprocessing.Lock() keine freigegebene Ressource in Python?

# task.py 

import multiprocessing as mp 


POOL_NUMBER = 2 


lock_read = mp.Lock() 
lock_write = mp.Lock() 

fi = open('input.txt', 'r') 
fo = open('output.txt', 'w') 

def handle(line): 
    # In the future I want to do 
    # some more complicated operations over the line 
    return line.strip()[::-1] # Reversing 

def target(): 
    while True: 
     try: 
      with lock_read: 
       line = next(fi) 
     except StopIteration: 
      break 

     line = handle(line) 

     with lock_write: 
      print(line, file=fo) 

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)] 
for p in pool: 
    p.start() 
for p in pool: 
    p.join() 

fi.close() 
fo.close() 

Dieses Programm wird mit Fehler:

Process Process-2: 
Process Process-1: 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte 

Auf der anderen Seite, alles funktioniert gut, wenn ich gesetzt POOL_NUMBER = 1

ich ein solches Programm geschrieben haben. Aber es macht keinen Sinn, wenn ich die Gesamtleistung erreichen möchte.

Warum passiert dieser Fehler? Und wie kann ich es reparieren?

Ich verwende Python 3.5.2.

I erzeugten Daten dieses Skript:

# gen_file.py 

from random import randint 


LENGTH = 100 
SIZE = 100000 


def gen_word(length): 
    return ''.join(
     chr(randint(ord('а'), ord('я'))) 
     for _ in range(length) 
    ) 


if __name__ == "__main__": 
    with open('input.txt', 'w') as f: 
     for _ in range(SIZE): 
      print(gen_word(LENGTH), file=f) 
+0

Sie auf eine Antwort finden können über [einzelne Datei aus mehreren Prozessen in Python wird bearbeitet] (https : //stackoverflow.com/a/11196615/4662041) – Sheshnath

+0

haben Sie versucht, diese Datei zu lesen und ihre Daten zu drucken? wenn du diesen Fehler wieder fängst! das heißt du solltest es lesen, als binärer modus mit "rb" ... – DRPK

+0

@DRPK habe ich gemacht. Wenn ich 'line = handle (line)' aus meinem Skript lösche, kommt der gleiche Fehler. – Fomalhaut

Antwort

3

Die Frage ist hier eine Datei von mehreren Prozessen ist das Lesen nicht funktioniert, wie Sie denken, können Sie das open Objekt zwischen Prozessen nicht teilen.

Sie könnten eine globale current_line Variable, und jedes Mal lesen Sie die Datei und die aktuelle Zeile, nicht ideal. Hier

ist ein anderer Ansatz, Prozesse Pool verwenden und map Methode, ich bin Iterieren über die Datei, und für jede Zeile enqueue ich Ihre Zielmethode:

from multiprocessing import Lock 
from multiprocessing import Pool 
import time 
import os 

POOL_NUMBER = 8 

def target(line): 
    # Really need some processing here 
    for _ in range(2**10): 
     pass 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    processed_lines = pool.map(target, fi.readlines()) 
    print('Total time', time.time() - t0) 

    with open('output.txt', 'w') as fo: 
     for processed_line in processed_lines: 
      fo.writelines(processed_line) 

Mit 8 Prozess auf meinem Rechner: Total time 1.3367934226989746

Und mit 1 Prozess: Total time 4.324501991271973

Dies funktioniert am besten, wenn Ihre Zielfunktion CPU gebunden ist, eine andere ap Proach wäre, die Datei in POOL_NUMBER Chunks zu teilen und jeden Prozess einen verarbeiteten Datenblock (mit Sperre!) in die Ausgabedatei schreiben zu lassen.

Ein anderer Ansatz besteht darin, einen Master-Prozess zu erstellen, der den Schreibauftrag für den Rest der Prozesse ausführt, here ist ein Beispiel.

EDIT

Nachdem ich dachte, kommentieren Sie die Datei nicht in den Speicher passen. Dazu können Sie einfach über das Dateiobjekt iterieren, das zeilenweise in den Speicher gelesen wird. Aber als wir den Code ein bisschen zu groß ändern müssen:

POOL_NUMBER = 8 
CHUNK_SIZE = 50000 

def target(line): 
    # This is not a measurable task, since most of the time wil spent on writing the data 
    # if you have a CPU bound task, this code will make sense 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
processed_lines = [] 

with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    for line in fi: 
     processed_lines.append(pool.apply_async(target, (line,))) # Keep a refernce to this task, but don't 

     if len(processed_lines) == CHUNK_SIZE: 
      with open('output.txt', 'w') as fo: # reading the file line by line 
       for processed_line in processed_lines: 
        fo.writelines(processed_line.get()) 
      processed_lines = [] # truncate the result list, and let the garbage collector collect the unused memory, if we don't clear the list we will ran out of memory! 
    print('Total time', time.time() - t0) 

Beachten Sie, dass Sie mit der CHUNK_SIZE Variable spielen, um zu steuern, wie viel Speicher Sie verwenden. Für mich ist 5000 ungefähr 10K maximal für jeden Prozess.

P.S

Ich denke, es wäre am besten die große Datei in kleinere Dateien aufteilen, auf diese Weise lösen Sie die Lese-/Schreibsperre für die Datei, und auch skalierbar zu verarbeiten (sogar auf einer anderen Maschine!)

+0

Vielen Dank für die von Ihnen zur Verfügung gestellte Lösung. Aber leider hat es einen großen Nachteil. Die gesamte Eingabedatei geht in den RAM (weil 'fi.readlines()' auf diese Weise funktioniert) und auch 'verarbeitete_zeilen' benötigt viel Speicher. Mit anderen Worten verbraucht Ihr Skript zu viel Speicher und es wäre ineffizient im Falle einer wirklich großen 'input.txt' (zum Beispiel 100 Millionen Zeilen). Ist es möglich, Ihr Skript zu aktualisieren, um dieses Problem zu lösen? – Fomalhaut

+0

@Fomalhaut Ich habe meine Antwort aktualisiert, bitte mach eine Schleife :) –

0

Es sieht aus wie line = next(fi) wird nicht korrekt unter anderen Process verarbeitet.

Es ist möglich, die Notwendigkeit der Verwendung next(fi) mit Hilfe von temporären Puffer von Zeilen, die durch den Hauptthread des Programms gefüllt und von jedem Prozess gelesen werden, zu umgehen. Für diese Rolle ist es besser multiprocessing.Queue zu verwenden.

Das ist also mein Skript:

from time import sleep, time 
import multiprocessing as mp 
import queue 


MAX_QUEUE_SIZE = 1000 
QUEUE_TIMEOUT = 0.000001 
POOL_NUMBER = 4 


def handle(line): 
    sleep(0.00001) # Some processing here that takes time 
    return line.strip()[::-1] 


def target(fout, write_lock, lines_queue): 
    while True: 
     try: 
      line = lines_queue.get(timeout=1.0) 
      line = handle(line) 
      with write_lock: 
       print(line, file=fout) 
       fout.flush() 
     except queue.Empty: 
      break 


if __name__ == "__main__": 
    time_begin = time() 

    with open('output.txt', 'w') as fout: 
     write_lock = mp.Lock() 
     lines_queue = mp.Queue() 

     processes = [ 
      mp.Process(target=target, args=(fout, write_lock, lines_queue)) 
      for _ in range(POOL_NUMBER) 
     ] 
     for p in processes: 
      p.start() 

     with open('input.txt', 'r') as fin: 
      while True: 
       try: 
        while lines_queue.qsize() < MAX_QUEUE_SIZE: 
         line = next(fin) 
         lines_queue.put(line) 
        sleep(QUEUE_TIMEOUT) 
       except StopIteration: 
        break 

     for p in processes: 
      p.join() 

    time_end = time() 
    print("Time:", time_end - time_begin) 

Auf meiner CPU ich dieses Ergebnis bekam:

POOL_NUMBER = 1 -> Time: 17.877086400985718 
POOL_NUMBER = 2 -> Time: 8.611438989639282 
POOL_NUMBER = 3 -> Time: 6.332395553588867 
POOL_NUMBER = 4 -> Time: 5.321753978729248 
Verwandte Themen