2016-04-14 3 views

Antwort

0

Ja sollte es möglich sein. Eine Möglichkeit, dies zu tun, ist die Kontrolle über das Offset-Management.

Ihr Kunde kann eine Nachricht von Kafka um eine Uhrzeit lesen und diese als Objekt in AWS ablegen, während der Offset + Partitionsname als Schlüssel in der AWS gespeichert wird. Nun sagen wir, Ihr Client ist abgestürzt. Wenn Sie das nächste Mal auftauchen, fragen Sie S3, um herauszufinden, was der letzte Versatz in S3 ist, und beginnen Sie, die Nachricht von dort zu lesen. Für zusätzlichen Schutz, bevor Sie Nachricht in S3 setzen, überprüfen Sie, ob Objekt mit diesem Schlüssel (Es wäre besser, wenn Ihr Produzent UUID für Nachricht produziert und Sie können das verwenden) existiert in S3, wenn ja, überschreiben Sie es nicht, stattdessen überspringen Sie die Nachricht.

kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() { 
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {} 
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
     Iterator<TopicPartition> topicPartitionIterator = partitions.iterator(); 
     while(topicPartitionIterator.hasNext()){ 
       TopicPartition topicPartition = topicPartitionIterator.next(); 
       System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is kafkaConsumer.committed(topicPartition) 
       System.out.println("Resetting offset to " + startingOffset); 
       kafkaConsumer.seek(topicPartition, startingOffset); 
      } 
     } 
     } 
    }); 

Hoffnung, die

+0

hilft Ja, das ist Ihre Idee möglich. Aber es gibt vielleicht viele Offset + Partition Schlüssel in S3, Abfrage der letzte Offset wird langsamer und langsamer. Und check key existiert in s3 ist auch nicht einfach außer für einige mem db. Darüber hinaus benötigen wir Daten der Gruppe s3 in der Datumszeit, um Daten bei Bedarf in einem bestimmten Datumsbereich wiederherzustellen. Wie man s3 Schlüssel dann entwirft? –