在中介绍了三种语义Event time、Processing-time、Ingestion time。
这里需要注意的是:punctuate方法必须使用新数据才能触发。
时间语义设置
如何实现不同的时间语义主要取决于两个方面:
message timestamp类型
在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。
log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。
message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。
TimestampExtractor类型
接口TimestampExractor分为两类:
WallclockTimestampExtractor
WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
- extract方法获取时间为当前系统时间(System.currentTimeMillis())
ExtractRecordMetadataTimestamp
ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
- extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
- onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
- FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
- LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
- UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。
自定义TimestampExtractor
可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。
一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。
总结
通过如下表格可见:
当使用WallclockTimestampExtractor提供processing-time语义。
当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。
当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。
自定义实现TimestampExtractor接口,提供自定义time语义。
语义类型 | message timestamp | TimestampExractor |
---|---|---|
processing-time | WallclockTimestampExtractor | |
event-time | CreateTime | ExtractRecordMetadataTimestamp子类 |
ingestion-time | LogAppendTime | ExtractRecordMetadataTimestamp子类 |
自定义语义 | 自己实现TimestampExtractor |
processing-time
事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time
这里需要注意的是:punctuate方法必须使用新数据才能触发。
比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:
如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;
如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;
如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;
如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;
在processing-time语义下我们需要如下配置:
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor
import org.apache.commons.io.FileUtils;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;import org.apache.kafka.streams.processor.ProcessorSupplier;import org.apache.kafka.streams.processor.TopologyBuilder;import org.apache.kafka.streams.state.KeyValueIterator;import org.apache.kafka.streams.state.KeyValueStore;import org.apache.kafka.streams.state.Stores;import java.io.File;import java.util.Locale;import java.util.Properties;/** * Demonstrates, using the low-level Processor APIs, how to implement the * WordCount program that computes a simple word occurrence histogram from an * input text. * * In this example, the input stream reads from a topic named * "streams-file-input", where the values of messages represent lines of text; * and the histogram output is written to topic * "streams-wordcount-processor-output" where each record is an updated count of * a single word. * * Before running this example you must create the input topic and the output * topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the * input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see * any data arriving in the output topic. */public class WordCountProcessorDemo { private static class MyProcessorSupplier implements ProcessorSupplier |
event-time
事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time
event-time的timestamp生成方式有两种
- 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
- 用户不指定timestamp,则producer.send当时的时间作为timestamp。
在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor
ingestion-time
事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。
在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor