2017-09-23 4 views
0

ich mit einem Anfang und Ende Reihe auf Bigtable zu einem Scan versuchen. Die Elemente zwischen dem Scan sind ungefähr 100K. Ich möchte sie in Chargen, die ich in HBase mit Hilfe von verwenden konnte.Hbase vs Google Bigtable: Scan für große Anzahl von Zeilen

In Bigtable, wie es scheint setCaching ignoriert und es versucht, die gesamte resultset in 1 RPC zu bekommen. Wie kann es ähnlich wie HBase erreicht werden?

Ich bin mit Java-Treiber bigtable-hbase-1.1 und Version 1.0.0-pre3

Bigtable Konfiguration:

Configuration conf = new Configuration(); 
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false"); 
conf.set("google.bigtable.rpc.timeout.ms", "1500000"); 
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms","1500000"); 
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000"); 
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false"); 
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500"); 
conf.set("google.bigtable.bulk.max.row.key.count", "500"); 

Configuration conff = BigtableConfiguration.configure(conf,projectID,instanceID); 
connection = BigtableConfiguration.connect(conff); 

Scanner-Konfiguration:

byte[] start = "prefix".getbytes() ; 
byte[] end = Bytes.add("prefix".getbytes(),(byte))0xff); 
Scan scan = new Scan(start, end); 

Erwartete Anzahl der Zeilen ist in der Größenordnung von 100 kS zu kommen .

Antwort

0

Sie müssen sich keine Gedanken über batching Sorgen machen, wenn Zeilen zu lesen. Die Bigtable-Antworten werden gestreamt und sind rückdruckbewusst. Wir verlassen uns auf GRPC, um auch Teile des Streams zu puffern. Hier ist ein Link zu einer Einführung über GRPC Streaming: https://grpc.io/docs/guides/concepts.html#server-streaming-rpc

Würde dagegen diesen Beispielcode versucht, und lassen Sie mich wissen, ob es funktioniert (dh keine Frist Fehler überschritten.). Wenn der Beispielcode funktioniert, ändern Sie ihn bitte, um Ihre eigenen Daten zu scannen und sicherzustellen, dass sie weiterhin funktionieren. Wenn etwas nicht stimmt, lass es mich wissen.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.google.cloud.example</groupId> 
    <artifactId>row-write-read-example</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.google.cloud.bigtable</groupId> 
     <artifactId>bigtable-hbase-1.x</artifactId> 
     <version>1.0.0-pre3</version> 
    </dependency> 
    </dependencies> 

    <build> 
    <plugins> 
     <plugin> 
     <artifactId>maven-compiler-plugin</artifactId> 
     <version>3.6.2</version> 
     <configuration> 
      <source>1.8</source> 
      <target>1.8</target> 
     </configuration> 
     </plugin> 
    </plugins> 
    </build> 
</project> 

java:

import com.google.cloud.bigtable.hbase.BigtableConfiguration; 
import java.io.IOException; 
import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HConstants; 
import org.apache.hadoop.hbase.HTableDescriptor; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.Admin; 
import org.apache.hadoop.hbase.client.BufferedMutator; 
import org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.client.Table; 

public class WriteReadTest { 
    private static final String PROJECT_ID = "<YOUR_PROJECT_ID>"; 
    private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>"; 
    private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>"; 
    private static final String FAMILY = "cf"; 

    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID); 

    public static void main(String[] args) throws IOException { 
    try(Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID); 
     Admin admin = connection.getAdmin()) { 

     // Setup 
     admin.createTable(
      new HTableDescriptor(TABLE_NAME) 
       .addFamily(new HColumnDescriptor(FAMILY)) 
    ); 

     try { 
     // Write the rows 
     populateTable(connection, 2_000_000); 

     // Read the rows 
     readFullTable(connection); 
     } finally { 
     admin.disableTable(TABLE_NAME); 
     admin.deleteTable(TABLE_NAME); 
     } 

    } 
    } 

    private static void populateTable(Connection connection, int rowCount) throws IOException { 
    long startTime = System.currentTimeMillis(); 
    int buckets = 100; 
    int maxWidth = Integer.toString(buckets).length(); 

    try(BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) { 
     for (int i = 0; i < rowCount; i++) { 
     String prefix = String.format("%0" + maxWidth + "d", i % buckets); 
     String key = prefix + "-" + String.format("%010d", i); 
     String value = "value-" + key; 

     Put put = new Put(key.getBytes()) 
      .addColumn(
       FAMILY.getBytes(), 
       HConstants.EMPTY_BYTE_ARRAY, 
       value.getBytes() 
      ); 

     bufferedMutator.mutate(put); 
     } 
    } 

    long endTime = System.currentTimeMillis(); 
    System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime)/1000, rowCount); 
    } 

    private static void readFullTable(Connection connection) throws IOException { 
    long startTime = System.currentTimeMillis(); 

    int count = 0; 
    try(Table table = connection.getTable(TABLE_NAME); 
     ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) { 

     for(Result row = scanner.next(); row != null; row = scanner.next()) { 
     count++; 
     } 
    } 

    long endTime = System.currentTimeMillis(); 

    System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime)/1000, count); 
    } 
} 
+0

ich lange RPC-Timeout von 5 Minuten und noch SCHLUSS immer überschritten Fehler gehalten haben. für nur 100k Reihen? – Peter

+0

Gibt es eine Möglichkeit, die Buffer-Chunk-Größe zu optimieren? Vielleicht ist es zu klein. In meinem Fall, da Client ist in Singapur und BigTable in Taiwan eine einzige Rundfahrt dauert ca. 50 ms – Peter

+0

Oder ist es, weil Bigtable zuerst das gesamte Ergebnis auf dem Server erhalten und es dann an den Client übertragen. Ich nehme an, dann könnte das gesamte Ergebnis-Set zeitaufwendig sein? – Peter

Verwandte Themen