2012-04-09 3 views
0

Ich habe riesige csv-Dateien (100 MB +) auf Amazon S3 und ich möchte sie in Chunks lesen und verarbeiten sie Ruby CSV-Bibliothek. Ich habe eine harte Zeit, um das richtige IO-Objekt für csv Verarbeitung zu erstellen:Gepufferte/RingBuffer IO in Ruby + Amazon S3 nicht blockierenden Chunk liest

buffer = TheRightIOClass.new 
bytes_received = 0 
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do  |chunk| 
    bytes_received += buffer.write(chunk) 
    if bytes_received >= 1*MEGABYTE 
    bytes_received = 0 
    csv(buffer).each do |row| 
     process_csv_record(row) 
    end 
    end 
end 

def csv(io) 
    @csv ||= CSV.new(io, headers: true) 
end 

Ich weiß nicht, was das richtige Setup hier sollte und was die TheRightIOClass ist. Ich möchte nicht die gesamte Datei mit StringIO in den Speicher laden. Gibt es dafür einen Pufferedio oder Ringpuffer in Ruby? Wenn jemand eine gute Lösung mit Threads (keine Prozesse) und Pipes hat, würde ich es gerne sehen.

Antwort

2

Sie können StringIO verwenden und eine clevere Fehlerbehandlung durchführen, um sicherzustellen, dass Sie eine ganze Zeile in einem Chunk haben, bevor Sie sie behandeln. Die Packerklasse in diesem Beispiel sammelt nur die analysierten Zeilen im Speicher, bis Sie sie auf die Festplatte oder eine Datenbank übertragen.

packer = Packer.new 
object = AWS::S3.new.buckets[bucket].objects[path] 
io = StringIO.new 
csv = ::CSV.new(io, headers: true) 
object.read do |chunk| 
    #Append the most recent chunk and rewind the IO 
    io << chunk 
    io.rewind 
    last_offset = 0 
    begin 
    while row = csv.shift do 
     #Store the parsed row unless we're at the end of a chunk 
     unless io.eof? 
     last_offset = io.pos 
     packer << row.to_hash 
     end 
    end 
    rescue ArgumentError, ::CSV::MalformedCSVError => e 
    #Only rescue malformed UTF-8 and CSV errors if we're at the end of chunk 
    raise e unless io.eof? 
    end 
    #Seek to our last offset, create a new StringIO with that partial row & advance the cursor 
    io.seek(last_offset) 
    io.reopen(io.read) 
    io.read 
    #Flush our accumulated rows to disk every 1 Meg 
    packer.flush if packer.bytes > 1*MEGABYTES 
end 
#Read the last row 
io.rewind 
packer << csv.shift.to_hash 
packer