2014-06-21 14 views
5

Ich bin neu in Kinesis. Ich lese die Dokumentation aus, die ich gefunden habe, ich kann den Kinesis-Strom erzeugen, um Daten vom Produzenten zu erhalten. Dann liest KCL diese Daten von Stream zur weiteren Verarbeitung. Ich verstehe, wie Sie die KCL-Anwendung schreiben, indem Sie IRecordProcessor implementieren.Wie man Daten vom Server zu Kinesis Stream setzt

Allerdings ist mir die erste Phase, wie Daten auf Kinesis-Stream setzen, noch nicht klar. Haben wir eine AWS-API, die eine Implementierung benötigt, um dies zu erreichen?

Szenarien: Ich habe einen Server, der kontinuierlich Daten aus verschiedenen Quellen in den Ordnern abruft. Jeder Ordner enthält die Textdatei, deren Zeilen die erforderlichen Attribute für weitere analytische Arbeiten enthalten. Ich muss alle diese Daten zu Kinesis Stream schieben.

Ich brauche Code etwas wie unten unten Klasse putData Methode wil

public class Put { 

    AmazonKinesisClient kinesisClient; 

    Put() 
    { 
     String accessKey = "My Access Key here" ; 
     String secretKey = "My Secret Key here" ; 
     AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); 
     kinesisClient = new AmazonKinesisClient(credentials); 
     kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1"); 
     System.out.println("starting the Put Application"); 
    } 

    public void putData(String fileContent,String session) throws Exception 
    { 
     final String myStreamName = "ClickStream"; 

      PutRecordRequest putRecordRequest = new PutRecordRequest(); 
      putRecordRequest.setStreamName(myStreamName); 
      String putData = fileContent; 
      putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes())); 
      putRecordRequest.setPartitionKey("session"+session); 
      PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest); 
      System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey() 
        + ", ShardID : " + putRecordResult.getShardId()); 
      System.out.println(fileContent); 
      System.out.println("Sequence Number: "+putRecordResult.getSequenceNumber()); 

      System.out.println("Data has been PUT successfully"); 


    } 
} 

jedoch Lesen der Datei aus dem Quellordner vom Server aus in Kinesis Strom verwendet werden und dann, was Design i putData anrufen verwenden soll Holen Sie sich den Rekord auf Kinesis-Stream. Benötige ich eine Endlosschleife und lese alle Dateien und mache dann dieses oder irgendein Framework, das dies besser mit der Sorge der Fehlertoleranz, Einzelpunkt des Scheiterns alles tut. Jede Hilfe würde sehr geschätzt werden.

Kurz: Ich brauche eine bessere Technik, um regelmäßig generierte Daten in Kinesis Stream zu legen, die Daten werden in regelmäßigen Abständen zum Server generiert. Dank

Antwort

2

So scheint es, Sie sind bereits mit ... http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

spezifische Methode, die Sie wollen, ist wie folgt.

Sie benötigen einen Stream-Namen, einen Datensatz und einen Stream-Schlüssel. http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html

Aber es scheint, dass Sie das alles haben?

Sie müssten dann ein Programm ausführen, das immer Ihre Server-Protokolldatei abarbeitet, und wenn es jemals eine neue Zeile gibt, wird es dies tun.

Aber Ihre Daten werden nur für 24 Stunden sitzen. Sie benötigen dann ein Worker-Programm, um die Daten zu verwenden und sie in einer anderen AWS-Ressource zu speichern.

+0

Ja i Daten S3 von Kinesis bewege. Ich suchte nach einer vorgefertigten Lösung, um jeden Tag die Dateien aus dem Ordner von meinem Server zu lesen und alle diese Daten in den Kinesis-Stream zu stellen. Nun, in meinem Server habe ich mehrere Ordner für verschiedene Daten und jeden Tag enthält viele Dateien mit Protokollinformationen. Ich möchte dies auf Kinesis-Stream übertragen. Auf dieser Ebene denke ich, dass ich ein einfaches Programm mit Endlosschleife schreiben kann, mit etwas Thread-Verzögerung, um die Ereignisse weiter zu lesen und nach Kinesis zu gehen, wenn eine bereits bewiesene Lösung nicht vorhanden ist. Danke – Sam

+0

Darf RabbitMQ verwenden, um Daten zu Kinesis Stream zu setzen. ? – Sam

+0

Amazon bietet kein Out-of-the-Box-Push-Programm. Sie müssen es selbst erstellen. Keine Ahnung von RabbitMQ –

0

Wenn Sie Protokolldateien einlesen möchten, versuchen Sie es bitte mit Fluentd. Fluentd kann Dateien fortlaufend protokollieren und Daten puffern, verschlüsseln, komprimieren und erneut versuchen.

Fluentd des Kinesis Plugins wird von Amazon Web Services selbst entwickelt.

Verwandte Themen