Browse Source

kafka消息队列

master
wangpengfei 2 years ago
parent
commit
aabccff1c8
  1. 26
      yxt_supervise/supervise-customer/supervise-customer-biz/pom.xml
  2. 9
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdMapper.java
  3. 36
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdRest.java
  4. 18
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdService.java
  5. 4
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragelog/GdInstorageLogMapper.java
  6. 5
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragelog/GdInstorageLogService.java
  7. 5
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogMapper.java
  8. 2
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogMapper.xml
  9. 6
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogService.java
  10. 30
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventoryok/GdInventoryOkRest.java
  11. 4
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventoryok/GdInventoryOkService.java
  12. 32
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsales/GdSalesRest.java
  13. 4
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsales/GdSalesService.java
  14. 5
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsaleslog/GdSalesLogMapper.java
  15. 6
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsaleslog/GdSalesLogService.java
  16. 11
      yxt_supervise/supervise-customer/supervise-customer-biz/src/main/resources/application.yml
  17. 4
      yxt_supervise/supervise-report/supervise-report-biz/pom.xml
  18. 24
      yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/biz/reportsalesdaylog/ReportSalesDayLogService.java
  19. 15
      yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageconsumer/MessageConsumerRest.java
  20. 25
      yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageconsumer/MessageConsumerService.java
  21. 24
      yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageproducer/MessageProducerRest.java
  22. 26
      yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageproducer/MessageProducerService.java
  23. 13
      yxt_supervise/supervise-report/supervise-report-biz/src/main/resources/application.yml

26
yxt_supervise/supervise-customer/supervise-customer-biz/pom.xml

@ -123,17 +123,21 @@
<classifier>jdk15</classifier>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.yxt.supervise</groupId>
<artifactId>supervise-report-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.yxt.supervise</groupId>
<artifactId>supervise-report-biz</artifactId>
<version>0.0.1</version>
<scope>compile</scope>
<!-- <dependency>-->
<!-- <groupId>com.yxt.supervise</groupId>-->
<!-- <artifactId>supervise-report-api</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.yxt.supervise</groupId>-->
<!-- <artifactId>supervise-report-biz</artifactId>-->
<!-- <version>0.0.1</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>

9
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdMapper.java

@ -7,9 +7,6 @@ import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.yxt.supervise.customer.api.gdinstoragegd.GdInstorageGd;
import com.yxt.supervise.customer.api.gdinstoragegd.GdInstorageGdExcelVo;
import com.yxt.supervise.customer.api.gdinstoragegd.GdInstorageGdVo;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLog;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogDto;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogVo;
import org.apache.ibatis.annotations.*;
import java.util.List;
@ -28,10 +25,4 @@ public interface GdInstorageGdMapper extends BaseMapper<GdInstorageGd> {
@Select("select CONVERT(IFNULL(sum(colq16),0),DECIMAL(12,2)) as amount from gd_instorage where orderDate=#{orderDate}")
double amountOfDay(@Param("orderDate") String orderDate);
@Select("select * from report_sales_day_log where orderDate=#{orderDate}")
public ReportSalesDayLogVo getReportLog (@Param("orderDate") String orderDate);
@Insert("insert into report_sales_day_log ")
public int insertReportLog(ReportSalesDayLog reportSalesDayLog);
@Update("update ")
public int udpateReportLog(ReportSalesDayLog reportSalesDayLog);
}

36
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdRest.java

