Nach dem Lesen der Dokumentation habe ich Schwierigkeiten, den Change Feed zu konzipieren. Nehmen wir den Code von documentation unten. Der zweite Änderungsfeed erfasst die Änderungen seit der letzten Ausführung über die Checkpoints. Nehmen wir an, es wird verwendet, um zusammenfassende Daten zu erstellen, und es gab ein Problem, das von einem früheren Zeitpunkt erneut ausgeführt werden musste. Ich verstehe folgendes nicht:DocumentDB Change Feed und Speichern Checkpoint
- Wie Sie eine bestimmte Uhrzeit angeben, zu der der Prüfpunkt starten soll. Ich verstehe, ich kann das Checkpoint-Wörterbuch speichern und für jeden Lauf verwenden, aber wie erhalten Sie die Änderungen von X Zeit, um möglicherweise einige Zusammenfassung Daten erneut auszuführen
- Zweitens sagen wir, wir führen einige zusammenfassende Daten und wir speichern den letzten Prüfpunkt verwendet für jede zusammengefasste Daten, so dass wir wissen, wo das eine aufgehört hat. Woher weiß man, dass ein Datensatz in oder vor diesem Checkpoint ist?
Code, der von der Sammlung Anfang läuft und dann von den letzten Kontrollpunkt:
Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string >());
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
});
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
});
// Returns only the two documents created above.
checkpoints = await GetChanges(client, collection, checkpoints);
//
private async Task < Dictionary < string, string >> GetChanges(
DocumentClient client,
string collection,
Dictionary < string, string > checkpoints) {
List <PartitionKeyRange> partitionKeyRanges = new List <PartitionKeyRange>();
FeedResponse <PartitionKeyRange> pkRangesResponse;
do {
pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection);
partitionKeyRanges.AddRange(pkRangesResponse);
}
while (pkRangesResponse.ResponseContinuation != null);
foreach(PartitionKeyRange pkRange in partitionKeyRanges) {
string continuation = null;
checkpoints.TryGetValue(pkRange.Id, out continuation);
IDocumentQuery <Document> query = client.CreateDocumentChangeFeedQuery(
collection,
new ChangeFeedOptions {
PartitionKeyRangeId = pkRange.Id,
StartFromBeginning = true,
RequestContinuation = continuation,
MaxItemCount = 1
});
while (query.HasMoreResults) {
FeedResponse <DeviceReading> readChangesResponse = query.ExecuteNextAsync <DeviceReading>().Result;
foreach(DeviceReading changedDocument in readChangesResponse) {
Console.WriteLine(changedDocument.Id);
}
checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
}
}
return checkpoints;
}
lucuma, ich denke, das ist eine gute Frage, die von einer längeren/offline Diskussion mit [email protected] profitieren könnte. –
Danke Ich habe bereits eine Anfrage gesendet. Ich warte nur auf eine Zeit. – lucuma