2017-01-17 2 views
0

Nach dem Lesen der Dokumentation habe ich Schwierigkeiten, den Change Feed zu konzipieren. Nehmen wir den Code von documentation unten. Der zweite Änderungsfeed erfasst die Änderungen seit der letzten Ausführung über die Checkpoints. Nehmen wir an, es wird verwendet, um zusammenfassende Daten zu erstellen, und es gab ein Problem, das von einem früheren Zeitpunkt erneut ausgeführt werden musste. Ich verstehe folgendes nicht:DocumentDB Change Feed und Speichern Checkpoint

  • Wie Sie eine bestimmte Uhrzeit angeben, zu der der Prüfpunkt starten soll. Ich verstehe, ich kann das Checkpoint-Wörterbuch speichern und für jeden Lauf verwenden, aber wie erhalten Sie die Änderungen von X Zeit, um möglicherweise einige Zusammenfassung Daten erneut auszuführen
  • Zweitens sagen wir, wir führen einige zusammenfassende Daten und wir speichern den letzten Prüfpunkt verwendet für jede zusammengefasste Daten, so dass wir wissen, wo das eine aufgehört hat. Woher weiß man, dass ein Datensatz in oder vor diesem Checkpoint ist?

Code, der von der Sammlung Anfang läuft und dann von den letzten Kontrollpunkt:

Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string >()); 

    await client.CreateDocumentAsync(collection, new DeviceReading { 
    DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000 
    }); 
    await client.CreateDocumentAsync(collection, new DeviceReading { 
    DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000 
    }); 

    // Returns only the two documents created above. 
    checkpoints = await GetChanges(client, collection, checkpoints); 
    // 

    private async Task < Dictionary < string, string >> GetChanges(
    DocumentClient client, 
    string collection, 
    Dictionary < string, string > checkpoints) { 
    List <PartitionKeyRange> partitionKeyRanges = new List <PartitionKeyRange>(); 
    FeedResponse <PartitionKeyRange> pkRangesResponse; 

    do { 
    pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection); 
    partitionKeyRanges.AddRange(pkRangesResponse); 
    } 
    while (pkRangesResponse.ResponseContinuation != null); 

    foreach(PartitionKeyRange pkRange in partitionKeyRanges) { 
    string continuation = null; 
    checkpoints.TryGetValue(pkRange.Id, out continuation); 

    IDocumentQuery <Document> query = client.CreateDocumentChangeFeedQuery(
    collection, 
    new ChangeFeedOptions { 
     PartitionKeyRangeId = pkRange.Id, 
     StartFromBeginning = true, 
     RequestContinuation = continuation, 
     MaxItemCount = 1 
    }); 

    while (query.HasMoreResults) { 
    FeedResponse <DeviceReading> readChangesResponse = query.ExecuteNextAsync <DeviceReading>().Result; 

    foreach(DeviceReading changedDocument in readChangesResponse) { 
     Console.WriteLine(changedDocument.Id); 
    } 

    checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation; 
    } 
    } 

    return checkpoints; 
    } 
+0

lucuma, ich denke, das ist eine gute Frage, die von einer längeren/offline Diskussion mit [email protected] profitieren könnte. –

+0

Danke Ich habe bereits eine Anfrage gesendet. Ich warte nur auf eine Zeit. – lucuma

Antwort

1

DocumentDB unterstützt Check-Pointing nur anhand des logischen Zeitstempels, der vom Server zurückgegeben wird. Wenn Sie alle Änderungen von X Minuten abrufen möchten, müssen Sie sich den logischen Zeitstempel der Uhrzeit (ETag, die für die Sammlung in der REST-API zurückgegeben wird, ResponseContinuation im SDK) "merken" und dann zum Abrufen verwenden Änderungen.

Der Wechselfeed verwendet die logische Zeit anstelle der Uhrzeit, da sie sich über verschiedene Server/Partitionen unterscheiden kann. Wenn Sie die Änderungsfeed-Unterstützung basierend auf der Uhrzeit anzeigen möchten (mit einigen Einschränkungen bei der Schräglage), schlagen Sie bitte unter https://feedback.azure.com/forums/263030-documentdb/ vor.

den letzten Kontrollpunkt pro Partition Schlüssel/Dokument zu speichern, können Sie einfach die entsprechende Version des Chargen speichern, in dem sie (wieder für die Sammlung in dem REST-API, ResponseContinuation im SDK ETag), wie Fred zuletzt gesehen wurden vorgeschlagen in seiner Antwort.

1

Wie eine bestimmte Zeit angeben, der Checkpoint starten soll.

Sie könnten versuchen, eine logische Version zur Verfügung zu stellen/ETag (wie 95488) anstelle eines Nullwert als RequestContinuation Eigenschaft ChangeFeedOptions bietet.