博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Time Semantics
阅读量:6244 次
发布时间:2019-06-22

本文共 8715 字,大约阅读时间需要 29 分钟。

hot3.png

23160756_6CAV.jpg

在中介绍了三种语义Event time、Processing-time、Ingestion time。

这里需要注意的是:punctuate方法必须使用新数据才能触发。

时间语义设置

如何实现不同的时间语义主要取决于两个方面:

message timestamp类型

在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。

log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。

message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。

TimestampExtractor类型

接口TimestampExractor分为两类:

WallclockTimestampExtractor

WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
  • extract方法获取时间为当前系统时间(System.currentTimeMillis())

ExtractRecordMetadataTimestamp

ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
  • extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
  • onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
  • FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
  • LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
  • UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。

自定义TimestampExtractor

可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。

一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。

总结

通过如下表格可见:

当使用WallclockTimestampExtractor提供processing-time语义。

当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。

当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。

自定义实现TimestampExtractor接口,提供自定义time语义。

语义类型 message timestamp TimestampExractor
processing-time
 
WallclockTimestampExtractor
event-time
CreateTime
ExtractRecordMetadataTimestamp子类
ingestion-time
LogAppendTime
ExtractRecordMetadataTimestamp子类
自定义语义   自己实现TimestampExtractor

processing-time

事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time

这里需要注意的是:punctuate方法必须使用新数据才能触发。

比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:

如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;

如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;

如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;

如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;

在processing-time语义下我们需要如下配置:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor

 

import org.apache.commons.io.FileUtils;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;import org.apache.kafka.streams.processor.ProcessorSupplier;import org.apache.kafka.streams.processor.TopologyBuilder;import org.apache.kafka.streams.state.KeyValueIterator;import org.apache.kafka.streams.state.KeyValueStore;import org.apache.kafka.streams.state.Stores;import java.io.File;import java.util.Locale;import java.util.Properties;/** * Demonstrates, using the low-level Processor APIs, how to implement the * WordCount program that computes a simple word occurrence histogram from an * input text. * * In this example, the input stream reads from a topic named * "streams-file-input", where the values of messages represent lines of text; * and the histogram output is written to topic * "streams-wordcount-processor-output" where each record is an updated count of * a single word. * * Before running this example you must create the input topic and the output * topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the * input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see * any data arriving in the output topic. */public class WordCountProcessorDemo {	private static class MyProcessorSupplier implements ProcessorSupplier
{ @Override public Processor
get() { return new Processor
() { private ProcessorContext context; private KeyValueStore
kvStore; @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context) { this.context = context; this.context.schedule(5000); this.kvStore = (KeyValueStore
) context.getStateStore("Counts"); } @Override public void process(String dummy, String line) { String[] words = line.toLowerCase(Locale.getDefault()).split(" "); System.out.println("line:" + line); for (String word : words) { Integer oldValue = this.kvStore.get(word); if (oldValue == null) { this.kvStore.put(word, 1); } else { this.kvStore.put(word, oldValue + 1); } } context.commit(); } @Override public void punctuate(long timestamp) { try (KeyValueIterator
iter = this.kvStore.all()) { System.out.println("----------- " + timestamp + "----------- "); //System.out.println(TimestampUtil.TimestampFormat(timestamp)); while (iter.hasNext()) { KeyValue
entry = iter.next(); System.out.println("[" + entry.key + ", " + entry.value + "]"); context.forward(entry.key, entry.value.toString()); } } } @Override public void close() { } }; } } public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "breath:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor"); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("Source", "streams-file-input"); builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); Thread.sleep(20000L); streams.close(); // 清空state store String appStateDir = props.get(StreamsConfig.STATE_DIR_CONFIG) + System.getProperty("file.separator") + props.get(StreamsConfig.APPLICATION_ID_CONFIG); FileUtils.deleteDirectory(new File(appStateDir)); // 清空application相关中间topic以及input topic的offset String kafkaHome = System.getenv("KAFKA_HOME"); Runtime runtime = Runtime.getRuntime(); runtime.exec(kafkaHome + "/bin/kafka-streams-application-reset.sh " + "--application-id " + props.get(StreamsConfig.APPLICATION_ID_CONFIG) + " " + "--bootstrap-servers breath:9092 " + "--zookeeper breath:2181/kafka01021 " + "--input-topics streams-file-input"); }}

event-time

事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time

event-time的timestamp生成方式有两种

  • 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
  • 用户不指定timestamp,则producer.send当时的时间作为timestamp。

在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

ingestion-time

事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。

在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

转载于:https://my.oschina.net/yulongblog/blog/1519359

你可能感兴趣的文章
LNMP部署实例及HTTPS服务实现
查看>>
9种用户体验设计的状态是必须知道的(四)
查看>>
什么是DVB-SI?对PSI(PAT,PMT,CAT,NIT,SDT,EIT)的理解
查看>>
JavaSE 学习参考:方法重写
查看>>
Percona MySQL 5.7 Linux通用二进制tar包安装(CentOS 6.5)
查看>>
90后女生吴江平独闯9个国家 吴江平穷游照片欣赏
查看>>
linux密码策略
查看>>
【REACT NATIVE 跨平台应用开发】环境搭建问题记录&&XCODE7模拟器上COMMAND+R失效的几种替换方法...
查看>>
C++实现选择排序
查看>>
面试题:合并两个排序的链表
查看>>
PPT控件 Spire.Presentation for .NET V2.8.35发布 | 支持设置演示幻灯片布局
查看>>
云环境所面临的安全威胁
查看>>
STM32 USB转串口驱动移植到SylixOS中遇到的问题总结
查看>>
组播学习分享 第三天
查看>>
【C#小知识】C#中一些易混淆概念总结(五)---------深入解析C#继承
查看>>
数据库优化
查看>>
TensorFlow的基本运算01-03
查看>>
Hive-有意思的query
查看>>
SylixOS调试与性能分析技术--内存泄漏检测
查看>>
消息队列-ActiveMQ
查看>>