一、疑问描述
spring-kafka通过 @KafkaListener 的方式配置订阅的topic,通过@Configuration 配置创建kafkaListenerContainerFactory。
如下:
@Configuration
@EnableKafka
public class KafkaConfig {
private static final String KAFKA_SERVERS_CONFIG = "10.192.77.202:9092";
private static final String LOCAL_GROUP_ID = "test";
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@KafkaListener(topics = "TEST_TOPIC_NEW")
public void listen(String data) {
System.out.println("kafkaconfig =listen======="+data);
}
}
但想要动态的创建监听者对象,如通过数据库的方式配置KAFKA_SERVERS_CONFIG 和LOCAL_GROUP_ID ,并且可以不用重启服务,实现热更新。通过spring-kafka提供的接口没有找到好的解决方法。
二、解决方案
所以,考虑通过最基本的手动创建消费者对象。
通过定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁已有消费者,创建新的消费者。
如果有好的方案,谢谢告知~
/**
* 每三分钟check一次kafka配置
* @throws Exception
*/
@Scheduled(cron = "1 1/3 * * * ? ")
public void deviceNotifyConfig(){
Map<String, String> kafkaConfigs = systemConfigService.fetchConfigLikeKey("kafka");
if(kafkaConfigs != null && kafkaConfigs.size() != 0)
{
String kafkaIp = kafkaConfigs.get("kafkaIp");
String kafkaPort = kafkaConfigs.get("kafkaPort");
String kafkaUserName = kafkaConfigs.get("kafkaUserName");
String kafkaPassword = kafkaConfigs.get("kafkaPassword");
if(StringUtils.isNotEmpty(KafkaLinkCache.kafkaConfigCache))
{
if (!KafkaLinkCache.kafkaConfigCache.equals(kafkaIp + "_" + kafkaPort))
{
//关闭已有消费者对象
KafkaConsumer<String, String> consumer = KafkaLinkCache.DEVICE_CONSUMER_MAP.get("kafkaComsumer");
if(consumer != null)
{
resourceNotifyConsumer.closeConsumer();
}
KafkaLinkCache.DEVICE_CONSUMER_MAP.clear();
this.handlerConsumer(kafkaIp, kafkaPort);
}
}
else
{
this.handlerConsumer(kafkaIp, kafkaPort);
}
}else
{
//关闭已有消费者对象
KafkaConsumer<String, String> consumer = KafkaLinkCache.DEVICE_CONSUMER_MAP.get("kafkaComsumer");
if(consumer != null)
{
resourceNotifyConsumer.closeConsumer();
}
KafkaLinkCache.DEVICE_CONSUMER_MAP.clear();
}
}
private void handlerConsumer(String kafkaIp, String kafkaPort) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaIp + ":" + kafkaPort);
// key反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 每个消费者都必须属于某一个消费组,所以必须指定group.id
props.put("group.id", "test");
// 构造消费者对象
deviceNoifyThreadExecutor.execute(()->{
KafkaConsumer<String, String> consumerObj = null;
// 指定多主题:
List<String> topics = CbdmOptUtil.stringToStringList(PropertiesUtil.getProperty("kafka.subscribe.topics"), ConstParamErrorCode.DEFAULT_SPLIT_KEY, false);
try {
consumerObj = new KafkaConsumer<>(props);
if(consumerObj != null) {
consumerObj.subscribe(topics);
resourceNotifyConsumer.setConsumer(consumerObj);
KafkaLinkCache.DEVICE_CONSUMER_MAP.put("kafkaComsumer", consumerObj);
resourceNotifyConsumer.onMessage();
}
} catch(Exception e) {
LogUtils.logError(RunTimeLogUtil.toErrorLog(ConstParamErrorCode.SYSTEM_CODE_FAIL + "", LogObjectTypeEnum.SYSTEM,"consume",
"resolve data platform notify error"),e);
}finally {
// 关闭
consumerObj.close();
}
});
//保存配置
KafkaLinkCache.kafkaConfigCache = kafkaIp + "_" + kafkaPort;
}
@Component(value = "resourceNotifyConsumer")
public class ResourceNotifyConsumer {
private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);
@Resource
IAccessDeviceService resourceService;
private KafkaConsumer<String, String> consumer = null;
public KafkaConsumer<String, String> getConsumer() {
return consumer;
}
public void setConsumer(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
public void closeConsumer()
{
//consumer非线程安全,依靠gc回收
consumer = null;
}
public void onMessage(){
try{
logger.info(RunTimeLogUtil.toLog(LogObjectTypeEnum.SYSTEM,"consume","Get resource Notify start",null,null));
while (true) {
if(consumer != null)
{
// timeout 阻塞时间,从kafka中取出100毫秒的数据,有可能一次取出0到N条
List<Map<String,Object>> datas = new ArrayList<>();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 遍历
for (ConsumerRecord<String, String> record : records) {
Map<String,Object> notifyDto = ( Map<String,Object> ) JsonUtils.jsonToMap(record.value());
datas.add(notifyDto);
}
// 拿出结果
if(CollectionUtils.isNotEmpty(datas)){
logger.info(RunTimeLogUtil.toLog(LogObjectTypeEnum.SYSTEM,"consume","Get resource Notify",null,null, "record"),JsonUtils.object2Json(datas));
// 起线程处理 资源变更通知
resourceHandle(datas);
}
} else {
break;
}
}
}catch (Throwable e){
logger.error(RunTimeLogUtil.toErrorLog(ConstParamErrorCode.SYSTEM_CODE_FAIL + "",LogObjectTypeEnum.SYSTEM,"consume",
"resolve resource notify error"),e);
}
}
/**
*
* @param datas
*/
private void resourceHandle(List<Map<String,Object>> datas){
if(CollectionUtils.isNotEmpty(datas)){
try {
new Thread(() -> resourceService.dealResource(datas)).start();
}catch (Throwable e){
logger.error(RunTimeLogUtil.toErrorLog(ConstParamErrorCode.SYSTEM_CODE_FAIL + "",LogObjectTypeEnum.SYSTEM,"consume",
"resourceHandle error"),e);
}
}else{
logger.info(RunTimeLogUtil.toLog(LogObjectTypeEnum.SYSTEM,"consume","resource notify data is empty!",null,null));
}
}
}
文章浏览阅读410次,点赞4次,收藏4次。MyBatis在Mapper接口上使用了动态代理 代理机制是Java中常用的设计模式,分为静态代理和动态代理。静态代理:在程序编译时已经将接口、代理类和被代理类等确定下来动态代理:代理类在程序运行期间动态创建。
文章浏览阅读7.5k次,点赞14次,收藏47次。目录前言:为什么要使用linux服务器?一、握手包的抓取二、握手包格式转换(cap to hc22000)三、linux服务器的环境配置四、利用hashcat进行暴力测试总结前言:为什么要使用linux服务器?当下对于wifi密码的破解,主流的两种手段是握手包破解和WPS破解,而WPS破解却仅限于无线路由器。如果你到了一条陌生的街道或者办公场所,大部分情况下,手机热点是会比无线路由器信号要多的。所以握手包破解还是更加普适。hashcat作为最快的密码恢复工具之一,能够支持GPU加速计算,相关的测试表明_hashcat 22000
文章浏览阅读5.8k次,点赞8次,收藏56次。 【百度百科】 百度百科给出人工智能的概念及知识,专业科学,但一定也会让不少人望而生畏。 人工智能(Artificial Intelligence),英文缩写为AI。它是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。 人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器,该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。人工智能从诞生以来,理论和技术日益成熟,应用领域_人工智能科普
文章浏览阅读2.6w次,点赞29次,收藏146次。前段时间玩微信爬虫,仿佛打开了新世界的大门。使用 wxpy 库调用微信的接口,可以获取好友信息,聊天消息收发等功能。脑洞一开,这样不就可以用来做 聊天消息防撤回,红包提醒,自动回复这些有意思的玩意儿了嘛!言归正传,这次我搜集整理了一些比较好用的 ”聊天机器人“ 的 API 接口。如果后期有机会的话,把它们接入我的程序中,做一个陪聊小助手,也是一个不错的选择。小 i 机..._免费的chatai接口
文章浏览阅读889次,点赞24次,收藏36次。毕业设计:基于深度学习的网课学生坐姿检测系统利用深度学习算法对摄像头捕捉的学生图像进行特征提取和分类,以判断学生的坐姿是否正确。首先,通过图像采集模块获取学生的实时视频流,判断学生的坐姿状态。为计算机毕业设计提供了一个创新的方向,为毕业生提供了一个有意义的研究课题。对于计算机专业、软件工程专业、人工智能专业、大数据专业的毕业生而言,提供了一个具有挑战性和创新性的研究课题。无论您对深度学习技术保持浓厚兴趣,还是希望探索机器学习、信息安全、算法或人工智能的领域的同学,能为您提供灵感和指导;_坐姿检测系统
文章浏览阅读300次。前言先说说2020_n1CTF的web题Easy_tp5复现问题。这个题在保留thinkphp的RCE点的同时,并且RCE中ban掉许多危险函数,只能允许单参数的函数执行。对于现在在网络中流传的文件包含的点也增加了限制。smile yyds!先说一下这个题限制条件:thinkphp版本:5.0.0php版本:7对于包含文件增加了限制ban掉所有的单参数危险函数设置open_basedir为web目..._thinkphp5.x getshell 禁用函数
文章浏览阅读1.6k次。使用as.matrix报错:_as_matrix() 代替
文章浏览阅读1w次,点赞3次,收藏35次。北邮计算机学院2017届复试经验分享 初试完了再来担心复试,有看复试经验的时间还不如多做两道数学题! 导师:了解导师的情况,最差也不要找一个人不好的老师,其次尽量选自己喜欢的方向,出成绩以后尽早联系导师,加一加群,不要信息闭塞,有学长学姐就更好了,礼貌的咨询他们。 考完试对一下答案,如果差不多能过就可以准备一下复试了,因为北邮的复试尤其是机试挺难的。 机试:很重要,..._北邮计算机技术复试上机很重要吗
文章浏览阅读1.5w次,点赞11次,收藏76次。Python第四周作业之选择题1. 以下关于递归函数基例的说法错误的是:2. 以下选项不是函数作用的是:3. 以下关于Python函数说法错误的是:4. 以下关于模块化设计描述错误的是:5. 以下对递归描述错误的是:6. 以下关于函数说法错误的是:7. 哪个选项对于函数的定义是错误的?8. 函数定义时,以下不需要使用global声明就可能操作全局变量的类型是:9. 以下关于lambda函数说法错误的是:10. 以下能够返回struct_time类型时间的函数是:11. 哪个选项是下面代码的执行结果?12. _下关于lambda函数说法错误的是:()
文章浏览阅读66次。Linux系统下安装jdk及环境配置1、利用yun 云安装 :首先在liunx下输入:yum -y list java*→yum install -y java-1.8.0-openjdk-devel.x86_64(jdk1.8版本)2、手动安装:去Oracle官网下载需要安装的jdk版本,我这里用的是jdk-8u181-linux-x64.tar.gz将该文件包放在user/java下...
文章浏览阅读983次,点赞26次,收藏6次。接着我们介绍后面的三道题,虽然代码变多了但我们的思路更加通顺了。题目:4. 单链表相关经典算法OJ题3:合并两个有序链表5. 循环链表经典应⽤-环形链表的约瑟夫问题6. 单链表相关经典算法OJ题5:分割链表
文章浏览阅读3.2k次。下面是《Android App开发入门与项目实战》一书用到的工具和代码资源:1、本书使用的Android Studio版本为4.1,最新的安装包可前往。2、本书提供所有示例源码的demo工程下载,源码(适配Android4.4到Android11)的下载方式见该书前言末尾的二维码,获取ppt课件同样扫描前言末尾的二维码。最新的源码也可访问我的gitee获取。_android app开发入门与项目实战pdf