2017-04-04 1 views
0

Ich schreibe Daten in den nano-Speicher unter Verwendung von Geomesa Native Client. Hier ist mein Java-CodeAusführen der Flush-Methode, um Mutationen an acumulo zu senden, ohne den Schreiber zu schließen

package org.locationtech.geomesa.api; 

import com.google.common.base.Function; 
import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.Iterables; 
import com.google.common.collect.Lists; 
import com.google.gson.Gson; 
import com.vividsolutions.jts.geom.Coordinate; 
import com.vividsolutions.jts.geom.Geometry; 
import com.vividsolutions.jts.geom.GeometryFactory; 
import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.mock.MockInstance; 
import org.apache.accumulo.core.client.security.tokens.PasswordToken; 
import org.apache.accumulo.core.security.Authorizations; 
import org.geotools.factory.CommonFactoryFinder; 
import org.geotools.feature.AttributeTypeBuilder; 
import org.geotools.geometry.jts.JTSFactoryFinder; 
import org.junit.Assert; 
import org.junit.Test; 
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex; 
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$; 
import org.locationtech.geomesa.utils.index.IndexMode$; 
import org.opengis.feature.simple.SimpleFeature; 
import org.opengis.feature.type.AttributeDescriptor; 
import org.opengis.filter.FilterFactory2; 

import javax.annotation.Nullable; 
import java.time.ZonedDateTime; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.Map; 
import java.util.SortedSet; 
import java.util.TreeSet; 

public class WorkerBeta { 
    public static void main(String[] args){ 
     try { 
      DomainObjectValueSerializer dovs = new DomainObjectValueSerializer(); 
      final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
      "aj_v14", 
      "localhost:2181", 
      "hps", 
      "root", "9869547580", 
      false, 
      dovs, 
      new SimpleFeatureView<DomainObject>() { 
       AttributeTypeBuilder atb = new AttributeTypeBuilder(); 
       private List<AttributeDescriptor> attributeDescriptors = 
       Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId") 
        , atb.binding(String.class).buildDescriptor("dId") 
        , atb.binding(Integer.class).buildDescriptor("s") 
        , atb.binding(Integer.class).buildDescriptor("a") 
        , atb.binding(Integer.class).buildDescriptor("e") 
       ); 
       @Override 
       public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) { 
       f.setAttribute("rId", domainObject.rideId); 
       f.setAttribute("dId", domainObject.deviceId); 
       f.setAttribute("s", domainObject.speed); 
       f.setAttribute("a", domainObject.angle); 
       f.setAttribute("e", domainObject.error); 
       } 

       @Override 
       public List<AttributeDescriptor> getExtraAttributes() { 
       return attributeDescriptors; 
       } 
      } 
     ); 

     //Inserting 
     final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1); 
     final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory(); 
     System.out.println(index.insert(
       one, 
       gf.createPoint(new Coordinate(-74.0, 34.0)), 
       date("2017-03-31T01:15:00.000Z") 
      )); 

      //Read 
      GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder() 
       .within(-90.0, -180, 90, 180) 
       .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z")) 
       .build(); 
      Iterable<DomainObject> results = index.query(q); 
      int counter = 0; 
      for(DomainObject dm : results){ 
       counter += 1; 
       System.out.println("result counter: " + counter); 
       dovs.toBytes(dm); 
      } 
     } 
     catch (Exception ex){ 
      ex.printStackTrace(); 
     } 
     index.close(); 
    } 
    public static class DomainObject { 
     public final int rideId; 
     public final String deviceId; 
     public final int angle; 
     public final int speed; 
     public final int error; 

     public DomainObject(int rideId, String deviceId, int angle, int speed, int error) { 
      this.rideId = rideId; 
      this.deviceId = deviceId; 
      this.angle = angle; 
      this.speed = speed; 
      this.error = error; 
     } 
    } 
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> { 
     public static final Gson gson = new Gson(); 
     @Override 
     public byte[] toBytes(DomainObject o) { 
      return gson.toJson(o).getBytes(); 
     } 
     @Override 
     public DomainObject fromBytes(byte[] bytes) { 
      return gson.fromJson(new String(bytes), DomainObject.class); 
     } 
    } 
    public static Date date(String s) { 
     return Date.from(ZonedDateTime.parse(s).toInstant()); 
    } 
} 

Das Problem mit diesem Code ist, muss ich index Objekt jedes Mal für einen neuen Einsatz Anfrage erstellen und rufen index.close() das gleiche zu reflektieren. Aber ich kann insert() agin nicht ausführen, sobald index.close() aufgerufen wird. Allerdings akzeptiere ich die Einfügeanforderung aus der Warteschlange mit sehr hoher Rate, und ich möchte nicht jedes Mal das Objekt index erstellen. Wie kann ich das machen?

Kurz gesagt, wie ich Schreibvorgänge ohne Aufruf close() löschen kann.

Antwort

0

Ich habe Geomesa Client-Datei erstellt, um geomesa nativ zu verwenden. Unten ist die teilweise Implementierung des gleichen, die zeigt, wie Sie mit AccumuloAppendFeatureWriter bündig ohne Aufruf zum Schließen können.

public class GeomesaClient { 
    private AccumuloDataStore ds = null; 
    private AccumuloAppendFeatureWriter fw = null; 
    private SimpleFeatureSource sfs = null; 
    private String tableName = ""; 
    private FeatureStore fst = null; 
    private SimpleFeatureType sft; 

    public GeomesaClient(Map<String, String> dsConf) throws Exception { 
    this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf); 
    this.tableName = dsConf.get("tableName"); 

    sft = createFeatureType(); 
    if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){ 
     ds.createSchema(sft); 
    } 
    this.fst = (FeatureStore)sfs; 
    this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(), 
     Transaction.AUTO_COMMIT)); 
    this.sfs = ds.getFeatureSource(sft.getTypeName()); 
    } 
    /* 
     Flush with AccumuloAppendFeatureWriter 
    */ 
    public void flush(boolean force) { 
    fw.flush(); 
    } 
} 
Verwandte Themen