From cf399b316e428e3461dd197830926c274e45c42a Mon Sep 17 00:00:00 2001 From: N1KO Date: Fri, 13 Dec 2024 15:45:09 +0800 Subject: [PATCH] init --- .gitignore | 38 ++++ .idea/.gitignore | 8 + .idea/encodings.xml | 7 + .idea/inspectionProfiles/Project_Default.xml | 73 +++++++ .idea/misc.xml | 20 ++ .idea/uiDesigner.xml | 124 ++++++++++++ pom.xml | 130 ++++++++++++ .../demo/BaoGuTangDemoApplication.java | 24 +++ .../baogutang/demo/config/RedisConfig.java | 70 +++++++ .../baogutang/demo/constants/MsgConstant.java | 21 ++ .../baogutang/demo/constants/RedisKey.java | 36 ++++ .../demo/constants/enums/QueuePlatform.java | 17 ++ .../demo/constants/enums/QueueType.java | 47 +++++ .../top/baogutang/demo/message/IQueueMsg.java | 66 ++++++ .../demo/message/ImagePushMessage.java | 22 ++ .../top/baogutang/demo/message/QueueMsg.java | 101 +++++++++ .../baogutang/demo/message/SendResult.java | 36 ++++ .../AbstractQueueMessageConsumer.java | 39 ++++ .../consumer/IQueueMessageConsumer.java | 31 +++ .../consumer/MessageConsumerFactory.java | 37 ++++ .../RedisImagePushQueueMessageConsumer.java | 49 +++++ .../producer/IQueueMessageProducer.java | 28 +++ .../producer/MessageProducerFactory.java | 38 ++++ .../producer/RedisQueueMessageProducer.java | 50 +++++ .../demo/service/IConsumeMessageService.java | 13 ++ .../demo/service/ISendMessageService.java | 15 ++ .../impl/ConsumeMessageServiceImpl.java | 52 +++++ .../service/impl/SendMessageServiceImpl.java | 46 +++++ .../top/baogutang/demo/utils/JacksonUtil.java | 191 ++++++++++++++++++ src/main/resources/application.yml | 36 ++++ .../demo/service/ConsumerServiceTest.java | 32 +++ .../demo/service/SendMessageServiceTest.java | 34 ++++ 32 files changed, 1531 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/encodings.xml create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/uiDesigner.xml create mode 100644 pom.xml create mode 100644 src/main/java/top/baogutang/demo/BaoGuTangDemoApplication.java create mode 100644 src/main/java/top/baogutang/demo/config/RedisConfig.java create mode 100644 src/main/java/top/baogutang/demo/constants/MsgConstant.java create mode 100644 src/main/java/top/baogutang/demo/constants/RedisKey.java create mode 100644 src/main/java/top/baogutang/demo/constants/enums/QueuePlatform.java create mode 100644 src/main/java/top/baogutang/demo/constants/enums/QueueType.java create mode 100644 src/main/java/top/baogutang/demo/message/IQueueMsg.java create mode 100644 src/main/java/top/baogutang/demo/message/ImagePushMessage.java create mode 100644 src/main/java/top/baogutang/demo/message/QueueMsg.java create mode 100644 src/main/java/top/baogutang/demo/message/SendResult.java create mode 100644 src/main/java/top/baogutang/demo/message/consumer/AbstractQueueMessageConsumer.java create mode 100644 src/main/java/top/baogutang/demo/message/consumer/IQueueMessageConsumer.java create mode 100644 src/main/java/top/baogutang/demo/message/consumer/MessageConsumerFactory.java create mode 100644 src/main/java/top/baogutang/demo/message/consumer/RedisImagePushQueueMessageConsumer.java create mode 100644 src/main/java/top/baogutang/demo/message/producer/IQueueMessageProducer.java create mode 100644 src/main/java/top/baogutang/demo/message/producer/MessageProducerFactory.java create mode 100644 src/main/java/top/baogutang/demo/message/producer/RedisQueueMessageProducer.java create mode 100644 src/main/java/top/baogutang/demo/service/IConsumeMessageService.java create mode 100644 src/main/java/top/baogutang/demo/service/ISendMessageService.java create mode 100644 src/main/java/top/baogutang/demo/service/impl/ConsumeMessageServiceImpl.java create mode 100644 src/main/java/top/baogutang/demo/service/impl/SendMessageServiceImpl.java create mode 100644 src/main/java/top/baogutang/demo/utils/JacksonUtil.java create mode 100644 src/main/resources/application.yml create mode 100644 src/test/java/top/baogutang/demo/service/ConsumerServiceTest.java create mode 100644 src/test/java/top/baogutang/demo/service/SendMessageServiceTest.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..aa00ffa --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..a3aa9db --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,73 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..13b62e0 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..2b63946 --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8a32970 --- /dev/null +++ b/pom.xml @@ -0,0 +1,130 @@ + + + 4.0.0 + + com.baogutang.demo + baogutang-demo + 1.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + 11 + 11 + UTF-8 + + + + + org.projectlombok + lombok + + + + org.springframework.boot + spring-boot-starter-web + + + + mysql + mysql-connector-java + + + + com.baomidou + mybatis-plus-boot-starter + 3.5.3.1 + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + org.apache.commons + commons-lang3 + + + + com.squareup.okhttp3 + okhttp + 4.9.3 + + + + org.aspectj + aspectjweaver + 1.9.6 + + + + com.google.guava + guava + 32.1.1-jre + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + + src/main/resources + true + + templates/fonts/** + + + + src/main/resources + false + + templates/fonts/** + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19 + + true + + + + + + \ No newline at end of file diff --git a/src/main/java/top/baogutang/demo/BaoGuTangDemoApplication.java b/src/main/java/top/baogutang/demo/BaoGuTangDemoApplication.java new file mode 100644 index 0000000..82fd5ee --- /dev/null +++ b/src/main/java/top/baogutang/demo/BaoGuTangDemoApplication.java @@ -0,0 +1,24 @@ +package top.baogutang.demo; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 14:48 + */ +@Slf4j +@SpringBootApplication(scanBasePackages = {"top.baogutang.demo.*"}) +@EnableAsync +public class BaoGuTangDemoApplication { + + public static void main(String[] args) { + SpringApplication.run(BaoGuTangDemoApplication.class, args); + log.info("==============DEMO服务启动成功!!================="); + } +} diff --git a/src/main/java/top/baogutang/demo/config/RedisConfig.java b/src/main/java/top/baogutang/demo/config/RedisConfig.java new file mode 100644 index 0000000..88f1ba8 --- /dev/null +++ b/src/main/java/top/baogutang/demo/config/RedisConfig.java @@ -0,0 +1,70 @@ +package top.baogutang.demo.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import java.net.UnknownHostException; +import java.util.TimeZone; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ + +@Configuration +@Slf4j +public class RedisConfig { + + /** + * 编写自定义的 redisTemplate + * 这是一个比较固定的模板 + */ + @SuppressWarnings("all") + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { + // 为了开发方便,直接使用 + RedisTemplate template = new RedisTemplate(); + template.setConnectionFactory(redisConnectionFactory); + + // Json 配置序列化 + // 使用 jackson 解析任意的对象 + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); + // 使用 objectMapper 进行转义 + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.setTimeZone(TimeZone.getDefault()); + jackson2JsonRedisSerializer.setObjectMapper(objectMapper); + // String 的序列化 + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + + // key 采用 String 的序列化方式 + template.setKeySerializer(stringRedisSerializer); + // Hash 的 key 采用 String 的序列化方式 + template.setHashKeySerializer(stringRedisSerializer); + // value 采用 jackson 的序列化方式 + template.setValueSerializer(jackson2JsonRedisSerializer); + // Hash 的 value 采用 String 的序列化方式 + template.setHashValueSerializer(jackson2JsonRedisSerializer); + // 把所有的配置 set 进 template + template.afterPropertiesSet(); + + return template; + } + + +} diff --git a/src/main/java/top/baogutang/demo/constants/MsgConstant.java b/src/main/java/top/baogutang/demo/constants/MsgConstant.java new file mode 100644 index 0000000..3eed0da --- /dev/null +++ b/src/main/java/top/baogutang/demo/constants/MsgConstant.java @@ -0,0 +1,21 @@ +package top.baogutang.demo.constants; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 13:38 + */ +public class MsgConstant { + + /** + * 消费端策略:异常时再次消费 + */ + public static final String CONSUMER_STRATEGY_RECONSUME = "RECONSUME"; + + /** + * 调整支持多topic订阅时,兼容旧的项目,未设置handler的topic时,使用默认值 + */ + public static final String DEFAULT_TOPIC_NAME = "DEFAULT_TOPIC_NAME"; +} diff --git a/src/main/java/top/baogutang/demo/constants/RedisKey.java b/src/main/java/top/baogutang/demo/constants/RedisKey.java new file mode 100644 index 0000000..ce1a0cc --- /dev/null +++ b/src/main/java/top/baogutang/demo/constants/RedisKey.java @@ -0,0 +1,36 @@ +package top.baogutang.demo.constants; + +import com.google.common.base.Joiner; +import top.baogutang.demo.constants.enums.QueueType; + +import java.util.Arrays; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 13:44 + */ +public class RedisKey { + + private RedisKey() { + // empty private constructor + } + + private static final String MODULE = "baogutang-demo"; + + private static final String SEPARATOR = ":"; + + /** + * 影像推送key + * + * @param queueType queueType + * @param userId userId + * @param messageKey messageKey + * @return key + */ + public static String getImagePushMsgLockKey(QueueType queueType, Long userId, String messageKey) { + return Joiner.on(SEPARATOR).join(Arrays.asList(MODULE, queueType.getTopicName(), userId, messageKey)); + } +} diff --git a/src/main/java/top/baogutang/demo/constants/enums/QueuePlatform.java b/src/main/java/top/baogutang/demo/constants/enums/QueuePlatform.java new file mode 100644 index 0000000..3438dfb --- /dev/null +++ b/src/main/java/top/baogutang/demo/constants/enums/QueuePlatform.java @@ -0,0 +1,17 @@ +package top.baogutang.demo.constants.enums; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 11:42 + */ +public enum QueuePlatform { + + REDIS, + + KAFKA, + + ; +} diff --git a/src/main/java/top/baogutang/demo/constants/enums/QueueType.java b/src/main/java/top/baogutang/demo/constants/enums/QueueType.java new file mode 100644 index 0000000..01c48d1 --- /dev/null +++ b/src/main/java/top/baogutang/demo/constants/enums/QueueType.java @@ -0,0 +1,47 @@ +package top.baogutang.demo.constants.enums; + +import lombok.Getter; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ +@Getter +public enum QueueType { + + /** + * 影像推送 + */ + IMAGE_PUSH("IMAGE_PUSH", "影像推送"), + + ; + + private final String topicName; + + private final String topicDesc; + + QueueType(String topicName, String topicDesc) { + this.topicName = topicName; + this.topicDesc = topicDesc; + } + + public static QueueType[] allTypes() { + + return values(); + } + + public static QueueType get(String topicName) { + for (QueueType value : values()) { + if (value.getTopicName().equals(topicName)) { + return value; + } + } + + return null; + } + + +} diff --git a/src/main/java/top/baogutang/demo/message/IQueueMsg.java b/src/main/java/top/baogutang/demo/message/IQueueMsg.java new file mode 100644 index 0000000..c89a16b --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/IQueueMsg.java @@ -0,0 +1,66 @@ +package top.baogutang.demo.message; + +import top.baogutang.demo.constants.enums.QueueType; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ +public interface IQueueMsg extends Serializable { + + /** + * getMsgType + * + * @return QueueType + */ + QueueType getMsgType(); + + /** + * body + * + * @return body + */ + T getMsgBody(); + + /** + * getMsgBizKey + * + * @return getMsgBizKey + */ + String getMsgBizKey(); + + /** + * getDelayTime + * + * @return getDelayTime + */ + long getDelayTime(); + + /** + * setDelayTime + * + * @param delayTime delayTime + */ + void setDelayTime(long delayTime); + + /** + * getTimeUnit + * + * @return getTimeUnit + */ + TimeUnit getTimeUnit(); + + /** + * setTimeUnit + * + * @param timeUnit timeUnit + */ + void setTimeUnit(TimeUnit timeUnit); + +} diff --git a/src/main/java/top/baogutang/demo/message/ImagePushMessage.java b/src/main/java/top/baogutang/demo/message/ImagePushMessage.java new file mode 100644 index 0000000..813c2c9 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/ImagePushMessage.java @@ -0,0 +1,22 @@ +package top.baogutang.demo.message; + +import lombok.Data; + +import java.io.Serializable; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 13:48 + */ +@Data +public class ImagePushMessage implements Serializable { + + private static final long serialVersionUID = -870177103586251908L; + + private Long userId; + + // 这个类就是具体的消息类,需要什么字段就加什么字段 +} diff --git a/src/main/java/top/baogutang/demo/message/QueueMsg.java b/src/main/java/top/baogutang/demo/message/QueueMsg.java new file mode 100644 index 0000000..5188ba1 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/QueueMsg.java @@ -0,0 +1,101 @@ +package top.baogutang.demo.message; + + +import lombok.Data; +import top.baogutang.demo.constants.enums.QueueType; + +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ +@Data +public class QueueMsg implements IQueueMsg { + + private static final long serialVersionUID = 8969365246492490122L; + + /** + * 消息类型 + */ + private QueueType msgType; + + /** + * 消息内容 + */ + private T msgBody; + + /** + * 消息业务key + */ + private String msgBizKey; + + /** + * 延迟时间 + */ + private long delayTime = 0; + + /** + * 延迟时间单位 + */ + private TimeUnit timeUnit = TimeUnit.SECONDS; + + public QueueMsg() { + + } + + public QueueMsg(QueueType msgType, T msgBody, String msgBizKey) { + this.msgType = msgType; + this.msgBody = msgBody; + this.msgBizKey = msgBizKey; + } + + + public QueueMsg(QueueType msgType, T msgBody) { + this.msgType = msgType; + this.msgBody = msgBody; + } + + + @Override + public QueueType getMsgType() { + return msgType; + } + + + @Override + public T getMsgBody() { + + return msgBody; + } + + @Override + public String getMsgBizKey() { + return msgBizKey; + } + + @Override + public long getDelayTime() { + return delayTime; + } + + @Override + public void setDelayTime(long delayTime) { + this.delayTime = delayTime; + } + + @Override + public TimeUnit getTimeUnit() { + return timeUnit; + } + + @Override + public void setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + +} diff --git a/src/main/java/top/baogutang/demo/message/SendResult.java b/src/main/java/top/baogutang/demo/message/SendResult.java new file mode 100644 index 0000000..51a89eb --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/SendResult.java @@ -0,0 +1,36 @@ +package top.baogutang.demo.message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ +@Data +@Builder +public class SendResult implements Serializable { + + private static final long serialVersionUID = -3562968976733007349L; + + /** + * 发送结果 + */ + private boolean success; + + /** + * 已发送消息的ID(如果有) + */ + private String messageId; + + /** + * 已发送消息的主题 + */ + private String topic; + +} diff --git a/src/main/java/top/baogutang/demo/message/consumer/AbstractQueueMessageConsumer.java b/src/main/java/top/baogutang/demo/message/consumer/AbstractQueueMessageConsumer.java new file mode 100644 index 0000000..a1492e5 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/consumer/AbstractQueueMessageConsumer.java @@ -0,0 +1,39 @@ +package top.baogutang.demo.message.consumer; + +import org.springframework.data.redis.core.RedisTemplate; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 13:41 + */ +public abstract class AbstractQueueMessageConsumer implements IQueueMessageConsumer { + + @Resource + private RedisTemplate redisTemplate; + + /** + * 重复消息处理 + * + * @param key key + * @return res + */ + protected boolean duplicateMsg(String key) { + Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, 1, 300, TimeUnit.SECONDS); + return Boolean.FALSE.equals(flag); + } + + /** + * 移除lock + * + * @param key key + */ + protected void removeLock(String key) { + redisTemplate.delete(key); + } +} diff --git a/src/main/java/top/baogutang/demo/message/consumer/IQueueMessageConsumer.java b/src/main/java/top/baogutang/demo/message/consumer/IQueueMessageConsumer.java new file mode 100644 index 0000000..bf719a2 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/consumer/IQueueMessageConsumer.java @@ -0,0 +1,31 @@ +package top.baogutang.demo.message.consumer; + + +import top.baogutang.demo.constants.MsgConstant; +import top.baogutang.demo.message.IQueueMsg; + + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 11:04 + */ +public interface IQueueMessageConsumer { + + /** + * 消费消息 + * @param queueMsg 消息体 + */ + void consume(IQueueMsg queueMsg); + + /** + * getTopicName + * + * @return topic + */ + default String getTopicName() { + return MsgConstant.DEFAULT_TOPIC_NAME; + } +} diff --git a/src/main/java/top/baogutang/demo/message/consumer/MessageConsumerFactory.java b/src/main/java/top/baogutang/demo/message/consumer/MessageConsumerFactory.java new file mode 100644 index 0000000..3deab8e --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/consumer/MessageConsumerFactory.java @@ -0,0 +1,37 @@ +package top.baogutang.demo.message.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 14:37 + */ +@Slf4j +@Component +public class MessageConsumerFactory implements ApplicationListener { + + public final Map consumerMap = new ConcurrentHashMap<>(); + + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + ConfigurableApplicationContext applicationContext = event.getApplicationContext(); + Map beansOfType = applicationContext.getBeansOfType(IQueueMessageConsumer.class); + beansOfType.forEach((key, value) -> consumerMap.put(value.getTopicName(), value)); + log.info(">>>>>>>>>>queue message consumer inject complete<<<<<<<<<<"); + } + + public IQueueMessageConsumer getConsumer(String topicName) { + return consumerMap.get(topicName); + } +} diff --git a/src/main/java/top/baogutang/demo/message/consumer/RedisImagePushQueueMessageConsumer.java b/src/main/java/top/baogutang/demo/message/consumer/RedisImagePushQueueMessageConsumer.java new file mode 100644 index 0000000..f96873a --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/consumer/RedisImagePushQueueMessageConsumer.java @@ -0,0 +1,49 @@ +package top.baogutang.demo.message.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import top.baogutang.demo.constants.RedisKey; +import top.baogutang.demo.constants.enums.QueueType; +import top.baogutang.demo.message.IQueueMsg; +import top.baogutang.demo.message.ImagePushMessage; +import top.baogutang.demo.utils.JacksonUtil; + + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 13:47 + */ +@Slf4j +@Component +public class RedisImagePushQueueMessageConsumer extends AbstractQueueMessageConsumer { + + + @Override + public void consume(IQueueMsg queueMsg) { + log.info("RedisImagePushQueueMessageConsumer received message:{}", JacksonUtil.toJson(queueMsg)); + ImagePushMessage imagePushMessage = (ImagePushMessage) queueMsg.getMsgBody(); + String lockKey = RedisKey.getImagePushMsgLockKey(queueMsg.getMsgType(), imagePushMessage.getUserId(), queueMsg.getMsgBizKey()); + if (duplicateMsg(lockKey)) { + log.info("RedisImagePushQueueMessageConsumer received duplicate message:{}", lockKey); + return; + } + try { + // TODO: process message + // ... + } catch (Exception e) { + log.error("RedisImagePushQueueMessageConsumer process message:{} error:{}", JacksonUtil.toJson(queueMsg), e.getMessage(), e); + } finally { + removeLock(lockKey); + } + + + } + + @Override + public String getTopicName() { + return QueueType.IMAGE_PUSH.getTopicName(); + } +} diff --git a/src/main/java/top/baogutang/demo/message/producer/IQueueMessageProducer.java b/src/main/java/top/baogutang/demo/message/producer/IQueueMessageProducer.java new file mode 100644 index 0000000..3c88fe9 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/producer/IQueueMessageProducer.java @@ -0,0 +1,28 @@ +package top.baogutang.demo.message.producer; + +import top.baogutang.demo.constants.enums.QueuePlatform; +import top.baogutang.demo.message.IQueueMsg; +import top.baogutang.demo.message.SendResult; + +import java.io.Serializable; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 10:56 + */ +public interface IQueueMessageProducer { + + QueuePlatform getPlatform(); + + /** + * 发送延迟消息 + * @param topic 队列 + * @param queueMsg 消息 + * @return 发送结果 + */ + SendResult sendDelayMessage(String topic, IQueueMsg queueMsg); + +} diff --git a/src/main/java/top/baogutang/demo/message/producer/MessageProducerFactory.java b/src/main/java/top/baogutang/demo/message/producer/MessageProducerFactory.java new file mode 100644 index 0000000..0920b13 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/producer/MessageProducerFactory.java @@ -0,0 +1,38 @@ +package top.baogutang.demo.message.producer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; +import top.baogutang.demo.constants.enums.QueuePlatform; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 14:27 + */ +@Slf4j +@Component +public class MessageProducerFactory implements ApplicationListener { + + public final Map producerMap = new ConcurrentHashMap<>(); + + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + ConfigurableApplicationContext applicationContext = event.getApplicationContext(); + Map beansOfType = applicationContext.getBeansOfType(IQueueMessageProducer.class); + beansOfType.forEach((key, value) -> producerMap.put(value.getPlatform(), value)); + log.info(">>>>>>>>>>queue message producer inject complete<<<<<<<<<<"); + } + + public IQueueMessageProducer getProducer(QueuePlatform platform) { + return producerMap.get(platform); + } +} diff --git a/src/main/java/top/baogutang/demo/message/producer/RedisQueueMessageProducer.java b/src/main/java/top/baogutang/demo/message/producer/RedisQueueMessageProducer.java new file mode 100644 index 0000000..7e71496 --- /dev/null +++ b/src/main/java/top/baogutang/demo/message/producer/RedisQueueMessageProducer.java @@ -0,0 +1,50 @@ +package top.baogutang.demo.message.producer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import top.baogutang.demo.constants.enums.QueuePlatform; +import top.baogutang.demo.message.IQueueMsg; +import top.baogutang.demo.message.SendResult; +import top.baogutang.demo.utils.JacksonUtil; + +import javax.annotation.Resource; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 11:43 + */ +@Slf4j +@Component +public class RedisQueueMessageProducer implements IQueueMessageProducer { + + @Resource + private RedisTemplate redisTemplate; + + @Override + public QueuePlatform getPlatform() { + return QueuePlatform.REDIS; + } + + @Override + public SendResult sendDelayMessage(String topic, IQueueMsg queueMsg) { + long delayTime = System.currentTimeMillis() + queueMsg.getTimeUnit().toMillis(queueMsg.getDelayTime()); + Boolean addResult = null; + try { + addResult = redisTemplate.opsForZSet().add(topic, queueMsg, delayTime); + } catch (Exception e) { + log.error(">>>>>>>>>>send delay message error:{} topic:{},message:{}<<<<<<<<<<", e.getMessage(), topic, JacksonUtil.toJson(queueMsg), e); + return SendResult.builder() + .success(Boolean.FALSE) + .topic(topic) + .build(); + } + return SendResult.builder() + .success(Boolean.TRUE.equals(addResult)) + .topic(topic) + .build(); + } +} diff --git a/src/main/java/top/baogutang/demo/service/IConsumeMessageService.java b/src/main/java/top/baogutang/demo/service/IConsumeMessageService.java new file mode 100644 index 0000000..623cd11 --- /dev/null +++ b/src/main/java/top/baogutang/demo/service/IConsumeMessageService.java @@ -0,0 +1,13 @@ +package top.baogutang.demo.service; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 15:00 + */ +public interface IConsumeMessageService { + + void consumeRedisMessage(String messageTopic); +} diff --git a/src/main/java/top/baogutang/demo/service/ISendMessageService.java b/src/main/java/top/baogutang/demo/service/ISendMessageService.java new file mode 100644 index 0000000..cb928df --- /dev/null +++ b/src/main/java/top/baogutang/demo/service/ISendMessageService.java @@ -0,0 +1,15 @@ +package top.baogutang.demo.service; + +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 14:40 + */ +public interface ISendMessageService { + + void sendMessage(T msgBody, Long delayTime, TimeUnit timeUnit, String messageTopic); +} diff --git a/src/main/java/top/baogutang/demo/service/impl/ConsumeMessageServiceImpl.java b/src/main/java/top/baogutang/demo/service/impl/ConsumeMessageServiceImpl.java new file mode 100644 index 0000000..6055275 --- /dev/null +++ b/src/main/java/top/baogutang/demo/service/impl/ConsumeMessageServiceImpl.java @@ -0,0 +1,52 @@ +package top.baogutang.demo.service.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import top.baogutang.demo.message.IQueueMsg; +import top.baogutang.demo.message.consumer.MessageConsumerFactory; +import top.baogutang.demo.service.IConsumeMessageService; + +import javax.annotation.Resource; +import java.util.Set; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 15:00 + */ +@Slf4j +@Service +public class ConsumeMessageServiceImpl implements IConsumeMessageService { + + @Resource + private MessageConsumerFactory messageConsumerFactory; + + @Resource + private RedisTemplate redisTemplate; + + @SuppressWarnings("unchecked") + public void consumeRedisMessage(String messageTopic) { + while (true) { + long currentTime = System.currentTimeMillis(); + // 拉取到期消息 + Set queueMsgSet = redisTemplate.opsForZSet().rangeByScore(messageTopic, 0, currentTime); + if (CollectionUtils.isEmpty(queueMsgSet)) { + return; + } + queueMsgSet.forEach(queueMsgObj -> { + IQueueMsg queueMsg = (IQueueMsg) queueMsgObj; + messageConsumerFactory.getConsumer(queueMsg.getMsgType().getTopicName()) + .consume(queueMsg); + // 删除已处理消息 + redisTemplate.opsForZSet().remove(messageTopic, queueMsgObj); + + }); + } + } + + +} diff --git a/src/main/java/top/baogutang/demo/service/impl/SendMessageServiceImpl.java b/src/main/java/top/baogutang/demo/service/impl/SendMessageServiceImpl.java new file mode 100644 index 0000000..a903375 --- /dev/null +++ b/src/main/java/top/baogutang/demo/service/impl/SendMessageServiceImpl.java @@ -0,0 +1,46 @@ +package top.baogutang.demo.service.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import top.baogutang.demo.constants.enums.QueuePlatform; +import top.baogutang.demo.constants.enums.QueueType; +import top.baogutang.demo.message.IQueueMsg; +import top.baogutang.demo.message.QueueMsg; +import top.baogutang.demo.message.SendResult; +import top.baogutang.demo.message.producer.IQueueMessageProducer; +import top.baogutang.demo.message.producer.MessageProducerFactory; +import top.baogutang.demo.service.ISendMessageService; + +import javax.annotation.Resource; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 14:40 + */ +@Slf4j +@Service +public class SendMessageServiceImpl implements ISendMessageService { + + @Resource + private MessageProducerFactory messageProducerFactory; + + public void sendMessage(T msgBody, Long delayTime, TimeUnit timeUnit,String messageTopic) { + // TODO 业务消息key,可自定义 + String msgKey = UUID.randomUUID().toString(); + IQueueMsg queueMsg = new QueueMsg<>(QueueType.IMAGE_PUSH, msgBody, msgKey); + queueMsg.setDelayTime(delayTime); + queueMsg.setTimeUnit(timeUnit); + IQueueMessageProducer producer = messageProducerFactory.getProducer(QueuePlatform.REDIS); + SendResult sendResult = producer.sendDelayMessage(messageTopic, queueMsg); + if (Objects.isNull(sendResult) || !Boolean.TRUE.equals(sendResult.isSuccess())) { + // TODO 消息发送不成功,逻辑处理 + } + } + +} diff --git a/src/main/java/top/baogutang/demo/utils/JacksonUtil.java b/src/main/java/top/baogutang/demo/utils/JacksonUtil.java new file mode 100644 index 0000000..0655547 --- /dev/null +++ b/src/main/java/top/baogutang/demo/utils/JacksonUtil.java @@ -0,0 +1,191 @@ +package top.baogutang.demo.utils; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 11:50 + */ +@Slf4j +public class JacksonUtil { + + private JacksonUtil() { + // empty private constructor + } + + private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + + static { + + // yyyy-MM-dd HH:mm:ss + OBJECT_MAPPER.setDateFormat(new SimpleDateFormat(DEFAULT_DATE_FORMAT)); + // JavaTimeModule Java 8 + OBJECT_MAPPER.registerModule(new JavaTimeModule()); + // + OBJECT_MAPPER.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + // ObjectMapper null + OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + // ObjectMapper Bean () + OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + // ObjectMapper JSON Java + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + } + + /** + * json + * + * @param content + * @return json + */ + public static String toJson(Object content) { + String result = StringUtils.EMPTY; + if (Objects.isNull(content)) { + return result; + } + try { + result = OBJECT_MAPPER.writeValueAsString(content); + } catch (Exception e) { + log.error(">>>>>>>>>>parse object to json fail:{}<<<<<<<<<<", e.getMessage()); + } + return result; + } + + /** + * + * + * @param content + * @param clazz + * @param + * @return + */ + public static T fromJson(String content, Class clazz) { + T result = null; + if (StringUtils.isBlank(content)) { + return result; + } + try { + result = OBJECT_MAPPER.readValue(content, clazz); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to object fail:{}<<<<<<<<<<", e.getMessage()); + } + return result; + } + + /** + * + * + * @param content + * @param type + * @param + * @return + */ + public static T fromJson(String content, TypeReference type) { + T result = null; + if (StringUtils.isBlank(content)) { + return result; + } + try { + result = OBJECT_MAPPER.readValue(content, type); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to object fail:{}<<<<<<<<<<", e.getMessage()); + } + return result; + } + + /** + * jsonList + * + * @param content + * @param clazz + * @param + * @return + */ + public static List jsonToList(String content, Class clazz) { + List list = new ArrayList<>(); + if (StringUtils.isBlank(content) || clazz == null) { + return list; + } + try { + list = OBJECT_MAPPER.readValue(content, new TypeReference>() { + }); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to list fail:{}<<<<<<<<<<", e.getMessage()); + } + return list; + } + + /** + * jsonMap + * + * @param content + * @param keyType key + * @param valueType value + * @param Class + * @param Class + * @return Map + */ + public static Map jsonToMap(String content, Class keyType, Class valueType) { + Map result = new HashMap<>(); + if (StringUtils.isBlank(content) || Objects.isNull(keyType) || Objects.isNull(valueType)) { + return result; + } + try { + result = OBJECT_MAPPER.readValue(content, new TypeReference<>() { + }); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to map fail:{}<<<<<<<<<<", e.getMessage()); + } + return result; + } + + public static Map beanToMap(Object content, Class keyType, Class valueType) { + Map result = new HashMap<>(); + if (Objects.isNull(content) || Objects.isNull(keyType) || Objects.isNull(valueType)) { + return result; + } + try { + result = OBJECT_MAPPER.convertValue(content, new TypeReference<>() { + }); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to map fail:{}<<<<<<<<<<", e.getMessage()); + } + return result; + } + + /** + * jsonJsonNode + * + * @param content + * @return JsonNode + */ + public static JsonNode jsonToNode(String content) { + JsonNode jsonNode = null; + if (StringUtils.isBlank(content)) { + return null; + } + try { + jsonNode = OBJECT_MAPPER.readTree(content); + } catch (Exception e) { + log.error(">>>>>>>>>>parse json to node fail:{}<<<<<<<<<<", e.getMessage()); + } + return jsonNode; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..2c00697 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,36 @@ + +server: + port: 8106 + +spring: + profiles: + active: ${SPRING_ACTIVE_PROFILE:local} + application: + name: baogutang-demo + datasource: + driverClassName: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://117.72.78.133:3306/baogutang-music?characterEncoding=UTF-8 + username: baogutang-music + password: xCcFfY3zXh8sC28e + hikari: + max-lifetime: 60000 + redis: + host: 117.72.78.133 + port: 6379 + password: admin@pwdpwd + servlet: + multipart: + max-file-size: 2048MB + max-request-size: 32MB + resources: + static-locations: classpath:/templates/ + +thread: + pool: + core: 10 + max: 10 + alive: 10 + capacity: 200 + + + diff --git a/src/test/java/top/baogutang/demo/service/ConsumerServiceTest.java b/src/test/java/top/baogutang/demo/service/ConsumerServiceTest.java new file mode 100644 index 0000000..7cbc421 --- /dev/null +++ b/src/test/java/top/baogutang/demo/service/ConsumerServiceTest.java @@ -0,0 +1,32 @@ +package top.baogutang.demo.service; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import javax.annotation.Resource; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 15:26 + */ +@Slf4j +@SpringBootTest +class ConsumerServiceTest { + + @Resource + private IConsumeMessageService consumeMessageService; + + private static final String MESSAGE_TOPIC = "baogutang.demo.message"; + + + @Test + void test_consume_message() throws InterruptedException { + consumeMessageService.consumeRedisMessage(MESSAGE_TOPIC); + + Thread.sleep(10000); + } +} diff --git a/src/test/java/top/baogutang/demo/service/SendMessageServiceTest.java b/src/test/java/top/baogutang/demo/service/SendMessageServiceTest.java new file mode 100644 index 0000000..6e61982 --- /dev/null +++ b/src/test/java/top/baogutang/demo/service/SendMessageServiceTest.java @@ -0,0 +1,34 @@ +package top.baogutang.demo.service; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import top.baogutang.demo.message.ImagePushMessage; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +/** + * + * @description: + * + * @author: N1KO + * @date: 2024/12/13 : 15:28 + */ +@Slf4j +@SpringBootTest +class SendMessageServiceTest { + + @Resource + private ISendMessageService sendMessageService; + + private static final String MESSAGE_TOPIC = "baogutang.demo.message"; + + @Test + void test_send_message() { + ImagePushMessage imagePushMessage = new ImagePushMessage(); + imagePushMessage.setUserId(1L); + sendMessageService.sendMessage(imagePushMessage, 2L, TimeUnit.SECONDS, MESSAGE_TOPIC); + } + +}