Exception thrown when inserting data into Druid via Tranquility

2017-05-16

I am pushing a Kafka stream into Druid through Tranquility. The Kafka version is 0.9.1, Tranquility is 0.8, and Druid is 0.10. Tranquility starts up fine as long as no messages are produced, but as soon as the producer sends a message, it throws a JsonMappingException like this:

java.lang.IllegalArgumentException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token 
at [Source: N/A; line: -1, column: -1] 
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6] 
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6] 
    at com.metamx.tranquility.druid.DruidBeams$.makeFireDepartment(DruidBeams.scala:406) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:291) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:199) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67] 

and my kafka.json is:

    {
      "dataSources" : {
        "stock-index-topic" : {
          "spec" : {
            "dataSchema" : {
              "dataSource" : "stock-index-topic",
              "parser" : {
                "type" : "string",
                "parseSpec" : {
                  "timestampSpec" : {
                    "column" : "timestamp",
                    "format" : "auto"
                  },
                  "dimensionsSpec" : {
                    "dimensions" : ["code","name","acronym","market","tradeVolume","totalValueTraded","preClosePx","openPrice","highPrice","lowPrice","tradePrice","closePx","timestamp"],
                    "dimensionExclusions" : [
                      "timestamp",
                      "value"
                    ]
                  },
                  "format" : "json"
                }
              },
              "granularitySpec" : {
                "type" : "uniform",
                "segmentGranularity" : "DAY",
                "queryGranularity" : "none",
                "intervals" : "no"
              },
              "metricsSpec" : [
                {
                  "name" : "firstPrice",
                  "type" : "doubleFirst",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "lastPrice",
                  "type" : "doubleLast",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "minPrice",
                  "type" : "doubleMin",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "maxPrice",
                  "type" : "doubleMax",
                  "fieldName" : "tradePrice"
                }
              ]
            },
            "ioConfig" : {
              "type" : "realtime"
            },
            "tuningConfig" : {
              "type" : "realtime",
              "maxRowsInMemory" : "100000",
              "intermediatePersistPeriod" : "PT10M",
              "windowPeriod" : "PT10M"
            }
          },
          "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "1",
            "topicPattern" : "stock-index-topic"
          }
        }
      },
      "properties" : {
        "zookeeper.connect" : "localhost:2181",
        "druid.discovery.curator.path" : "/druid/discovery",
        "druid.selectors.indexing.serviceName" : "druid/overlord",
        "commit.periodMillis" : "15000",
        "consumer.numThreads" : "2",
        "kafka.zookeeper.connect" : "localhost:2181",
        "kafka.group.id" : "tranquility-kafka"
      }
    }
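
For reference, this file is passed to Tranquility's Kafka ingestion via its CLI; a sketch, assuming a standard Tranquility 0.8 distribution layout and that the file above is saved as conf/kafka.json:

    bin/tranquility kafka -configFile conf/kafka.json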

I am using the kafka-console-consumer to check the data; a message looks like this:

{"code": "399982", "name": "500等权", "acronym": "500DQ", "market": "102", "tradeVolume": 0, "totalValueTraded": 0.0, "preClosePx": 0.0, "openPrice": 0.0, "highPrice": 0.0, "lowPrice": 0.0, "tradePrice": 7184.7142, "closePx": 0.0, "timestamp": "2017-05-16T09:06:39.000+08:00"} 

Any idea why? Thanks.

Answer

"metricsSpec" : [ 
      { 
       "name" : "firstPrice", 
       "type" : "doubleFirst", 
       "fieldName" : "tradePrice" 
      },{ 
       "name" : "lastPrice", 
       "type" : "doubleLast", 
       "fieldName" : "tradePrice" 
      }, { 
       "name" : "minPrice", 
       "type" : "doubleMin", 
       "fieldName" : "tradePrice" 
      }, { 
       "name" : "maxPrice", 
       "type" : "doubleMax", 
       "fieldName" : "tradePrice" 
      } 
      ] 
     }, 

This part is wrong. The documentation says that first and last aggregators cannot be used in an ingestion spec and should only be specified as part of queries. Removing them from the metricsSpec solved the problem.
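
For illustration, a minimal sketch of the corrected metricsSpec, keeping only the aggregators that are valid at ingestion time (this assumes min/max are all that is needed during ingestion, with first/last moved to query time):

    "metricsSpec" : [
      {
        "name" : "minPrice",
        "type" : "doubleMin",
        "fieldName" : "tradePrice"
      },
      {
        "name" : "maxPrice",
        "type" : "doubleMax",
        "fieldName" : "tradePrice"
      }
    ]

The first/last prices can then be requested at query time, for example with a timeseries query (a sketch only: the datasource name comes from the question, the interval is illustrative, and it assumes tradePrice is queryable as a numeric column):

    {
      "queryType" : "timeseries",
      "dataSource" : "stock-index-topic",
      "granularity" : "day",
      "intervals" : [ "2017-05-16/2017-05-17" ],
      "aggregations" : [
        { "type" : "doubleFirst", "name" : "firstPrice", "fieldName" : "tradePrice" },
        { "type" : "doubleLast", "name" : "lastPrice", "fieldName" : "tradePrice" }
      ]
    }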