Kafka Streams TimestampExtractor

Problem

Our production Kafka Streams application threw the following exception, "has invalid (negative) timestamp.":

Exception in thread "ad-effect-application-001-bf034f5b-9012-4e5e-aab3-128774768e74-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = dmpRawLog7, partition = 5, offset = 375653352, CreateTime = -1, serialized key size = -1, serialized value size = 980, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":"{\"type\":\"getad\",\"sid\":\"e52db2495177bd3c\",\"time\":1538208879,\"zone\":{\"id\":\"1xsgiwb0jqfh\",\"resourceid\":\"\",\"agentid\":\"1xsgh2et8jk7\",\"businessid\":\"sndo_sndo\"},\"user\":{\"id\":\"5d4073746afabbf9a11a27fbf9fe33eb\",\"clkip\":\"\",\"clkuseragent\":\"\",\"clkvalid\":0},\"scenario\":{\"page\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\",\"referrer\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\\u0026appSecret=a943ff9a630c3c01c3e92444f354f860\\u0026messageId=cmsqukngt1irs\\u0026messageType=NEWS\\u0026parentId=af274d7b4026bcff1c04131321eaf7ec\"},\"device\":{\"ip\":\"171.220.55.140\",\"agent\":\"Mozilla/5.0 (Linux; Android 7.1.2; KINGSUN-F16 Build/N2G47H; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/52.0.2743.100 Mobile Safari/537.36 Imei/434766830973068DCBEA8AFB32463F5F\",\"screen\":\"\"},\"fill\":1,\"appid\":\"\"}"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63)
at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:628)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)

Solution

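After some research, I found that the timestamp was the cause. The values I store in Kafka are JSON strings that include their own time field. The default extractor never looks at the payload, though. That extractor is FailOnInvalidTimestamp, which delegates to ExtractRecordMetadataTimestamp in the stack trace. It reads the timestamp stored in the record's metadata, and the failing record has none (CreateTime = -1). So I implemented my own TimestampExtractor.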

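import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Created by hadoop on 2018/9/30.
 *
 * Uses the record's metadata timestamp when it is valid and falls back to the
 * current wall-clock time when it is negative (e.g. CreateTime = -1).
 */
public class CurrentTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long timestamp = record.timestamp();
        if (timestamp < 0) {
            // No valid timestamp was embedded by the producer: use processing
            // time, so the record is treated as if it had just arrived.
            return System.currentTimeMillis();
        }
        return timestamp;
    }
}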
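The extractor above falls back to wall-clock time for every record without a valid metadata timestamp. The payload here already carries a `time` field, though. The failing record has `1538208879`, which looks like Unix seconds; that is an assumption. So a possible variant tries that field before using the clock.

The sketch below keeps that fallback logic in a plain Java helper with no Kafka dependency. `PayloadTimestamps`, `fromPayload` and `choose` are hypothetical names. A regex stands in for a real JSON parser only to keep the example self-contained; a production version would more likely parse the nested `data` string with a JSON library such as Jackson.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Kafka Streams): picks an event time for a
 * record, preferring the metadata timestamp, then the payload's "time" field,
 * then wall-clock time.
 */
final class PayloadTimestamps {

    // Matches "time":123 and also the escaped form \"time\":123 that appears
    // when the event JSON is nested as a string inside the outer "data" field.
    private static final Pattern TIME_FIELD =
            Pattern.compile("\"time\\\\?\"\\s*:\\s*(\\d+)");

    private PayloadTimestamps() {}

    /** Returns the payload's "time" converted from seconds to millis, or -1 if absent or unusable. */
    static long fromPayload(String value) {
        if (value == null) {
            return -1L;
        }
        Matcher m = TIME_FIELD.matcher(value);
        if (!m.find()) {
            return -1L;
        }
        try {
            // Assumption: "time" is a Unix timestamp in seconds.
            return Math.multiplyExact(Long.parseLong(m.group(1)), 1000L);
        } catch (NumberFormatException | ArithmeticException e) {
            return -1L; // too many digits or overflow: treat as missing
        }
    }

    /** Valid metadata timestamp first, then payload time, then wall-clock time. */
    static long choose(long recordTimestamp, String value) {
        if (recordTimestamp >= 0) {
            return recordTimestamp;
        }
        long payloadTime = fromPayload(value);
        return payloadTime >= 0 ? payloadTime : System.currentTimeMillis();
    }
}
```

Assuming the value is deserialized as a `String`, the body of `extract()` could then be reduced to `return PayloadTimestamps.choose(record.timestamp(), (String) record.value());`. This keeps windowed results tied to when the event happened rather than when it was reprocessed.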

Configure Kafka Streams to use the custom TimestampExtractor:

kafkaStreamConf.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CurrentTimestampExtractor.class.getName());
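Depending on how the bad records should be treated, one of the extractors bundled with Kafka Streams may be enough instead of a custom class. This setting also accepts a fully qualified class name as a string.

The sketch below only assembles the `Properties`. `BuiltInExtractorConfig` and `withExtractor` are hypothetical names. The key is the string that `StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG` holds, written out so the example compiles without the Kafka jars.

```java
import java.util.Properties;

/** Hypothetical helper: selects a built-in TimestampExtractor by class name. */
final class BuiltInExtractorConfig {

    // Same value as StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
    static final String KEY = "default.timestamp.extractor";

    // Always returns wall-clock time, even for records with a valid timestamp.
    static final String WALLCLOCK =
            "org.apache.kafka.streams.processor.WallclockTimestampExtractor";
    // Uses the record timestamp; if invalid, falls back to the partition time
    // (and throws if no partition time is known yet).
    static final String USE_PREVIOUS =
            "org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp";
    // Uses the record timestamp; if invalid, logs a warning and the record is dropped.
    static final String LOG_AND_SKIP =
            "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp";

    private BuiltInExtractorConfig() {}

    /** Returns a copy of base with the given extractor class configured. */
    static Properties withExtractor(Properties base, String extractorClassName) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(KEY, extractorClassName);
        return props;
    }
}
```

Compared with these options, the custom `CurrentTimestampExtractor` keeps valid record timestamps, which `WallclockTimestampExtractor` does not. It also never drops or fails on an invalid one, unlike the skip and fail variants.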