2016-05-11 15 views
1

Ich versuche, eine einfache Anwendung zu erstellen, die Daten von AWS Kinesis liest. Ich habe es geschafft, Daten mit einem einzigen Shard zu lesen, aber ich möchte Daten von 4 verschiedenen Shards bekommen.kinesis Abrufen von Daten aus mehreren Scherben

Problem ist, dass ich eine While-Schleife haben, die so lange iteriert wie die Scherbe aktiv ist, die mich aus verschiedenen Scherben Daten aus der Lektüre verhindert. Bisher konnte ich keinen alternativen Algorithmus finden und auch keine KCL-basierte Lösung implementieren. Vielen Dank im Voraus

public static void DoSomething() { 
     AmazonKinesisClient client = new AmazonKinesisClient(); 
     //noinspection deprecation 
     client.setEndpoint(endpoint, serviceName, regionId); 
     /** get shards from the stream using describe stream method*/ 

     DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); 
     describeStreamRequest.setStreamName(streamName); 
     List<Shard> shards = new ArrayList<>(); 
     String exclusiveStartShardId = null; 
     do { 
      describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); 
      DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest); 
      shards.addAll(describeStreamResult.getStreamDescription().getShards()); 
      if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { 
       exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); 
      } else { 
       exclusiveStartShardId = null; 
      } 
     }while (exclusiveStartShardId != null); 

     /** shards obtained */ 
     String shardIterator; 

     GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); 
     getShardIteratorRequest.setStreamName(streamName); 
     getShardIteratorRequest.setShardId(shards.get(0).getShardId()); 
     getShardIteratorRequest.setShardIteratorType("LATEST"); 

     GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); 
     shardIterator = getShardIteratorResult.getShardIterator(); 
     GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); 

     while (!shardIterator.equals(null)) { 
      getRecordsRequest.setShardIterator(shardIterator); 
      getRecordsRequest.setLimit(250); 
      GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); 
      List<Record> records = getRecordsResult.getRecords(); 

      shardIterator = getRecordsResult.getNextShardIterator(); 
      if(records.size()!=0) { 
       for(Record r : records) { 
        System.out.println(r.getPartitionKey()); 
       } 
      } 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 

      } 
     } 
    } 

Antwort

1

Es wird empfohlen, dass Sie nicht von einem einzigen Prozess/Arbeitern aus mehreren Scherben lesen. Erstens, wie Sie sehen, trägt dies zur Komplexität Ihres Codes bei, aber vor allem werden Sie Probleme beim Hochskalieren haben.

Das „Geheimnis“ Skalierbarkeit ist klein und unabhängige Arbeiter oder andere solche Einheiten zu haben. Ein solches Design können Sie in Hadoop, DynamoDB oder Kinesis in AWS sehen. Es ermöglicht Ihnen, kleine Systeme (Micro-Services) zu bauen, die bei Bedarf einfach auf- und abwärts skaliert werden können. Sie können problemlos weitere Arbeitseinheiten/Daten hinzufügen, wenn Ihr Service erfolgreicher wird oder andere Nutzungsschwankungen auftreten.

Wie Sie in diesen AWS-Dienste sehen können, können Sie diese Skalierbarkeit manchmal kann in DynamoDB, automatisch so zu bekommen und manchmal braucht man Scherben auf Ihre kinesis Streams hinzuzufügen. Aber für Ihre Anwendung müssen Sie irgendwie Ihre Skalierbarkeit kontrollieren.

Im Fall von Kinesis können Sie nach oben und unten mit AWS Lambda oder Kinesis-Client-Bibliothek (KCL) skaliert werden. Beide hören den Status Ihrer Streams ab (Anzahl der Shards und Ereignisse) und verwenden sie, um Worker hinzuzufügen oder zu entfernen und die Ereignisse zur Verarbeitung bereitzustellen. In diesen beiden Lösungen sollten Sie einen Worker erstellen, der gegen einen einzelnen Shard arbeitet.

Wenn Sie Ereignisse von mehreren Shards abgleichen müssen, können Sie dies mit einem Zustandsdienst wie Redis oder DynamoDB tun.

+0

das Problem ist, ich keine Daten lesen kann, habe ich die Scherben nicht einmal abrufen kann, wenn ich einen Arbeiter verwenden. Wollte "consumeShard" Methode verwenden, aber nicht ausreichen. Ich habe versucht, das aws-Beispiel für Kinesis-Anwendungen anzupassen, aber es war zu viel auf DynamoDB abhängig. Ich möchte meine Objekte oder die Anzahl der Zählungen nicht in der Dynamotabelle speichern. – emrahozkan