2017-10-16 3 views
1

StreamSets Data Collector reads wrong times from Kafka

It seems that StreamSets Data Collector reads wrong datetime values.

I tried reading a simple topic from Confluent. When I check the datetime value (in milliseconds) with Landoop Kafka Topics, it shows the correct datetime, but when I read it with Kafka Consumer 0.10.0.0 in StreamSets, I get a datetime that is 3 hours and 20 minutes earlier. My current time zone is GMT+03:00.

My SDC settings have the correct time zone, and so does the server OS. Even the time:now() function in SDC returns the correct result!

Any guesses why?

UPDATE - my exported pipeline:

{ 
    "pipelineConfig" : { 
    "schemaVersion" : 4, 
    "version" : 7, 
    "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
    "title" : "connector_test_data_file2", 
    "description" : "", 
    "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd", 
    "configuration" : [ { 
     "name" : "executionMode", 
     "value" : "STANDALONE" 
    }, { 
     "name" : "deliveryGuarantee", 
     "value" : "AT_LEAST_ONCE" 
    }, { 
     "name" : "startEventStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1" 
    }, { 
     "name" : "stopEventStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1" 
    }, { 
     "name" : "shouldRetry", 
     "value" : true 
    }, { 
     "name" : "retryAttempts", 
     "value" : -1 
    }, { 
     "name" : "memoryLimit", 
     "value" : "${jvm:maxMemoryMB() * 0.65}" 
    }, { 
     "name" : "memoryLimitExceeded", 
     "value" : "STOP_PIPELINE" 
    }, { 
     "name" : "notifyOnStates", 
     "value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ] 
    }, { 
     "name" : "emailIDs", 
     "value" : [ ] 
    }, { 
     "name" : "constants", 
     "value" : [ ] 
    }, { 
     "name" : "badRecordsHandling", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget::1" 
    }, { 
     "name" : "errorRecordPolicy", 
     "value" : "ORIGINAL_RECORD" 
    }, { 
     "name" : "workerCount", 
     "value" : 0 
    }, { 
     "name" : "clusterSlaveMemory", 
     "value" : 1024 
    }, { 
     "name" : "clusterSlaveJavaOpts", 
     "value" : "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug" 
    }, { 
     "name" : "clusterLauncherEnv", 
     "value" : [ ] 
    }, { 
     "name" : "mesosDispatcherURL", 
     "value" : null 
    }, { 
     "name" : "hdfsS3ConfDir", 
     "value" : null 
    }, { 
     "name" : "rateLimit", 
     "value" : 0 
    }, { 
     "name" : "maxRunners", 
     "value" : 0 
    }, { 
     "name" : "webhookConfigs", 
     "value" : [ ] 
    }, { 
     "name" : "sparkConfigs", 
     "value" : [ ] 
    }, { 
     "name" : "statsAggregatorStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget::1" 
    } ], 
    "uiInfo" : { 
     "previewConfig" : { 
     "previewSource" : "CONFIGURED_SOURCE", 
     "batchSize" : "1000", 
     "timeout" : 10000, 
     "writeToDestinations" : false, 
     "executeLifecycleEvents" : false, 
     "showHeader" : false, 
     "showFieldType" : true, 
     "rememberMe" : false 
     } 
    }, 
    "stages" : [ { 
     "instanceName" : "KafkaConsumer_01", 
     "library" : "streamsets-datacollector-apache-kafka_0_10-lib", 
     "stageName" : "com_streamsets_pipeline_stage_origin_kafka_KafkaDSource", 
     "stageVersion" : "5", 
     "configuration" : [ { 
     "name" : "kafkaConfigBean.dataFormatConfig.filePatternInArchive", 
     "value" : "*" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.charset", 
     "value" : "UTF-8" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.removeCtrlChars", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.textMaxLineLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.useCustomDelimiter", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.customDelimiter", 
     "value" : "\\r\\n" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.includeCustomDelimiterInTheText", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.jsonContent", 
     "value" : "MULTIPLE_OBJECTS" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.jsonMaxObjectLen", 
     "value" : 4096 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvFileFormat", 
     "value" : "CSV" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvHeader", 
     "value" : "NO_HEADER" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvAllowExtraColumns", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvExtraColumnPrefix", 
     "value" : "_extra_" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomDelimiter", 
     "value" : "|" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomEscape", 
     "value" : "\\" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomQuote", 
     "value" : "\"" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvEnableComments", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCommentMarker", 
     "value" : "#" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvIgnoreEmptyLines", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvRecordType", 
     "value" : "LIST_MAP" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvSkipStartLines", 
     "value" : 0 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.parseNull", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.nullConstant", 
     "value" : "\\\\N" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xmlRecordElement", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.includeFieldXpathAttributes", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xPathNamespaceContext", 
     "value" : [ ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.outputFieldAttributes", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xmlMaxObjectLen", 
     "value" : 4096 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.logMode", 
     "value" : "COMMON_LOG_FORMAT" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.logMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.retainOriginalLine", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.customLogFormat", 
     "value" : "%h %l %u %t \"%r\" %>s %b" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.regex", 
     "value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.fieldPathsToGroupName", 
     "value" : [ { 
      "fieldPath" : "/", 
      "group" : 1 
     } ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.grokPatternDefinition", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.grokPattern", 
     "value" : "%{COMMONAPACHELOG}" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.onParseError", 
     "value" : "ERROR" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.maxStackTraceLines", 
     "value" : 50 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.enableLog4jCustomLogFormat", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.log4jCustomLogFormat", 
     "value" : "%r [%t] %-5p %c %x - %m%n" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.avroSchemaSource", 
     "value" : "REGISTRY" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.avroSchema", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaRegistryUrls", 
     "value" : [ "http://test-kafka01.mytest.com:8081" ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaLookupMode", 
     "value" : "AUTO" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.subject", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaId", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.protoDescriptorFile", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.messageType", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.isDelimited", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.binaryMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.datagramMode", 
     "value" : "SYSLOG" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.typesDbPath", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.convertTime", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.excludeInterval", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.authFilePath", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.wholeFileMaxObjectLen", 
     "value" : 8192 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.rateLimit", 
     "value" : "-1" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.verifyChecksum", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormat", 
     "value" : "AVRO" 
     }, { 
     "name" : "kafkaConfigBean.metadataBrokerList", 
     "value" : "test-kafka01:9092,test-kafka02:9092,test-kafka03:9092" 
     }, { 
     "name" : "kafkaConfigBean.zookeeperConnect", 
     "value" : "test-kafka01:2181,test-kafka02:2181,test-kafka03:2181" 
     }, { 
     "name" : "kafkaConfigBean.consumerGroup", 
     "value" : "connector_elastic" 
     }, { 
     "name" : "kafkaConfigBean.topic", 
     "value" : "test_data" 
     }, { 
     "name" : "kafkaConfigBean.produceSingleRecordPerMessage", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.maxBatchSize", 
     "value" : 1000 
     }, { 
     "name" : "kafkaConfigBean.maxWaitTime", 
     "value" : 2000 
     }, { 
     "name" : "kafkaConfigBean.maxRatePerPartition", 
     "value" : 1000 
     }, { 
     "name" : "kafkaConfigBean.keyDeserializer", 
     "value" : "CONFLUENT" 
     }, { 
     "name" : "kafkaConfigBean.valueDeserializer", 
     "value" : "CONFLUENT" 
     }, { 
     "name" : "kafkaConfigBean.kafkaConsumerConfigs", 
     "value" : [ ] 
     }, { 
     "name" : "stageOnRecordError", 
     "value" : "TO_ERROR" 
     } ], 
     "uiInfo" : { 
     "yPos" : 40.85545349121094, 
     "stageType" : "SOURCE", 
     "rawSource" : { 
      "configuration" : [ { 
      "name" : "brokerHost", 
      "value" : "localhost" 
      }, { 
      "name" : "brokerPort", 
      "value" : 9092 
      }, { 
      "name" : "topic", 
      "value" : "myTopic" 
      }, { 
      "name" : "partition", 
      "value" : 0 
      }, { 
      "name" : "maxWaitTime", 
      "value" : 1000 
      } ] 
     }, 
     "description" : "", 
     "label" : "input", 
     "xPos" : 184.59434509277344 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ], 
     "eventLanes" : [ ] 
    }, { 
     "instanceName" : "LocalFS_01", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_localfilesystem_LocalFileSystemDTarget", 
     "stageVersion" : "3", 
     "configuration" : [ { 
     "name" : "configs.uniquePrefix", 
     "value" : "sdc-${sdc:id()}" 
     }, { 
     "name" : "configs.fileNameSuffix", 
     "value" : null 
     }, { 
     "name" : "configs.dirPathTemplateInHeader", 
     "value" : false 
     }, { 
     "name" : "configs.dirPathTemplate", 
     "value" : "/tmp/data_monitor/out/success/${YYYY()}-${MM()}-${DD()}-${hh()}" 
     }, { 
     "name" : "configs.timeZoneID", 
     "value" : "Europe/Moscow" 
     }, { 
     "name" : "configs.timeDriver", 
     "value" : "${time:now()}" 
     }, { 
     "name" : "configs.maxRecordsPerFile", 
     "value" : 0 
     }, { 
     "name" : "configs.maxFileSize", 
     "value" : 0 
     }, { 
     "name" : "configs.idleTimeout", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "configs.compression", 
     "value" : "NONE" 
     }, { 
     "name" : "configs.otherCompression", 
     "value" : null 
     }, { 
     "name" : "configs.fileType", 
     "value" : "TEXT" 
     }, { 
     "name" : "configs.keyEl", 
     "value" : "${uuid()}" 
     }, { 
     "name" : "configs.lateRecordsLimit", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "configs.rollIfHeader", 
     "value" : false 
     }, { 
     "name" : "configs.rollHeaderName", 
     "value" : "roll" 
     }, { 
     "name" : "configs.lateRecordsAction", 
     "value" : "SEND_TO_ERROR" 
     }, { 
     "name" : "configs.lateRecordsDirPathTemplate", 
     "value" : "/tmp/late/${YYYY()}-${MM()}-${DD()}" 
     }, { 
     "name" : "configs.dataFormat", 
     "value" : "JSON" 
     }, { 
     "name" : "configs.hdfsPermissionCheck", 
     "value" : true 
     }, { 
     "name" : "configs.permissionEL", 
     "value" : null 
     }, { 
     "name" : "configs.skipOldTempFileRecovery", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.charset", 
     "value" : "UTF-8" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvFileFormat", 
     "value" : "CSV" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvHeader", 
     "value" : "NO_HEADER" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLines", 
     "value" : true 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLinesString", 
     "value" : " " 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomDelimiter", 
     "value" : "|" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomEscape", 
     "value" : "\\" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomQuote", 
     "value" : "\"" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.jsonMode", 
     "value" : "MULTIPLE_OBJECTS" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textFieldPath", 
     "value" : "/text" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textRecordSeparator", 
     "value" : "\\n" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textFieldMissingAction", 
     "value" : "ERROR" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textEmptyLineIfNull", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroSchemaSource", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroSchema", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.registerSchema", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrlsForRegistration", 
     "value" : [ ] 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrls", 
     "value" : [ ] 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaLookupMode", 
     "value" : "SUBJECT" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.subject", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.subjectToRegister", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaId", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroCompression", 
     "value" : "NULL" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.binaryFieldPath", 
     "value" : "/" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.protoDescriptorFile", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.messageType", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.fileNameEL", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.wholeFileExistsAction", 
     "value" : "TO_ERROR" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.includeChecksumInTheEvents", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.checksumAlgorithm", 
     "value" : "MD5" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlPrettyPrint", 
     "value" : true 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlValidateSchema", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlSchema", 
     "value" : null 
     }, { 
     "name" : "stageOnRecordError", 
     "value" : "TO_ERROR" 
     }, { 
     "name" : "stageRequiredFields", 
     "value" : [ ] 
     }, { 
     "name" : "stageRecordPreconditions", 
     "value" : [ ] 
     } ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "local_sink", 
     "xPos" : 471.2611083984375, 
     "yPos" : 44.572269439697266, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "errorStage" : { 
     "instanceName" : "WritetoFile_ErrorStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ { 
     "name" : "directory", 
     "value" : "/tmp/data_monitor/out/error" 
     }, { 
     "name" : "uniquePrefix", 
     "value" : "sdc-${sdc:id()}" 
     }, { 
     "name" : "rotationIntervalSecs", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "maxFileSizeMbs", 
     "value" : 512 
     } ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Error Records - Write to File", 
     "xPos" : 622, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    }, 
    "info" : { 
     "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
     "title" : "connector_test_data_file2", 
     "description" : "", 
     "created" : 1508175563039, 
     "lastModified" : 1508177203011, 
     "creator" : "admin", 
     "lastModifier" : "admin", 
     "lastRev" : "0", 
     "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd", 
     "valid" : true, 
     "metadata" : { 
     "labels" : [ ] 
     }, 
     "name" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
     "sdcVersion" : "2.7.1.1", 
     "sdcId" : "a2869998-a9c9-11e7-94f2-31416694a1b6" 
    }, 
    "metadata" : { 
     "labels" : [ ] 
    }, 
    "statsAggregatorStage" : { 
     "instanceName" : "WritetoDPMdirectly_StatsAggregatorStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Stats Aggregator - Write to DPM directly", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    }, 
    "startEventStages" : [ { 
     "instanceName" : "Discard_StartEventStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Start Event - Discard", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "stopEventStages" : [ { 
     "instanceName" : "Discard_StopEventStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Stop Event - Discard", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "valid" : true, 
    "issues" : { 
     "stageIssues" : { }, 
     "pipelineIssues" : [ ], 
     "issueCount" : 0 
    }, 
    "previewable" : true 
    }, 
    "pipelineRules" : { 
    "schemaVersion" : 3, 
    "version" : 2, 
    "metricsRuleDefinitions" : [ { 
     "id" : "badRecordsAlertID", 
     "alertText" : "High incidence of Error Records", 
     "metricId" : "pipeline.batchErrorRecords.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > 100}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "stageErrorAlertID", 
     "alertText" : "High incidence of Stage Errors", 
     "metricId" : "pipeline.batchErrorMessages.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > 100}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "idleGaugeID", 
     "alertText" : "Pipeline is Idle", 
     "metricId" : "RuntimeStatsGauge.gauge", 
     "metricType" : "GAUGE", 
     "metricElement" : "TIME_OF_LAST_RECEIVED_RECORD", 
     "condition" : "${time:now() - value() > 120000}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "batchTimeAlertID", 
     "alertText" : "Batch taking more time to process", 
     "metricId" : "RuntimeStatsGauge.gauge", 
     "metricType" : "GAUGE", 
     "metricElement" : "CURRENT_BATCH_AGE", 
     "condition" : "${value() > 200}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "memoryLimitAlertID", 
     "alertText" : "Memory limit for pipeline exceeded", 
     "metricId" : "pipeline.memoryConsumed.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    } ], 
    "dataRuleDefinitions" : [ ], 
    "driftRuleDefinitions" : [ ], 
    "uuid" : "6ab2f9cf-0043-48e7-8e5f-1a3d2ec86d4d", 
    "configuration" : [ { 
     "name" : "emailIDs", 
     "value" : [ ] 
    }, { 
     "name" : "webhookConfigs", 
     "value" : [ ] 
    } ], 
    "configIssues" : [ ], 
    "ruleIssues" : [ ] 
    }, 
    "libraryDefinitions" : null 
} 
+0

Show your code! – Clonkex

+0

Not sure why that would be, but... there is a StreamSets Google Group, a Slack channel, and a dedicated Q&A site (see https://streamsets.com/community/) where you can interact directly with the community. – metadaddy

+0

I don't use any code, only the visual editor: I have a Kafka Consumer "origin" followed directly by a Local FS "destination". –

Answer

1

For the record, we figured this out elsewhere. It had to do with Java date patterns.

Parsing a string with microseconds, such as 2017-10-20 13:15:33.611796, with an expression like ${time:extractDateFromString(record:value("/ts"),"yyyy-MM-dd hh:mm:ss.SSSSSS")} does not work as you might expect. S means milliseconds, so 611,796 microseconds is interpreted as 611,796 milliseconds, and the resulting 611 seconds (a little over 10 minutes) are added to the time, producing a Date object with the value 2017-10-20 01:25:44.796.

Trimming the microseconds from the incoming string before parsing works. If needed, the microseconds can be captured with ${str:substring(record:value("/ts"), 23, 26)}
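
The effect can be reproduced outside of SDC. The following is a minimal sketch in plain Java, assuming that the expression above ends up in java.text.SimpleDateFormat with lenient parsing (its default). The class and method names are hypothetical and only serve the illustration. The sketch uses HH (24-hour clock) instead of hh so that hour 13 is unambiguous, and it fixes the time zone to UTC so the output does not depend on the machine it runs on.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class, not part of StreamSets.
final class MicrosecondParsingDemo {

    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    // Parses text with the given pattern and renders the result with millisecond precision.
    static String parseAndRender(String text, String pattern) {
        SimpleDateFormat parser = new SimpleDateFormat(pattern, Locale.ROOT);
        parser.setTimeZone(UTC);
        try {
            Date parsed = parser.parse(text);
            SimpleDateFormat renderer = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ROOT);
            renderer.setTimeZone(UTC);
            return renderer.format(parsed);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse: " + text, e);
        }
    }

    // Keeps "yyyy-MM-dd HH:mm:ss.SSS" (23 characters) and drops the microsecond digits.
    static String trimToMillis(String text) {
        return text.substring(0, 23);
    }

    // Returns the three microsecond digits that trimToMillis drops.
    static String microsPart(String text) {
        return text.substring(23, 26);
    }

    public static void main(String[] args) {
        String ts = "2017-10-20 13:15:33.611796";
        // All six fraction digits are read as milliseconds, so 611.796 s are added.
        System.out.println(parseAndRender(ts, "yyyy-MM-dd HH:mm:ss.SSSSSS")); // 2017-10-20 13:25:44.796
        // Trimming to milliseconds first gives the expected time.
        System.out.println(parseAndRender(trimToMillis(ts), "yyyy-MM-dd HH:mm:ss.SSS")); // 2017-10-20 13:15:33.611
        System.out.println(microsPart(ts)); // 796
    }
}
```

The substring bounds 0 to 23 and 23 to 26 match the fixed-width timestamp format shown above. A timestamp with a different layout would need different offsets.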

0

I ran into a very similar problem when running services in Docker for Mac. There is a long-known issue where the VM's system time drifts from that of the host machine, which can be fixed by restarting Docker. Is there any chance you are affected by this as well?

+0

I run StreamSets on one CentOS server and Kafka on another, also CentOS. Docker is not used, so I don't think this is my case. –
