2016-12-01 5 views
6

Mein Szenario ist, dass ich eine BufferBlock<Stream> empfangen Stream 's von einer externen Quelle, sagen wir das Dateisystem oder einige FTP-Server. Diese Datei Stream wird in einen anderen Block übergehen und verarbeitet werden.TransformerBlock Buchung zum Ausgang

Der einzige Haken ist, dass einige dieser Dateien gezippt sind, und ich möchte eine Block in der Mitte hinzufügen, die Dateien entpacken würde, wenn nötig, und erstellen mehrere Ausgabe Stream für jeden seiner Einträge.

Allerdings möchte ich nicht TransformBlockMany verwenden, weil dies bedeutet, ich muss vollständig erhalten die PLZ Stream und erstellen Sie die Ausgabe Stream Array auf einmal.

Ich mag diese Block die ZIP Stream, startet Dekomprimierung empfangen und Push zum nächsten Strom, wenn ein Eintrag ist bereit, so dass der Prozess Blockverarbeitung beginnen kann, sobald die erste Datei dekomprimiert wird, und nicht warten, bis alles ist dekomprimiert.

Wie würde ich das machen?

+3

Welche Bibliothek verwenden Sie für die ZIP-Dekomprimierung? –

+2

Verwenden von System.IO.Compression.ZipFile. – Gidon

+0

Bis jetzt verstehe ich, dass mein Problem tatsächlich der asynchrone Teil ist. Wenn ich Async nicht verwenden würde, könnte ich einfach die Rendite im TransformManyBlock verwenden. Aber ich kann die Rendite nicht zusammen mit Async verwenden. – Gidon

Antwort

1

Ich habe verstanden, mein Problem ist nicht in der Lage, eine yield/async zusammen zu verwenden. Aber nachdem es Refactoring, habe ich von dieser Notwendigkeit zu befreien, und kam mit der folgenden (vereinfacht) Version auf:

var block = new TransformManyBlock<Stream, Stream>((input) => { 
var archive = new System.IO.Compression.ZipArchive(input, System.IO.Compression.ZipArchiveMode.Read, true); 
foreach (ZipArchiveEntry entry in archive.Entries) 
{ 
    if (string.IsNullOrWhiteSpace(entry.Name)) //is a folder 
     continue; 

    yield return entry.Open(); 

} 

}); 
0

Sie können Setup der Zwischenblock für Ihre unzip Logik mit predicate linking die Blöcke so können Sie überprüfen, ein Archiv oder nicht, so etwas wie dies streamen:

var buffer = new BufferBlock<Stream>(); 
var unzipper = new TransformManyBlock<Stream, Stream>(input => { /* unzip here */ }); 
var processBlock = new ActionBlock<Stream>(input => { /* process streams here */ }); 

buffer.LinkTo(unzipper, input => /* check is stream a zip archive */); 
unzipper.LinkTo(processBlock); 
buffer.LinkTo(processBlock); 

was async Verwendung zusammen mit yield, können Sie einen Versuch zum AsyncEnumerable Paket auf GitHub und NuGet geben.