博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka可插拔增强如何实现?
阅读量:4211 次
发布时间:2019-05-26

本文共 2511 字,大约阅读时间需要 8 分钟。

image.png

导弹拦截,精准防御。

背景

拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链; 类比springMVC的拦截器:

image.png

file

这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;

kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。

配置方法:配置参数

Properties props = new Properties();List
interceptors = new ArrayList<>();interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);……

file

注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;

file

通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |

生产者拦截器

拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor

file

消费者拦截器

org.apache.kafka.clients.consumer.ConsumerInterceptor

file

实操

实现端到端的性能监控:

处理过程:

file

生产者代码:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor
{ private Jedis jedis; // 省略Jedis初始化 @Override public ProducerRecord
onSend(ProducerRecord
record) { jedis.incr("totalSentMessage"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map
configs) { }

消费者代码:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor
{ private Jedis jedis; //省略Jedis初始化 @Override public ConsumerRecords
onConsume(ConsumerRecords
records) { long lantency = 0L; for (ConsumerRecord
record : records) { lantency += (System.currentTimeMillis() - record.timestamp()); } jedis.incrBy("totalLatency", lantency); long totalLatency = Long.parseLong(jedis.get("totalLatency")); long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); return records; } @Override public void onCommit(Map
offsets) { } @Override public void close() { } @Override public void configure(Map
configs)

配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。

小结

类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。 对Kafka进行一些可插拔的功能增强可以通过拦截器实现。

本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。

image.png

原创不易,关注诚可贵,转发价更高!转载请注明出处,让我们互通有无,共同进步,欢迎沟通交流。 我会持续分享Java软件编程知识和程序员发展职业之路,欢迎关注,我整理了这些年编程学习的各种资源,关注公众号‘李福春持续输出’,发送'学习资料'分享给你!

你可能感兴趣的文章
常见的排序算法
查看>>
5.PyTorch实现逻辑回归(二分类)
查看>>
6.PyTorch实现逻辑回归(多分类)
查看>>
8.Pytorch实现5层全连接结构的MNIST(手写数字识别)
查看>>
9.PyTorch实现MNIST(手写数字识别)(2卷积1全连接)
查看>>
hdu 3460 Ancient Printer(trie tree)
查看>>
中间数
查看>>
KMP求前缀函数(next数组)
查看>>
KMP
查看>>
poj 3863Business Center
查看>>
Android编译系统简要介绍和学习计划
查看>>
Android编译系统环境初始化过程分析
查看>>
user2eng 笔记
查看>>
DRM in Android
查看>>
ARC MRC 变换
查看>>
Swift cell的自适应高度
查看>>
【linux】.fuse_hiddenXXXX 文件是如何生成的?
查看>>
【LKM】整合多个LKM为1个
查看>>
【Windows C++】调用powershell上传指定目录下所有文件
查看>>
Java图形界面中单选按钮JRadioButton和按钮Button事件处理
查看>>