
16 changed files with 504 additions and 12 deletions
@ -0,0 +1,17 @@ |
|||
package com.yxt.messagecenter.api.kafka; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @description kafka Topics (使用枚举报错) |
|||
* @date 2022/03/22 |
|||
*/ |
|||
public class KafKaTopics { |
|||
|
|||
/** |
|||
* 提醒联系潜在客户 |
|||
*/ |
|||
public static final String CUSTOMERREMIND = "customerRemind"; |
|||
|
|||
|
|||
} |
@ -0,0 +1,20 @@ |
|||
package com.yxt.messagecenter.api.kafka; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @description |
|||
* @date 2022/03/22 |
|||
*/ |
|||
public class KafaGroupId { |
|||
|
|||
/** |
|||
* 客户管理 |
|||
*/ |
|||
public static final String CRM = "crm"; |
|||
|
|||
/** |
|||
* 销售业务 |
|||
*/ |
|||
public static final String buscenter = "buscenter"; |
|||
} |
@ -0,0 +1,26 @@ |
|||
package com.yxt.messagecenter.api.kafka; |
|||
|
|||
import com.yxt.messagecenter.api.message.AppMessageDto; |
|||
import io.swagger.annotations.ApiModel; |
|||
import io.swagger.annotations.ApiModelProperty; |
|||
import lombok.Data; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @description |
|||
* @date 2022/03/22 |
|||
*/ |
|||
@ApiModel("kafka消息对象") |
|||
@Data |
|||
public class KafkaMessageDto extends AppMessageDto implements Serializable { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
@ApiModelProperty("业务参数") |
|||
private Map<String, Object> map; |
|||
|
|||
} |
@ -0,0 +1,35 @@ |
|||
package com.yxt.messagecenter.api.message; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @Description: 推送的消息打开的action(APP)枚举类 |
|||
* @date 2022/03/28 |
|||
*/ |
|||
public enum MsgTargetUriEnum { |
|||
APPUPDATE("app更新", "com.anrui.android.activity.UpdateActivity"), |
|||
CUSTOMERREMIND("客户提醒", "com.anrui.android.plugin.autoservice.activity.DetailCustomerActivity"); |
|||
|
|||
/** |
|||
* uri名称 |
|||
*/ |
|||
private final String uriName; |
|||
|
|||
/** |
|||
* uri |
|||
*/ |
|||
private final String uri; |
|||
|
|||
MsgTargetUriEnum(String uriName, String uri) { |
|||
this.uriName = uriName; |
|||
this.uri = uri; |
|||
} |
|||
|
|||
public String getUriName() { |
|||
return uriName; |
|||
} |
|||
|
|||
public String getUri() { |
|||
return uri; |
|||
} |
|||
} |
@ -0,0 +1,36 @@ |
|||
package com.yxt.messagecenter.api.message; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @Description: 消息标题枚举类 |
|||
* @date 2022/03/28 |
|||
*/ |
|||
public enum MsgTitleEnum { |
|||
|
|||
CUSTOMERREMIND("客户跟进提醒", "customerRemind"), |
|||
; |
|||
|
|||
/** |
|||
* 标题名称 |
|||
*/ |
|||
private final String titleName; |
|||
|
|||
/** |
|||
* 标题 |
|||
*/ |
|||
private final String title; |
|||
|
|||
MsgTitleEnum(String titleName, String title) { |
|||
this.titleName = titleName; |
|||
this.title = title; |
|||
} |
|||
|
|||
public String getTitleName() { |
|||
return titleName; |
|||
} |
|||
|
|||
public String getTitle() { |
|||
return title; |
|||
} |
|||
} |
@ -0,0 +1,39 @@ |
|||
package com.yxt.messagecenter.api.messagetype; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @Description: 消息类别枚举类 |
|||
* @date 2022/03/21 |
|||
*/ |
|||
public enum MessageTypeEnum { |
|||
|
|||
SYSTEM("4598b44e-52df-4d83-b012-4db1ea7e5ed7", "系统消息"), |
|||
CUSTOMER("7344e7d7-4f5e-11ec-981f-6479f0dd0373", "客户"), |
|||
FINANCE("2ab6dac4-87c0-11ec-b13a-fa163e0cb33c", "财务"), |
|||
APPROVAL("b4b50229-cce0-4cb1-8c0a-ae7545a9ec59", "审批中心"), |
|||
; |
|||
|
|||
/** |
|||
* 消息类别sid |
|||
*/ |
|||
private final String msgTypeSid; |
|||
|
|||
/** |
|||
* 消息类别名称 |
|||
*/ |
|||
private final String msgTypeName; |
|||
|
|||
MessageTypeEnum(String msgTypeSid, String msgTypeName) { |
|||
this.msgTypeSid = msgTypeSid; |
|||
this.msgTypeName = msgTypeName; |
|||
} |
|||
|
|||
public String getMsgTypeSid() { |
|||
return msgTypeSid; |
|||
} |
|||
|
|||
public String getMsgTypeName() { |
|||
return msgTypeName; |
|||
} |
|||
} |
@ -0,0 +1,149 @@ |
|||
package com.yxt.messagecenter.biz.kafka; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.yxt.anrui.crm.api.crmcustomertemp.CrmCustomerTempVo; |
|||
import com.yxt.anrui.portal.api.sysuser.SysUserFeign; |
|||
import com.yxt.anrui.portal.api.sysuser.SysUserVo; |
|||
import com.yxt.anrui.portal.api.sysuserrole.SysUserRoleFeign; |
|||
import com.yxt.common.base.utils.JPushServer; |
|||
import com.yxt.common.base.utils.StringUtils; |
|||
import com.yxt.common.core.result.ResultBean; |
|||
import com.yxt.messagecenter.api.kafka.KafKaTopics; |
|||
import com.yxt.messagecenter.api.kafka.KafaGroupId; |
|||
import com.yxt.messagecenter.api.kafka.KafkaMessageDto; |
|||
import com.yxt.messagecenter.api.message.MessageDto; |
|||
import com.yxt.messagecenter.api.messagelist.MessageListDto; |
|||
import com.yxt.messagecenter.api.messagetype.MessageTypeEnum; |
|||
import com.yxt.messagecenter.biz.message.MessageService; |
|||
import com.yxt.messagecenter.biz.messagelist.MessageListService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.collections4.CollectionUtils; |
|||
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.kafka.annotation.KafkaListener; |
|||
import org.springframework.kafka.support.Acknowledgment; |
|||
import org.springframework.kafka.support.KafkaHeaders; |
|||
import org.springframework.messaging.handler.annotation.Header; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
|
|||
import java.time.LocalTime; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class KafkaConsumer { |
|||
|
|||
@Autowired |
|||
private MessageService messageService; |
|||
|
|||
@Autowired |
|||
private MessageListService messageListService; |
|||
|
|||
@Autowired |
|||
private SysUserFeign sysUserFeign; |
|||
|
|||
@Autowired |
|||
private SysUserRoleFeign sysUserRoleFeign; |
|||
|
|||
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1) |
|||
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { |
|||
|
|||
Optional message = Optional.ofNullable(record.value()); |
|||
if (message.isPresent()) { |
|||
Object msg = message.get(); |
|||
log.info("buscenter 消费了: Topic:" + topic + ",Message:" + msg); |
|||
ack.acknowledge(); |
|||
MessageDto message2 = JSONObject.parseObject(msg.toString(), MessageDto.class); |
|||
messageService.saveOrUpdateDto(message2); |
|||
log.info("buscenter 消费了: Topic:" + topic + ",message2:" + message2); |
|||
List<String> groupIdList = message2.getGroupIdList(); |
|||
if (CollectionUtils.isNotEmpty(groupIdList)) { |
|||
ResultBean<List<String>> resultBean = sysUserRoleFeign.getUserIdListByRoleSid(groupIdList); |
|||
if (resultBean.getSuccess() && resultBean.getData() != null) { |
|||
List<String> list = resultBean.getData(); |
|||
// 保存消息体、保存消息列表
|
|||
// TODO 与kafka集成
|
|||
JPushServer.sendPushAlias(message2.getMsgTitle(), message2.getMsgContent(), message2.getMsgTypeKey(), message2.getMsgType(), list.toArray(list.toArray(new String[list.size()]))); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 提醒业务员联系客户 |
|||
* @param record |
|||
*/ |
|||
@Transactional(rollbackFor = Exception.class) |
|||
@KafkaListener(topics = KafKaTopics.CUSTOMERREMIND, groupId = KafaGroupId.CRM) |
|||
public void customerRemind(ConsumerRecord<String, Object> record) { |
|||
Optional message = Optional.ofNullable(record.value()); |
|||
if (message.isPresent()) { |
|||
Object msg = message.get(); |
|||
KafkaMessageDto kafkaMessage = JSONObject.parseObject(msg.toString(), KafkaMessageDto.class); |
|||
Map<String, Object> map = kafkaMessage.getMap(); |
|||
// json串转为实体类
|
|||
List<CrmCustomerTempVo> customerList = JSON.parseArray(JSON.parse(map.get("customerList").toString()).toString(), CrmCustomerTempVo.class); |
|||
MessageListDto messageListDto = null; |
|||
String msgSid = ""; |
|||
String userSid = ""; |
|||
String id = ""; |
|||
for (int i = 0; i < customerList.size(); i++) { |
|||
CrmCustomerTempVo vo = customerList.get(i); |
|||
// 1、保存消息
|
|||
kafkaMessage.setBusinessSid(vo.getSid()); |
|||
// 内容:姓名 电话 客户类型 客户级别 客户提醒
|
|||
String name = StringUtils.isBlank(vo.getName()) ? "" : vo.getName()+" "; |
|||
String mobile = StringUtils.isBlank(vo.getMobile()) ? "" : vo.getMobile()+" "; |
|||
String customerType = StringUtils.isBlank(vo.getCustomerType()) ? "" : vo.getCustomerType()+" "; |
|||
String level = StringUtils.isBlank(vo.getLevel()) ? "" : vo.getLevel()+" "; |
|||
String remind_remark = StringUtils.isBlank(vo.getRemind_remark()) ? "" : ("/n" + vo.getRemind_remark()); |
|||
kafkaMessage.setMsgContent(name+mobile+customerType+level+remind_remark); |
|||
JSONObject json = new JSONObject(); |
|||
json.put("customerSid", vo.getSid()); |
|||
kafkaMessage.setArgs_json(json.toJSONString()); |
|||
ResultBean resultBean = messageService.saveOrUpdateAppMessage(kafkaMessage); |
|||
msgSid = (String) resultBean.getData(); |
|||
// 2、保存至消息列表
|
|||
if (!userSid.equals(vo.getCreateBySid())) { |
|||
userSid = vo.getCreateBySid(); |
|||
ResultBean<SysUserVo> userResultBean = sysUserFeign.fetchBySid(userSid); |
|||
if (userResultBean.getSuccess() && userResultBean.getData() != null) { |
|||
id = userResultBean.getData().getId().toString(); |
|||
} |
|||
} |
|||
messageListDto = new MessageListDto(); |
|||
messageListDto.setMsgSid(msgSid); |
|||
messageListDto.setRDelStatus(0); |
|||
messageListDto.setSDelStatus(0); |
|||
messageListDto.setReceiverSid(userSid); |
|||
messageListService.saveOrUpdateDto(messageListDto); |
|||
// 3、极光推送
|
|||
// JPushServer.sendPushAlias("联系客户", MessageFormat.format("请联系 {0} 客户", vo.getName()), MessageTypeEnum.CUSTOMER.getMsgTypeSid(), "客户", id);
|
|||
JPushServer.sendPushAlias(kafkaMessage.getMsgTitle(), |
|||
StringUtils.isBlank(vo.getRemind_remark()) ? kafkaMessage.getMsgContent() : kafkaMessage.getMsgContent(), |
|||
MessageTypeEnum.CUSTOMER.getMsgTypeSid(), "客户", id); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@KafkaListener(topics ="abcTest", groupId ="test") |
|||
public void abcTest(ConsumerRecord<String, Object> record) { |
|||
Object value = record.value(); |
|||
System.out.println("value:"+value); |
|||
System.out.println(LocalTime.now()); |
|||
try { |
|||
Thread.sleep(10000); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
System.out.println(LocalTime.now()); |
|||
System.out.println("record:"+ JSON.toJSON(record.value())); |
|||
KafkaMessageDto messageDto = JSONObject.parseObject(record.value().toString(), KafkaMessageDto.class); |
|||
System.out.println("businessSid:" + messageDto.getBusinessSid()); |
|||
System.out.println("msgContent:" + messageDto.getMsgContent()); |
|||
} |
|||
} |
@ -0,0 +1,59 @@ |
|||
package com.yxt.messagecenter.biz.kafka; |
|||
|
|||
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|||
import org.apache.kafka.clients.consumer.ConsumerRecords; |
|||
import org.apache.kafka.clients.consumer.KafkaConsumer; |
|||
|
|||
import java.time.Duration; |
|||
import java.util.Arrays; |
|||
import java.util.Properties; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @description |
|||
* @date 2022/03/25 |
|||
*/ |
|||
|
|||
public class KafkaConsumerTest { |
|||
|
|||
public static void main(String[] args) throws InterruptedException { |
|||
|
|||
Properties props = new Properties(); |
|||
props.put("bootstrap.servers", "127.0.0.1:9092"); |
|||
// 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
|
|||
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
|
|||
props.setProperty("group.id", "test"); |
|||
// 自动提交offset
|
|||
props.setProperty("enable.auto.commit", "true"); |
|||
// 自动提交offset的时间间隔
|
|||
props.setProperty("auto.commit.interval.ms", "1000"); |
|||
// 拉取的key、value数据的
|
|||
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|||
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|||
|
|||
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); |
|||
|
|||
// 指定消费者从哪个topic中拉取数据
|
|||
kafkaConsumer.subscribe(Arrays.asList("test")); |
|||
|
|||
// 4.使用一个while循环,不断从Kafka的topic中拉取消息
|
|||
while(true) { |
|||
// Kafka的消费者一次拉取一批的数据
|
|||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); |
|||
// 5.将将记录(record)的offset、key、value都打印出来
|
|||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { |
|||
// 主题
|
|||
String topic = consumerRecord.topic(); |
|||
// offset:这条消息处于Kafka分区中的哪个位置
|
|||
long offset = consumerRecord.offset(); |
|||
// key\value
|
|||
String key = consumerRecord.key(); |
|||
String value = consumerRecord.value(); |
|||
|
|||
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); |
|||
} |
|||
Thread.sleep(1000); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,14 @@ |
|||
package com.yxt.messagecenter.biz.kafka; |
|||
|
|||
public class KafkaProducer { |
|||
|
|||
//自定义topic
|
|||
public static final String TOPIC_TEST = "aaa"; |
|||
|
|||
//
|
|||
public static final String TOPIC_GROUP1 = "crm"; |
|||
|
|||
//
|
|||
public static final String TOPIC_GROUP2 = "scm"; |
|||
|
|||
} |
@ -0,0 +1,66 @@ |
|||
package com.yxt.messagecenter.biz.kafka; |
|||
|
|||
import org.apache.kafka.clients.producer.Callback; |
|||
import org.apache.kafka.clients.producer.KafkaProducer; |
|||
import org.apache.kafka.clients.producer.ProducerRecord; |
|||
import org.apache.kafka.clients.producer.RecordMetadata; |
|||
|
|||
import java.util.Properties; |
|||
|
|||
/** |
|||
* @author liuguohui |
|||
* @version 1.0 |
|||
* @description |
|||
* @date 2022/03/25 |
|||
*/ |
|||
public class KafkaProducerTest { |
|||
|
|||
public static void main(String[] args) { |
|||
Properties props = new Properties(); |
|||
props.put("bootstrap.servers", "127.0.0.1:9092"); |
|||
props.put("acks", "all"); |
|||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
|||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
|||
|
|||
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); |
|||
|
|||
for(int i = 0; i < 2; ++i) { |
|||
// 一、使用同步等待的方式发送消息
|
|||
// // 构建一条消息,直接new ProducerRecord
|
|||
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
|
|||
// Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
|
|||
// // 调用Future的get方法等待响应
|
|||
// future.get();
|
|||
// System.out.println("第" + i + "条消息写入成功!");
|
|||
|
|||
// 二、使用异步回调的方式发送消息
|
|||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + ""); |
|||
kafkaProducer.send(producerRecord, new Callback() { |
|||
@Override |
|||
public void onCompletion(RecordMetadata metadata, Exception exception) { |
|||
// 1. 判断发送消息是否成功
|
|||
if(exception == null) { |
|||
// 发送成功
|
|||
// 主题
|
|||
String topic = metadata.topic(); |
|||
// 分区id
|
|||
int partition = metadata.partition(); |
|||
// 偏移量
|
|||
long offset = metadata.offset(); |
|||
System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset); |
|||
} |
|||
else { |
|||
// 发送出现错误
|
|||
System.out.println("生产消息出现异常!"); |
|||
// 打印异常消息
|
|||
System.out.println(exception.getMessage()); |
|||
// 打印调用栈
|
|||
System.out.println(exception.getStackTrace()); |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
|
|||
kafkaProducer.close(); |
|||
} |
|||
} |
Loading…
Reference in new issue