问题
线上的KafkaStream程序出现了下面的异常,”has invalid (negative) timestamp.”:
1 | Exception in thread "ad-effect-application-001-bf034f5b-9012-4e5e-aab3-128774768e74-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = dmpRawLog7, partition = 5, offset = 375653352, CreateTime = -1, serialized key size = -1, serialized value size = 980, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":"{\"type\":\"getad\",\"sid\":\"e52db2495177bd3c\",\"time\":1538208879,\"zone\":{\"id\":\"1xsgiwb0jqfh\",\"resourceid\":\"\",\"agentid\":\"1xsgh2et8jk7\",\"businessid\":\"sndo_sndo\"},\"user\":{\"id\":\"5d4073746afabbf9a11a27fbf9fe33eb\",\"clkip\":\"\",\"clkuseragent\":\"\",\"clkvalid\":0},\"scenario\":{\"page\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\",\"referrer\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\\u0026appSecret=a943ff9a630c3c01c3e92444f354f860\\u0026messageId=cmsqukngt1irs\\u0026messageType=NEWS\\u0026parentId=af274d7b4026bcff1c04131321eaf7ec\"},\"device\":{\"ip\":\"171.220.55.140\",\"agent\":\"Mozilla/5.0 (Linux; Android 7.1.2; KINGSUN-F16 Build/N2G47H; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/52.0.2743.100 Mobile Safari/537.36 Imei/434766830973068DCBEA8AFB32463F5F\",\"screen\":\"\"},\"fill\":1,\"appid\":\"\"}"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data. |
解决
查阅资料发现是时间戳导致的,kafka中我存的是一个Json字符串,里面包含timestamp字段,出问题的数据是因为没有timestamp字段,所以自己实现TimestampExtractor。
1 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
Kafka Stream 配置自定义的TimestampExtractor:
1 | kafkaStreamConf.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CurrentTimestampExtractor.class.getName()); |