diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafKaTopics.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafKaTopics.java new file mode 100644 index 0000000..0891e6a --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafKaTopics.java @@ -0,0 +1,17 @@ +package com.yxt.messagecenter.api.kafka; + +/** + * @author liuguohui + * @version 1.0 + * @description kafka Topics (使用枚举报错) + * @date 2022/03/22 + */ +public class KafKaTopics { + + /** + * 提醒联系潜在客户 + */ + public static final String CUSTOMERREMIND = "customerRemind"; + + +} diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafaGroupId.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafaGroupId.java new file mode 100644 index 0000000..d7fcd0c --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafaGroupId.java @@ -0,0 +1,20 @@ +package com.yxt.messagecenter.api.kafka; + +/** + * @author liuguohui + * @version 1.0 + * @description + * @date 2022/03/22 + */ +public class KafaGroupId { + + /** + * 客户管理 + */ + public static final String CRM = "crm"; + + /** + * 销售业务 + */ + public static final String buscenter = "buscenter"; +} diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafkaMessageDto.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafkaMessageDto.java new file mode 100644 index 0000000..52702fd --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/kafka/KafkaMessageDto.java @@ -0,0 +1,26 @@ +package com.yxt.messagecenter.api.kafka; + +import com.yxt.messagecenter.api.message.AppMessageDto; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author liuguohui + * @version 1.0 + * @description + * @date 2022/03/22 + */ +@ApiModel("kafka消息对象") +@Data +public class KafkaMessageDto extends AppMessageDto implements Serializable { + + private static final long serialVersionUID = 1L; + + @ApiModelProperty("业务参数") + private Map map; + +} diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MessageDto.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MessageDto.java index 8a09bc0..17329c6 100644 --- a/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MessageDto.java +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MessageDto.java @@ -7,6 +7,8 @@ import io.swagger.annotations.ApiModelProperty; import lombok.Data; import javax.validation.constraints.NotBlank; +import java.util.List; +import java.util.Set; /** * Project: message-center(消息中心)
@@ -93,4 +95,8 @@ public class MessageDto implements Dto { @ApiModelProperty(value = "接收人sid(多个人员以,分隔)", required = true) @NotBlank(message = "接收人sid不能为空") private String receiveSids; + + private String assignee; + + private List groupIdList; } \ No newline at end of file diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTargetUriEnum.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTargetUriEnum.java new file mode 100644 index 0000000..aa83006 --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTargetUriEnum.java @@ -0,0 +1,35 @@ +package com.yxt.messagecenter.api.message; + +/** + * @author liuguohui + * @version 1.0 + * @Description: 推送的消息打开的action(APP)枚举类 + * @date 2022/03/28 + */ +public enum MsgTargetUriEnum { + APPUPDATE("app更新", "com.anrui.android.activity.UpdateActivity"), + CUSTOMERREMIND("客户提醒", "com.anrui.android.plugin.autoservice.activity.DetailCustomerActivity"); + + /** + * uri名称 + */ + private final String uriName; + + /** + * uri + */ + private final String uri; + + MsgTargetUriEnum(String uriName, String uri) { + this.uriName = uriName; + this.uri = uri; + } + + public String getUriName() { + return uriName; + } + + public String getUri() { + return uri; + } +} diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTitleEnum.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTitleEnum.java new file mode 100644 index 0000000..ef7d68d --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/message/MsgTitleEnum.java @@ -0,0 +1,36 @@ +package com.yxt.messagecenter.api.message; + +/** + * @author liuguohui + * @version 1.0 + * @Description: 消息标题枚举类 + * @date 2022/03/28 + */ +public enum MsgTitleEnum { + + CUSTOMERREMIND("客户跟进提醒", "customerRemind"), + ; + + /** + * 标题名称 + */ + private final String titleName; + + /** + * 标题 + */ + private final String title; + + MsgTitleEnum(String titleName, String title) { + this.titleName = titleName; + this.title = title; + } + + public String getTitleName() { + return titleName; + } + + public String getTitle() { + return title; + } +} diff --git a/message-center-api/src/main/java/com/yxt/messagecenter/api/messagetype/MessageTypeEnum.java b/message-center-api/src/main/java/com/yxt/messagecenter/api/messagetype/MessageTypeEnum.java new file mode 100644 index 0000000..0c9048f --- /dev/null +++ b/message-center-api/src/main/java/com/yxt/messagecenter/api/messagetype/MessageTypeEnum.java @@ -0,0 +1,39 @@ +package com.yxt.messagecenter.api.messagetype; + +/** + * @author liuguohui + * @version 1.0 + * @Description: 消息类别枚举类 + * @date 2022/03/21 + */ +public enum MessageTypeEnum { + + SYSTEM("4598b44e-52df-4d83-b012-4db1ea7e5ed7", "系统消息"), + CUSTOMER("7344e7d7-4f5e-11ec-981f-6479f0dd0373", "客户"), + FINANCE("2ab6dac4-87c0-11ec-b13a-fa163e0cb33c", "财务"), + APPROVAL("b4b50229-cce0-4cb1-8c0a-ae7545a9ec59", "审批中心"), + ; + + /** + * 消息类别sid + */ + private final String msgTypeSid; + + /** + * 消息类别名称 + */ + private final String msgTypeName; + + MessageTypeEnum(String msgTypeSid, String msgTypeName) { + this.msgTypeSid = msgTypeSid; + this.msgTypeName = msgTypeName; + } + + public String getMsgTypeSid() { + return msgTypeSid; + } + + public String getMsgTypeName() { + return msgTypeName; + } +} diff --git a/message-center-biz/message-center-biz.iml b/message-center-biz/message-center-biz.iml index 1f6735c..91b6a39 100644 --- a/message-center-biz/message-center-biz.iml +++ b/message-center-biz/message-center-biz.iml @@ -41,8 +41,6 @@ - - @@ -54,11 +52,7 @@ - - - - @@ -78,11 +72,9 @@ - - @@ -148,6 +140,9 @@ + + + @@ -191,6 +186,21 @@ + + + + + + + + + + + + + + + diff --git a/message-center-biz/pom.xml b/message-center-biz/pom.xml index 189695f..c6a4866 100644 --- a/message-center-biz/pom.xml +++ b/message-center-biz/pom.xml @@ -29,6 +29,11 @@ com.yxt.anrui 0.0.1 + + anrui-crm-api + com.yxt.anrui + 0.0.1 + @@ -36,6 +41,16 @@ spring-cloud-starter-alibaba-nacos-discovery + + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-clients + + mysql mysql-connector-java diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/MessageCenterApplication.java b/message-center-biz/src/main/java/com/yxt/messagecenter/MessageCenterApplication.java index 6e35041..021396b 100644 --- a/message-center-biz/src/main/java/com/yxt/messagecenter/MessageCenterApplication.java +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/MessageCenterApplication.java @@ -13,7 +13,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; "com.yxt.messagecenter" }) -@EnableFeignClients(basePackages = {"com.yxt.anrui.portal"}) +@EnableFeignClients(basePackages = {"com.yxt.anrui.portal", "com.yxt.anrui.crm"}) public class MessageCenterApplication { public static void main( String[] args ) diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumer.java b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumer.java new file mode 100644 index 0000000..7482ded --- /dev/null +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumer.java @@ -0,0 +1,149 @@ +package com.yxt.messagecenter.biz.kafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.yxt.anrui.crm.api.crmcustomertemp.CrmCustomerTempVo; +import com.yxt.anrui.portal.api.sysuser.SysUserFeign; +import com.yxt.anrui.portal.api.sysuser.SysUserVo; +import com.yxt.anrui.portal.api.sysuserrole.SysUserRoleFeign; +import com.yxt.common.base.utils.JPushServer; +import com.yxt.common.base.utils.StringUtils; +import com.yxt.common.core.result.ResultBean; +import com.yxt.messagecenter.api.kafka.KafKaTopics; +import com.yxt.messagecenter.api.kafka.KafaGroupId; +import com.yxt.messagecenter.api.kafka.KafkaMessageDto; +import com.yxt.messagecenter.api.message.MessageDto; +import com.yxt.messagecenter.api.messagelist.MessageListDto; +import com.yxt.messagecenter.api.messagetype.MessageTypeEnum; +import com.yxt.messagecenter.biz.message.MessageService; +import com.yxt.messagecenter.biz.messagelist.MessageListService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalTime; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Component +@Slf4j +public class KafkaConsumer { + + @Autowired + private MessageService messageService; + + @Autowired + private MessageListService messageListService; + + @Autowired + private SysUserFeign sysUserFeign; + + @Autowired + private SysUserRoleFeign sysUserRoleFeign; + + @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1) + public void topic_test(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + + Optional message = Optional.ofNullable(record.value()); + if (message.isPresent()) { + Object msg = message.get(); + log.info("buscenter 消费了: Topic:" + topic + ",Message:" + msg); + ack.acknowledge(); + MessageDto message2 = JSONObject.parseObject(msg.toString(), MessageDto.class); + messageService.saveOrUpdateDto(message2); + log.info("buscenter 消费了: Topic:" + topic + ",message2:" + message2); + List groupIdList = message2.getGroupIdList(); + if (CollectionUtils.isNotEmpty(groupIdList)) { + ResultBean> resultBean = sysUserRoleFeign.getUserIdListByRoleSid(groupIdList); + if (resultBean.getSuccess() && resultBean.getData() != null) { + List list = resultBean.getData(); + // 保存消息体、保存消息列表 + // TODO 与kafka集成 + JPushServer.sendPushAlias(message2.getMsgTitle(), message2.getMsgContent(), message2.getMsgTypeKey(), message2.getMsgType(), list.toArray(list.toArray(new String[list.size()]))); + } + } + } + } + + /** + * 提醒业务员联系客户 + * @param record + */ + @Transactional(rollbackFor = Exception.class) + @KafkaListener(topics = KafKaTopics.CUSTOMERREMIND, groupId = KafaGroupId.CRM) + public void customerRemind(ConsumerRecord record) { + Optional message = Optional.ofNullable(record.value()); + if (message.isPresent()) { + Object msg = message.get(); + KafkaMessageDto kafkaMessage = JSONObject.parseObject(msg.toString(), KafkaMessageDto.class); + Map map = kafkaMessage.getMap(); + // json串转为实体类 + List customerList = JSON.parseArray(JSON.parse(map.get("customerList").toString()).toString(), CrmCustomerTempVo.class); + MessageListDto messageListDto = null; + String msgSid = ""; + String userSid = ""; + String id = ""; + for (int i = 0; i < customerList.size(); i++) { + CrmCustomerTempVo vo = customerList.get(i); + // 1、保存消息 + kafkaMessage.setBusinessSid(vo.getSid()); + // 内容:姓名 电话 客户类型 客户级别 客户提醒 + String name = StringUtils.isBlank(vo.getName()) ? "" : vo.getName()+" "; + String mobile = StringUtils.isBlank(vo.getMobile()) ? "" : vo.getMobile()+" "; + String customerType = StringUtils.isBlank(vo.getCustomerType()) ? "" : vo.getCustomerType()+" "; + String level = StringUtils.isBlank(vo.getLevel()) ? "" : vo.getLevel()+" "; + String remind_remark = StringUtils.isBlank(vo.getRemind_remark()) ? "" : ("/n" + vo.getRemind_remark()); + kafkaMessage.setMsgContent(name+mobile+customerType+level+remind_remark); + JSONObject json = new JSONObject(); + json.put("customerSid", vo.getSid()); + kafkaMessage.setArgs_json(json.toJSONString()); + ResultBean resultBean = messageService.saveOrUpdateAppMessage(kafkaMessage); + msgSid = (String) resultBean.getData(); + // 2、保存至消息列表 + if (!userSid.equals(vo.getCreateBySid())) { + userSid = vo.getCreateBySid(); + ResultBean userResultBean = sysUserFeign.fetchBySid(userSid); + if (userResultBean.getSuccess() && userResultBean.getData() != null) { + id = userResultBean.getData().getId().toString(); + } + } + messageListDto = new MessageListDto(); + messageListDto.setMsgSid(msgSid); + messageListDto.setRDelStatus(0); + messageListDto.setSDelStatus(0); + messageListDto.setReceiverSid(userSid); + messageListService.saveOrUpdateDto(messageListDto); + // 3、极光推送 + // JPushServer.sendPushAlias("联系客户", MessageFormat.format("请联系 {0} 客户", vo.getName()), MessageTypeEnum.CUSTOMER.getMsgTypeSid(), "客户", id); + JPushServer.sendPushAlias(kafkaMessage.getMsgTitle(), + StringUtils.isBlank(vo.getRemind_remark()) ? kafkaMessage.getMsgContent() : kafkaMessage.getMsgContent(), + MessageTypeEnum.CUSTOMER.getMsgTypeSid(), "客户", id); + } + } + } + + @KafkaListener(topics ="abcTest", groupId ="test") + public void abcTest(ConsumerRecord record) { + Object value = record.value(); + System.out.println("value:"+value); + System.out.println(LocalTime.now()); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(LocalTime.now()); + System.out.println("record:"+ JSON.toJSON(record.value())); + KafkaMessageDto messageDto = JSONObject.parseObject(record.value().toString(), KafkaMessageDto.class); + System.out.println("businessSid:" + messageDto.getBusinessSid()); + System.out.println("msgContent:" + messageDto.getMsgContent()); + } +} diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumerTest.java b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumerTest.java new file mode 100644 index 0000000..cf3d31d --- /dev/null +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaConsumerTest.java @@ -0,0 +1,59 @@ +package com.yxt.messagecenter.biz.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author liuguohui + * @version 1.0 + * @description + * @date 2022/03/25 + */ + +public class KafkaConsumerTest { + + public static void main(String[] args) throws InterruptedException { + + Properties props = new Properties(); + props.put("bootstrap.servers", "127.0.0.1:9092"); + // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据 + // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 + props.setProperty("group.id", "test"); + // 自动提交offset + props.setProperty("enable.auto.commit", "true"); + // 自动提交offset的时间间隔 + props.setProperty("auto.commit.interval.ms", "1000"); + // 拉取的key、value数据的 + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + KafkaConsumer kafkaConsumer = new KafkaConsumer(props); + + // 指定消费者从哪个topic中拉取数据 + kafkaConsumer.subscribe(Arrays.asList("test")); + + // 4.使用一个while循环,不断从Kafka的topic中拉取消息 + while(true) { + // Kafka的消费者一次拉取一批的数据 + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); + // 5.将将记录(record)的offset、key、value都打印出来 + for (ConsumerRecord consumerRecord : consumerRecords) { + // 主题 + String topic = consumerRecord.topic(); + // offset:这条消息处于Kafka分区中的哪个位置 + long offset = consumerRecord.offset(); + // key\value + String key = consumerRecord.key(); + String value = consumerRecord.value(); + + System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); + } + Thread.sleep(1000); + } + } +} diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducer.java b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducer.java new file mode 100644 index 0000000..dac6ba7 --- /dev/null +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducer.java @@ -0,0 +1,14 @@ +package com.yxt.messagecenter.biz.kafka; + +public class KafkaProducer { + + //自定义topic + public static final String TOPIC_TEST = "aaa"; + + // + public static final String TOPIC_GROUP1 = "crm"; + + // + public static final String TOPIC_GROUP2 = "scm"; + +} diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducerTest.java b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..35a7ab7 --- /dev/null +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/kafka/KafkaProducerTest.java @@ -0,0 +1,66 @@ +package com.yxt.messagecenter.biz.kafka; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.Properties; + +/** + * @author liuguohui + * @version 1.0 + * @description + * @date 2022/03/25 + */ +public class KafkaProducerTest { + + public static void main(String[] args) { + Properties props = new Properties(); + props.put("bootstrap.servers", "127.0.0.1:9092"); + props.put("acks", "all"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaProducer kafkaProducer = new KafkaProducer(props); + + for(int i = 0; i < 2; ++i) { + // 一、使用同步等待的方式发送消息 + // // 构建一条消息,直接new ProducerRecord + // ProducerRecord producerRecord = new ProducerRecord<>("test", null, i + ""); + // Future future = kafkaProducer.send(producerRecord); + // // 调用Future的get方法等待响应 + // future.get(); + // System.out.println("第" + i + "条消息写入成功!"); + + // 二、使用异步回调的方式发送消息 + ProducerRecord producerRecord = new ProducerRecord<>("test", null, i + ""); + kafkaProducer.send(producerRecord, new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + // 1. 判断发送消息是否成功 + if(exception == null) { + // 发送成功 + // 主题 + String topic = metadata.topic(); + // 分区id + int partition = metadata.partition(); + // 偏移量 + long offset = metadata.offset(); + System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset); + } + else { + // 发送出现错误 + System.out.println("生产消息出现异常!"); + // 打印异常消息 + System.out.println(exception.getMessage()); + // 打印调用栈 + System.out.println(exception.getStackTrace()); + } + } + }); + } + + kafkaProducer.close(); + } +} diff --git a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/messagelist/MessageListService.java b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/messagelist/MessageListService.java index 0707ad0..a337e8a 100644 --- a/message-center-biz/src/main/java/com/yxt/messagecenter/biz/messagelist/MessageListService.java +++ b/message-center-biz/src/main/java/com/yxt/messagecenter/biz/messagelist/MessageListService.java @@ -306,11 +306,11 @@ public class MessageListService extends MybatisBaseService pagerVo = PagerUtil.pageToVo(page, null); pagerVo.getRecords().stream().forEach(vo -> { if (vo.getType().equals("2")) { - AppSubsetVersionVo subsetVersionVo = appSubsetVersionFeign.fetchBySid(vo.getModuleSid()).getData(); + AppSubsetVersionVo subsetVersionVo = appSubsetVersionFeign.getLastByAppSid(vo.getModuleSid()).getData(); vo.setPath(subsetVersionVo.getUpdateUrl()); vo.setModulePluginName(subsetVersionVo.getModulePluginName()); if (StringUtils.isNotBlank(subsetVersionVo.getVersionName())) { - vo.setModuleVersion(Integer.parseInt(subsetVersionVo.getVersionName())); + vo.setModuleVersion(subsetVersionVo.getVersionCode()); } } if (StringUtils.isBlank(vo.getJson())) { diff --git a/message-center-biz/src/main/resources/logback-spring.xml b/message-center-biz/src/main/resources/logback-spring.xml index b425310..f9f196c 100644 --- a/message-center-biz/src/main/resources/logback-spring.xml +++ b/message-center-biz/src/main/resources/logback-spring.xml @@ -1,7 +1,7 @@ - +