@ -8,6 +8,9 @@ import com.yxt.supervise.customer.api.gdinstoragegd.*;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import com.yxt.supervise.customer.api.gdsales.GdSalesExcelVo;
import com.yxt.supervise.customer.api.gdsales.GdSalesQuery;
import com.yxt.supervise.customer.biz.gdinstoragelog.GdInstorageLogService;
import com.yxt.supervise.customer.biz.gdinventorylog.GdInventoryLogService;
import com.yxt.supervise.customer.biz.gdsaleslog.GdSalesLogService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,6 +32,18 @@ import java.util.List;
public class GdInstorageGdRest implements GdInstorageGdFeign {
@Autowired
private GdInstorageGdService gdInstorageGdService;
@Autowired
GdInstorageLogService instorageLogService;
@Autowired
GdInventoryLogService inventoryLogService;
@Autowired
GdSalesLogService gdSalesLogService;
@PostMapping("/sendMsg")
public void send(@RequestBody String message) {
gdInstorageGdService.sendMsg(message);
}
@ApiOperation("根据条件分页查询数据的列表")
@PostMapping("/listPage")
public ResultBean<PagerVo<GdInstorageGd>> listPage(@RequestBody PagerQuery<GdInstorageGdQuery> pq) {
@ -56,21 +71,18 @@ public class GdInstorageGdRest implements GdInstorageGdFeign {
public ResultBean<GdInstorageLog> uploadGdData(@RequestParam("file") MultipartFile file) {
ResultBean rb = ResultBean.fireFail();
GdInstorageLog gdInstorageLog=gdInstorageGdService.uploadAndInsert(file);
gdInstorageLog.getOrderDate();
//判断都就给report服务发消息
if(ee(gdInstorageLog.getOrderDate())==true){
this.send(gdInstorageLog.getOrderDate());
}
rb.success().setData(gdInstorageLog);
return rb;
}
public class GdSalesThread extends Thread{
String orderDate;
public GdSalesThread(String orderDate) {
this.orderDate = orderDate;
}
public void run(){
System.out.println("excel线程开启");
gdInstorageGdService.uploadAndInsert(null);
System.out.println("excel线程结束");
public Boolean ee(String orderDate){
//判断入库 销售 库存已经是否上传excel
if(instorageLogService.getLogByOrderDate(orderDate).equals(null) && inventoryLogService.getLogByOrderDate(orderDate).equals(null) && gdSalesLogService.getLogByOrderDate(orderDate).equals(null)){
return true;
}
return false;
}
}

18
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragegd/GdInstorageGdService.java

@ -34,15 +34,13 @@ import com.yxt.supervise.customer.biz.gdrescategoryprod.GdRescategoryProdService
import com.yxt.supervise.customer.biz.storeindex.StoreIndexService;
import com.yxt.supervise.customer.biz.storeinfo.StoreInfoService;
import com.yxt.supervise.customer.biz.supplierindex.SupplierIndexService;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLog;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogDto;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogVo;
import com.yxt.supervise.report.biz.reportsalesdaygather.ReportSalesDayGatherService;
import com.yxt.supervise.report.biz.reportsalesdaylog.ReportSalesDayLogService;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
@ -56,6 +54,7 @@ import java.util.List;
*/
@Slf4j
@Service
@EnableBinding(Source.class)
public class GdInstorageGdService extends MybatisBaseService<GdInstorageGdMapper, GdInstorageGd> {
@Autowired
FileUploadComponent fileUploadComponent;
@ -80,10 +79,13 @@ public class GdInstorageGdService extends MybatisBaseService<GdInstorageGdMapper
@Autowired
private GdInstorageJmdService gdInstorageJmdService;
@Autowired
private ReportSalesDayGatherService reportSalesDayGatherService;
@Autowired
private ReportSalesDayLogService reportSalesDayLogService;
private Source source;
public void sendMsg(String msg) {
System.out.println("customer开始执行");
source.output().send(MessageBuilder.withPayload(msg).build());
}
public PagerVo<GdInstorageGdVo> listPageVo(PagerQuery<GdInstorageGdQuery> pq) {
GdInstorageGdQuery query = pq.getParams();

4
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragelog/GdInstorageLogMapper.java

@ -3,7 +3,11 @@ package com.yxt.supervise.customer.biz.gdinstoragelog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface GdInstorageLogMapper extends BaseMapper<GdInstorageLog> {
@Select("select * from gd_instorage_log where orderDate=#{orderDate}")
public GdInstorageLog getLogByOrderDate(@Param("oderDate") String orderDate);
}

5
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinstoragelog/GdInstorageLogService.java

@ -21,4 +21,9 @@ public class GdInstorageLogService extends ServiceImpl<GdInstorageLogMapper, GdI
PagerVo<GdInstorageLog> p = PagerUtil.pageToVo(pagging, null);
return p;
}
public GdInstorageLog getLogByOrderDate(String orderDate) {
//GdRukuQuery query = pq.getParams();
return baseMapper.getLogByOrderDate(orderDate);
}
}

5
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogMapper.java

@ -1,8 +1,11 @@
package com.yxt.supervise.customer.biz.gdinventorylog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import com.yxt.supervise.customer.api.gdinventoryok.GdInventoryLog;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
* @author wangpengfei
@ -10,4 +13,6 @@ import org.apache.ibatis.annotations.Mapper;
*/
@Mapper
public interface GdInventoryLogMapper extends BaseMapper<GdInventoryLog> {
@Select("select * from gd_inventory_log where orderDate=#{orderDate}")
public GdInstorageLog getLogByOrderDate(@Param("oderDate") String orderDate);
}

2
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogMapper.xml

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yxt.supervise.customer.biz.gdinstoragegd.GdInstorageGdMapper">
<mapper namespace="com.yxt.supervise.customer.biz.gdinventorylog.GdInventoryLogMapper">
<!-- <where> ${ew.sqlSegment} </where>-->
<!-- ${ew.customSqlSegment} -->
<select id="selectPageVo" resultType="com.yxt.supervise.customer.api.gdinstoragegd.GdInstorageGdVo">

6
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventorylog/GdInventoryLogService.java

@ -1,6 +1,7 @@
package com.yxt.supervise.customer.biz.gdinventorylog;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import com.yxt.supervise.customer.api.gdinventoryok.GdInventoryLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -13,5 +14,10 @@ import org.springframework.stereotype.Service;
@Service
public class GdInventoryLogService extends ServiceImpl<GdInventoryLogMapper, GdInventoryLog> {
public GdInstorageLog getLogByOrderDate(String orderDate) {
//GdRukuQuery query = pq.getParams();
return baseMapper.getLogByOrderDate(orderDate);
}
}

30
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventoryok/GdInventoryOkRest.java

@ -9,7 +9,11 @@ import com.yxt.supervise.customer.api.gdsales.GdSales;
import com.yxt.supervise.customer.api.gdsales.GdSalesExcelVo;
import com.yxt.supervise.customer.api.gdsales.GdSalesQuery;
import com.yxt.supervise.customer.api.gdsales.GdSalesVo;
import com.yxt.supervise.customer.biz.gdinstoragegd.GdInstorageGdService;
import com.yxt.supervise.customer.biz.gdinstoragelog.GdInstorageLogService;
import com.yxt.supervise.customer.biz.gdinventorylog.GdInventoryLogService;
import com.yxt.supervise.customer.biz.gdinventoryyc.GdInventoryYcService;
import com.yxt.supervise.customer.biz.gdsaleslog.GdSalesLogService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
@ -32,7 +36,19 @@ public class GdInventoryOkRest implements GdInventoryOkFeign {
@Autowired
GdInventoryOkService gdInventoryOkService;
@Autowired
private GdInstorageGdService gdInstorageGdService;
@Autowired
GdInventoryYcService gdInventoryYcService;
@Autowired
GdInstorageLogService instorageLogService;
@Autowired
GdInventoryLogService inventoryLogService;
@Autowired
GdSalesLogService gdSalesLogService;
@PostMapping("/sendMsg")
public void send(@RequestBody String message) {
gdInstorageGdService.sendMsg(message);
}
//@Override
@ApiOperation("根据条件分页查询数据的列表")
@PostMapping("/listPage")
@ -60,7 +76,19 @@ public class GdInventoryOkRest implements GdInventoryOkFeign {
@PostMapping("/uploadGdData")
public ResultBean<GdInventoryLog> uploadGdData(@RequestParam("file") MultipartFile file) {
return gdInventoryOkService.uploadAndResetData(file);
ResultBean rb = ResultBean.fireFail();
GdInventoryLog log=gdInventoryOkService.uploadAndResetData(file);
if(ee(log.getOrderDate())==true){
this.send(log.getOrderDate());
}
return rb.success().setData(log);
}
public Boolean ee(String orderDate){
//判断入库 销售 库存已经是否上传excel
if(instorageLogService.getLogByOrderDate(orderDate).equals(null) && inventoryLogService.getLogByOrderDate(orderDate).equals(null) && gdSalesLogService.getLogByOrderDate(orderDate).equals(null)){
return true;
}
return false;
}
@GetMapping("/kchz")
public ResultBean kchz() {

4
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdinventoryok/GdInventoryOkService.java

@ -135,7 +135,7 @@ public class GdInventoryOkService extends MybatisBaseService<GdInventoryOkMapper
List<GdInventoryOkExcelVo> pagging = baseMapper.exportExcel(qw);
return pagging;
}
public ResultBean<GdInventoryLog> uploadAndResetData(MultipartFile file) {
public GdInventoryLog uploadAndResetData(MultipartFile file) {
ResultBean rb = ResultBean.fireFail();
ResultBean<FileUploadResult> fub = fileUploadComponent.uploadFile(file, "kcxxcx");
@ -151,7 +151,7 @@ public class GdInventoryOkService extends MybatisBaseService<GdInventoryOkMapper
gdlog.setDurations(System.currentTimeMillis() - millis);
gdInventoryLogService.save(gdlog);
return rb.success().setData(gdlog);
return gdlog;
}
private RowHandler createRowHandler(GdInventoryLog gdlog) {
return new RowHandler() {

32
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsales/GdSalesRest.java

@ -11,8 +11,13 @@ import com.yxt.common.base.utils.ExportExcelUtils;
import com.yxt.common.core.query.PagerQuery;
import com.yxt.common.core.result.ResultBean;
import com.yxt.common.core.vo.PagerVo;
import com.yxt.supervise.customer.api.gdinventoryok.GdInventoryLog;
import com.yxt.supervise.customer.api.gdsales.*;
import com.yxt.supervise.customer.api.gdsaleslog.GdSalesLog;
import com.yxt.supervise.customer.biz.gdinstoragegd.GdInstorageGdService;
import com.yxt.supervise.customer.biz.gdinstoragelog.GdInstorageLogService;
import com.yxt.supervise.customer.biz.gdinventorylog.GdInventoryLogService;
import com.yxt.supervise.customer.biz.gdsaleslog.GdSalesLogService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.poi.ss.usermodel.FillPatternType;
@ -42,6 +47,19 @@ public class GdSalesRest implements GdSalesFeign {
@Autowired
private GdSalesService gdSalesService;
@Autowired
private GdInstorageGdService gdInstorageGdService;
@Autowired
GdInstorageLogService instorageLogService;
@Autowired
GdInventoryLogService inventoryLogService;
@Autowired
GdSalesLogService gdSalesLogService;
@PostMapping("/sendMsg")
public void send(@RequestBody String message) {
gdInstorageGdService.sendMsg(message);
}
@Override
@ApiOperation("根据条件分页查询数据的列表")
@ -70,6 +88,18 @@ public class GdSalesRest implements GdSalesFeign {
}
@PostMapping("/uploadXssj")
public ResultBean<GdSalesLog> uploadGdData(@RequestParam("file") MultipartFile file) {
return gdSalesService.uploadAndInsert(file);
ResultBean rb = ResultBean.fireFail();
GdSalesLog log=gdSalesService.uploadAndInsert(file);
if(ee(log.getOrderDate())==true){
this.send(log.getOrderDate());
}
return rb.success().setData(log);
}
public Boolean ee(String orderDate){
//判断入库 销售 库存已经是否上传excel
if(instorageLogService.getLogByOrderDate(orderDate).equals(null) && inventoryLogService.getLogByOrderDate(orderDate).equals(null) && gdSalesLogService.getLogByOrderDate(orderDate).equals(null)){
return true;
}
return false;
}
}

4
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsales/GdSalesService.java

@ -140,7 +140,7 @@ public class GdSalesService extends MybatisBaseService<GdSalesMapper, GdSales> {
List<GdSalesExcelVo> pagging = baseMapper.exportExcel(qw);
return pagging;
}
public ResultBean<GdSalesLog> uploadAndInsert(MultipartFile file) {
public GdSalesLog uploadAndInsert(MultipartFile file) {
ResultBean rb = ResultBean.fireFail();
ResultBean<FileUploadResult> fub = fileUploadComponent.uploadFile(file, "xssj");
@ -153,7 +153,7 @@ public class GdSalesService extends MybatisBaseService<GdSalesMapper, GdSales> {
gdlog.setDurations(System.currentTimeMillis() - millis);
gdSalesLogService.save(gdlog);
return rb.success().setData(gdlog);
return gdlog;
}
public void clearByDataDate(String dataDate) {
baseMapper.clearByDataDate(dataDate);

5
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsaleslog/GdSalesLogMapper.java

@ -26,8 +26,11 @@
package com.yxt.supervise.customer.biz.gdsaleslog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import com.yxt.supervise.customer.api.gdsaleslog.GdSalesLog;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
* Project: yxt_supervise <br/>
@ -44,4 +47,6 @@ import org.apache.ibatis.annotations.Mapper;
*/
@Mapper
public interface GdSalesLogMapper extends BaseMapper<GdSalesLog> {
@Select("select * from gd_sales_log where orderDate=#{orderDate}")
public GdInstorageLog getLogByOrderDate(@Param("oderDate") String orderDate);
}

6
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/java/com/yxt/supervise/customer/biz/gdsaleslog/GdSalesLogService.java

@ -32,6 +32,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yxt.common.base.utils.PagerUtil;
import com.yxt.common.core.query.PagerQuery;
import com.yxt.common.core.vo.PagerVo;
import com.yxt.supervise.customer.api.gdinstoragelog.GdInstorageLog;
import com.yxt.supervise.customer.api.gdsales.GdSalesQuery;
import com.yxt.supervise.customer.api.gdsaleslog.GdSalesLog;
import org.springframework.stereotype.Service;
@ -71,4 +72,9 @@ public class GdSalesLogService extends ServiceImpl<GdSalesLogMapper, GdSalesLog>
public GdSalesLog fetchById(String logId) {
return baseMapper.selectById(logId);
}
public GdInstorageLog getLogByOrderDate(String orderDate) {
//GdRukuQuery query = pq.getParams();
return baseMapper.getLogByOrderDate(orderDate);
}
}

11
yxt_supervise/supervise-customer/supervise-customer-biz/src/main/resources/application.yml

@ -1,6 +1,17 @@
spring:
application:
name: supervise-customer
cloud:
stream:
bindings:
output: #通道名称,使用stream默认的通道名称,可以自定义
destination: stream-demo #要写入的消息队列的名称
content-type: application/json #发送或接受什么类型的消息
kafka: #使用kafka作为服务中的消息总线
binder:
zkNodes: localhost:2181 #zookeeper的网络位置,如果是集群,逗号分割
brokers: localhost:9092 #kafka的网络位置
auto-create-topics: true
profiles:
# active: devv
active: test

4
yxt_supervise/supervise-report/supervise-report-biz/pom.xml

@ -76,6 +76,10 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>

24
yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/biz/reportsalesdaylog/ReportSalesDayLogService.java

@ -7,6 +7,10 @@ import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLog;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogQuery;
import com.yxt.supervise.report.api.reportsalesdaylog.ReportSalesDayLogVo;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import java.util.List;
@ -16,8 +20,28 @@ import java.util.List;
* @date 2023/4/21 11:18
*/
@Service
@EnableBinding(Sink.class)
public class ReportSalesDayLogService extends MybatisBaseService<ReportSalesDayLogMapper, ReportSalesDayLog> {
@StreamListener(Sink.INPUT)
public void recieve(String payload) {
System.out.println("report接收");
new GdSalesThread(payload);
System.out.println(payload);
}
public class GdSalesThread extends Thread{
String orderDate;
public GdSalesThread(String orderDate) {
this.orderDate = orderDate;
}
public void run(){
System.out.println("excel线程开启");
//gdInstorageGdService.uploadAndInsert(null);
System.out.println("excel线程结束");
}
}
public List<ReportSalesDayLogVo> getReportSalesDayLog(PagerQuery<ReportSalesDayLogQuery> pq) {
ReportSalesDayLogQuery query = pq.getParams();
QueryWrapper<ReportSalesDayLog> qw = new QueryWrapper<>();

15
yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageconsumer/MessageConsumerRest.java

@ -0,0 +1,15 @@
package com.yxt.supervise.report.kafka.messageconsumer;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wangpengfei
* @date 2023/4/26 17:57
*/
@Api("kafka消费者")
@RestController
@RequestMapping("v1/gdinstoragegd")
public class MessageConsumerRest {
}

25
yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageconsumer/MessageConsumerService.java

@ -0,0 +1,25 @@
package com.yxt.supervise.report.kafka.messageconsumer;
import com.yxt.supervise.report.biz.reportsalesdaylog.ReportSalesDayLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.stereotype.Service;
/**
* @author wangpengfei
* @date 2023/4/26 17:59
*/
@Slf4j
@Service
@EnableBinding(Source.class)
public class MessageConsumerService {
@StreamListener(Sink.INPUT)
public void recieve(String payload) {
System.out.println("report接收");
//new ReportSalesDayLogService.GdSalesThread(payload);
System.out.println(payload);
}
}

24
yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageproducer/MessageProducerRest.java

@ -0,0 +1,24 @@
package com.yxt.supervise.report.kafka.messageproducer;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wangpengfei
* @date 2023/4/26 17:57
*/
@Api("kafka消费者")
@RestController
@RequestMapping("v1/gdinstoragegd")
public class MessageProducerRest {
@Autowired
MessageProducerService messageProducerService;
@PostMapping("/sendMsg")
public void send(@RequestBody String message) {
messageProducerService.sendMsg(message);
}
}

26
yxt_supervise/supervise-report/supervise-report-biz/src/main/java/com/yxt/supervise/report/kafka/messageproducer/MessageProducerService.java

@ -0,0 +1,26 @@
package com.yxt.supervise.report.kafka.messageproducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* @author wangpengfei
* @date 2023/4/26 17:59
*/
@Slf4j
@Service
@EnableBinding(Source.class)
public class MessageProducerService {
@Autowired
private Source source;
public void sendMsg(String msg) {
System.out.println("customer开始执行");
source.output().send(MessageBuilder.withPayload(msg).build());
}
}

13
yxt_supervise/supervise-report/supervise-report-biz/src/main/resources/application.yml

@ -1,6 +1,19 @@
spring:
application:
name: supervise-report
cloud:
stream:
bindings:
input: #通道名称,使用stream默认的通道名称,可以自定义, 接受消息生产者生产的消息
destination: stream-demo #要写入的消息队列的名称
# group: comsumerGroup1 #该属性确保服务只处理一次
output:
destination: stream-demo-trans #转发
kafka: #使用kafka作为服务中的消息总线
binder:
zkNodes: localhost:2181 #zookeeper的网络位置,如果是集群,逗号分割
brokers: localhost:9092 #kafka的网络位置
auto-create-topics: true
profiles:
# active: devv
active: test

Loading…
Cancel
Save