Ich versuche, eine einfache Anwendung zu erstellen, die Daten von AWS Kinesis liest. Ich habe es geschafft, Daten mit einem einzigen Shard zu lesen, aber ich möchte Daten von 4 verschiedenen Shards bekommen.kinesis Abrufen von Daten aus mehreren Scherben
Problem ist, dass ich eine While-Schleife haben, die so lange iteriert wie die Scherbe aktiv ist, die mich aus verschiedenen Scherben Daten aus der Lektüre verhindert. Bisher konnte ich keinen alternativen Algorithmus finden und auch keine KCL-basierte Lösung implementieren. Vielen Dank im Voraus
public static void DoSomething() {
AmazonKinesisClient client = new AmazonKinesisClient();
//noinspection deprecation
client.setEndpoint(endpoint, serviceName, regionId);
/** get shards from the stream using describe stream method*/
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null;
}
}while (exclusiveStartShardId != null);
/** shards obtained */
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("LATEST");
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
while (!shardIterator.equals(null)) {
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(250);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
shardIterator = getRecordsResult.getNextShardIterator();
if(records.size()!=0) {
for(Record r : records) {
System.out.println(r.getPartitionKey());
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
das Problem ist, ich keine Daten lesen kann, habe ich die Scherben nicht einmal abrufen kann, wenn ich einen Arbeiter verwenden. Wollte "consumeShard" Methode verwenden, aber nicht ausreichen. Ich habe versucht, das aws-Beispiel für Kinesis-Anwendungen anzupassen, aber es war zu viel auf DynamoDB abhängig. Ich möchte meine Objekte oder die Anzahl der Zählungen nicht in der Dynamotabelle speichern. – emrahozkan