2016-06-08 7 views
1

I GIS-Daten haben, die wie folgt aussieht -generieren Verbund hbase RowKey mit Flume Serializer

'111, 2011-02-01 20:30:30, 116.50443, 40.00951' 
'111, 2011-02-01 20:30:31, 116.50443, 40.00951' 
'112, 2011-02-01 20:30:30, 116.58197, 40.06665' 
'112, 2011-02-01 20:30:31, 116.58197, 40.06665' 

Erste Spalte ist driver_id, zweite ist timestamp, dritte ist longitude & vierte ist latitude.

Ich nehme diese Art von Daten mit Flume & meine Spüle ist HBase (Typ - AsyncHBaseSink).
Standardmäßig weist HBase einen Zeilenschlüssel als erste Spalte zu (wie 111). Ich möchte einen zusammengesetzten Zeilenschlüssel erstellen (wie die Kombination der ersten beiden Spalten 111_2011-02-01 20:30:30).
Ich habe versucht, die erforderlichen Änderungen in 'AsyncHbaseLogEventSerializer.java', aber sie wurden nicht wiedergegeben.

Bitte schlagen Sie vor, wie ich das gleiche machen kann.

+0

Verbund RowKey arbeiten und sein sollte normalen solchen Schlüssel zu verwenden. Kannst du bitte dein Code-Snippet einfügen, wie machst du das? –

+0

bitte überprüfen Sie die Beispielschnipsel, nach meiner Erfahrung sollte es möglich sein (wenn Sie nicht einfache Fehler gemacht haben) :-) –

+0

: War meine Antwort hilfreich. –

Antwort

2

Composite-Schlüssel sollte

Unten in AsyncHbaseSerializer arbeiten, ist der Beispielcode-Schnipsel.

auf Klassenebene deklarieren privae List<PutRequest> puts = null;

/** 
    * Method joinRowKeyContent. (with EMPTY string separation) 
    * 
     * Joiner is google guava class 
    * @param objArray Object... 
    * 
    * @return String 
    */ 
    public static String joinRowKeyContent(Object... objArray) { 
     return Joiner.on("").appendTo(new StringBuilder(), objArray).toString(); 
    } 

/** 
    * Method preParePutRequestForBody. 
    * 
    * @param rowKeyBytes 
    * @param timestamp 
    */ 
    private void preParePutRequest(final byte[] rowKeyBytes, final long timestamp) { 
     // Process 

      LOG.debug("Processing ..." + Bytes.toString(rowKeyBytes)); 

     final PutRequest putreq = new PutRequest(table, rowKeyBytes, colFam, Bytes.toBytes("yourcolumn"), yourcolumnasBytearray, timestamp); 
     puts.add(putreq); 
    } 

Ihre get Aktionen Methode sieht aus wie ...

@Override 
     public List<PutRequest> getActions() { 
//create rowkey like this 
    final String rowKey = joinRowKeyContent(driver_id, timestamp, longitude , latitude); 

    // call prepare put requests method here 
    final byte[] rowKeyBytes = Bytes.toBytes(rowKey); 
       puts.clear(); 
    preParePutRequest(rowKeyBytes ,<timestamp>) 
      return puts; 
     }