2017-04-01 1 views
2

Ich versuche, Daten aus Akkumulator Speicher mit GeoMesa Native API einfügen und lesen. Ich habe eine Klassendatei erstellt, um geomesa accumulo nativ zu verwenden. Hier ist mein Java-Code:Verwenden von GeoMesa Native API zum Einfügen von Daten in akkumulo

package org.locationtech.geomesa.api; 

import com.google.common.base.Function; 
import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.Iterables; 
import com.google.common.collect.Lists; 
import com.google.gson.Gson; 
import com.vividsolutions.jts.geom.Coordinate; 
import com.vividsolutions.jts.geom.Geometry; 
import com.vividsolutions.jts.geom.GeometryFactory; 
import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.mock.MockInstance; 
import org.apache.accumulo.core.client.security.tokens.PasswordToken; 
import org.apache.accumulo.core.security.Authorizations; 
import org.geotools.factory.CommonFactoryFinder; 
import org.geotools.feature.AttributeTypeBuilder; 
import org.geotools.geometry.jts.JTSFactoryFinder; 
import org.junit.Assert; 
import org.junit.Test; 
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$; 
import org.locationtech.geomesa.utils.index.IndexMode$; 
import org.opengis.feature.simple.SimpleFeature; 
import org.opengis.feature.type.AttributeDescriptor; 
import org.opengis.filter.FilterFactory2; 

import javax.annotation.Nullable; 
import java.time.ZonedDateTime; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.Map; 
import java.util.SortedSet; 
import java.util.TreeSet; 

public class WorkerBeta { 
    public static void main(String[] args){ 
     try { 
      DomainObjectValueSerializer dovs = new DomainObjectValueSerializer(); 
      final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
      "aj_v14", 
      "localhost:2181", 
      "hps", 
      "root", "9869547580", 
      false, 
      dovs, 
      new SimpleFeatureView<DomainObject>() { 
       AttributeTypeBuilder atb = new AttributeTypeBuilder(); 
       private List<AttributeDescriptor> attributeDescriptors = 
       Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId") 
        , atb.binding(String.class).buildDescriptor("dId") 
        , atb.binding(Integer.class).buildDescriptor("s") 
        , atb.binding(Integer.class).buildDescriptor("a") 
        , atb.binding(Integer.class).buildDescriptor("e") 
       ); 
       @Override 
       public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) { 
       f.setAttribute("rId", domainObject.rideId); 
       f.setAttribute("dId", domainObject.deviceId); 
       f.setAttribute("s", domainObject.speed); 
       f.setAttribute("a", domainObject.angle); 
       f.setAttribute("e", domainObject.error); 
       } 

       @Override 
       public List<AttributeDescriptor> getExtraAttributes() { 
       return attributeDescriptors; 
       } 
      } 
     ); 

     //Inserting 
     final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1); 
     final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory(); 
     System.out.println(index.insert(
       one, 
       gf.createPoint(new Coordinate(-74.0, 34.0)), 
       date("2017-03-31T01:15:00.000Z") 
      )); 

      //Read 
      GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder() 
       .within(-90.0, -180, 90, 180) 
       .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z")) 
       .build(); 
      Iterable<DomainObject> results = index.query(q); 
      int counter = 0; 
      for(DomainObject dm : results){ 
       counter += 1; 
       System.out.println("result counter: " + counter); 
       dovs.toBytes(dm); 
      } 
     } 
     catch (Exception ex){ 
     ex.printStackTrace(); 
     } 
    } 
    public static class DomainObject { 
     public final int rideId; 
     public final String deviceId; 
     public final int angle; 
     public final int speed; 
     public final int error; 

     public DomainObject(int rideId, String deviceId, int angle, int speed, int error) { 
      this.rideId = rideId; 
      this.deviceId = deviceId; 
      this.angle = angle; 
      this.speed = speed; 
      this.error = error; 
     } 
    } 
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> { 
     public static final Gson gson = new Gson(); 
     @Override 
     public byte[] toBytes(DomainObject o) { 
      return gson.toJson(o).getBytes(); 
     } 
     @Override 
     public DomainObject fromBytes(byte[] bytes) { 
      return gson.fromJson(new String(bytes), DomainObject.class); 
     } 
    } 
    public static Date date(String s) { 
     return Date.from(ZonedDateTime.parse(s).toInstant()); 
    } 
} 

Logs für Befehl:

[email protected]:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta 
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files. 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/ 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA> 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient 
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 w[email protected]73eb439a 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000 
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0 
50fa12fb-11f8-4776-bb35-95b32da9225d 
[] 

Aber wenn ich versuche, den eingefügten Datensatz, um zu überprüfen, ich bin nicht in der Lage irgendwelche spezifischen Einträge in erstellt mit Bezug zu finden eingefügt, um Daten in Tabellen der accumulo Webschnittstelle. Hier ist der Screenshot für die Accumulo-Tabellen enter image description here. Bitte korrigieren Sie mich, wenn ich etwas vermisse. Tonnenweise Dank im Voraus.

Antwort

1

Wahrscheinlich wird Ihre Einfügung nicht auf Platte gespült. Accumulo verwendet einen Batch-Writer für die Leistung - dies wird in regelmäßigen Abständen auf die Festplatte schreiben, sobald der interne Puffer gefüllt ist. Da Sie nur einen einzelnen Datensatz einfügen, geschieht dies nicht. Um zu beheben, können Sie close auf Ihrer GeoMesaIndex Instanz aufrufen. Dadurch werden alle vorhandenen Datensätze auf die Festplatte gelöscht. Sie müssten dann eine neue Instanz für Ihre Abfrage instanziieren.

+0

Großartig !!. Es funktioniert. Können Sie mir den Hinweis auf diese Information geben oder wie Sie zu dieser Antwort gekommen sind. –

+0

Kann ich flush() verwenden, ohne den Index zu schließen ?. Ich höre einen Warteschlangenkanal für die Einfügeanforderung und möchte kein Indexobjekt für jede Anfrage erstellen. –

+0

Ich habe versucht, flush() aufzurufen, aber es funktioniert nicht. Nach der [Geschichte] (https://geomesa.atlassian.net/browse/GEOMESA-885?focusedCommentId=16309&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16309) können wir senden Mutationen zu akkumulieren, ohne den Schreiber zu schließen. –

1

Zwei schnelle Notizen:

  1. Ihre Art nicht über kein Feld namens ‚dtg‘ und GeoMesaQuery geht man davon aus. Um dies einfach zu umgehen, können Sie 'GeoMesaQuery.GeoMesaQueryBuilder.builder(). Include(). Build()' verwenden. Auf lange Sicht könnte die native API einige Verbesserungen verwenden, um es einfach zu machen, was Sie fließend wollen.

  2. Um zu sehen, ob die Datensätze in Accumulo geschrieben wurden, können Sie die Accumulo-Shell verwenden und die einzelnen Tabellen scannen. Wenn in den Tabellen nichts vorhanden ist, kann es sich lohnen, dieses code zu debuggen.

Verwandte Themen