0

Ich benutze Dynamodb, um Aggregation Berichte zu erstellen. Mein Programm wird alle 10 Minuten ausgelöst, um die Datensätze von Dynamodb-Streams zu erhalten, die in den letzten 10 Minuten geschrieben wurden, und um zusammengefasste Berichte zu erstellen. Ich würde gerne wissen, wie man das mit Python und Boto3 macht. Ich habe versucht, wenn es einen Zeitparameter in get_shard_iterator gibt, aber es gibt keinen. Ich muss den shard_iterator basierend auf dem Erstellungsdatum abrufen. Gibt es einen anderen Weg, dies zu tun?Get Last N Minuten Datensätze Dynamodb Streams

+0

DynamoDB Ströme sind sharded auf den Primärschlüssel basiert, nicht Zeitstempel, so Theres keine ‚eingebaute in der Art, dies zu tun. –

Antwort

0

Alles in einem Kinesis-Stream ist nach einer 'sequence_number' sortiert. Wenn Sie alle Datensätze für die letzten 10 Minuten möchten, müssen Sie den Stream mit einem neuen shard_iterator abfragen, der angibt, mit welcher Sequenznummer gestartet werden soll. dann verarbeite alles im Stream.

Ich würde empfehlen, den Daten, die Sie über Kinesis senden, einen utc-Zeitstempel hinzuzufügen und ihn dann erneut zu bestellen, wenn Sie ihn alle 10 Minuten konsumieren.

Grob aus dem Gedächtnis, sollte es so etwas wie dieses:

import boto3 

KINESIS_CLIENT = boto3.client('kinesis', region_name='eu-west-1') 

stream_name="your stream name" 
# you may need to retrieve this from via boto if dynamically generated 
shard_id="your shard id" 

# how to get the first record from the stream with the starting sequence number 
#KINESIS_CLIENT.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType='TRIM_HORIZON') 

shard_iterator = KINESIS_CLIENT.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, 
              ShardIteratorType='AFTER_SEQUENCE_NUMBER', 
              StartingSequenceNumber=sequence_number) 

records = KINESIS_CLIENT.get_records(ShardIterator=shard_iterator, Limit=2500) 

Alles ist dokumentiert: http://boto3.readthedocs.io/en/latest/reference/services/kinesis.html