Commit 406ad77120611f9ae6a66b168cdf0c39499c6950

Authored by zhangmeiyang
1 parent 57a36eb8

refactor(core):重构消息处理流程并优化数据映射逻辑

- 移除了 BaseCustomer 中的 currency 字段及相关注释
- 在 CustomerBuilder 中启用 systemDataId 非空校验
- 删除 CustomerProxy 和 CustomerSender 中的 thirdPartyCode 字段
- 调整 DynamicTaxPipelineMappingService 的包引用路径并移除默认状态设置
- 简化 InitializeProcessor 的依赖引入和处理逻辑- 在 MessageContext 中新增 pipelineDataId、systemDataId、tenantPipeline 等字段及枚举类型
- 优化 ProxyProcessor 处理流程,减少冗余代码
- 扩展 TaxPipelineMapping 及其创建对象,增加 errorMessage 字段支持错误记录
- 更新数据库表结构,添加 error_message 字段并调整状态值含义
- 新增 TaxReceiveService 统一处理消息接收、转换与持久化操作
- 增加新的税务系统异常类型 PARAMETER_IS_NOT_PARSED_CORRECTLY
Showing 18 changed files with 158 additions and 98 deletions
tax-boot/src/main/java/com/diligrp/tax/boot/receiver/TaxReceiver.java
1 package com.diligrp.tax.boot.receiver; 1 package com.diligrp.tax.boot.receiver;
2 2
  3 +import com.diligrp.tax.boot.service.TaxReceiveService;
3 import com.diligrp.tax.central.domain.MessageContext; 4 import com.diligrp.tax.central.domain.MessageContext;
4 -import com.diligrp.tax.central.process.ProcessorChain;  
5 -import com.diligrp.tax.central.type.StatusType;  
6 -import com.diligrp.tax.central.type.SystemType;  
7 -import com.diligrp.tax.central.utils.JsonUtils;  
8 -import com.diligrp.tax.storage.service.TenantTaxService;  
9 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Channel;
10 import jakarta.annotation.Resource; 6 import jakarta.annotation.Resource;
11 import lombok.extern.slf4j.Slf4j; 7 import lombok.extern.slf4j.Slf4j;
@@ -15,12 +11,11 @@ import org.springframework.amqp.rabbit.annotation.Queue; @@ -15,12 +11,11 @@ import org.springframework.amqp.rabbit.annotation.Queue;
15 import org.springframework.amqp.rabbit.annotation.QueueBinding; 11 import org.springframework.amqp.rabbit.annotation.QueueBinding;
16 import org.springframework.amqp.rabbit.annotation.RabbitListener; 12 import org.springframework.amqp.rabbit.annotation.RabbitListener;
17 import org.springframework.stereotype.Component; 13 import org.springframework.stereotype.Component;
  14 +import org.springframework.transaction.annotation.Transactional;
18 15
19 import java.io.IOException; 16 import java.io.IOException;
20 -import java.io.PrintWriter;  
21 -import java.io.StringWriter;  
22 import java.nio.charset.StandardCharsets; 17 import java.nio.charset.StandardCharsets;
23 -import java.util.Map; 18 +import java.util.Optional;
24 19
25 import static com.diligrp.tax.boot.queue.TaxAutoPush.*; 20 import static com.diligrp.tax.boot.queue.TaxAutoPush.*;
26 21
@@ -35,10 +30,7 @@ import static com.diligrp.tax.boot.queue.TaxAutoPush.*; @@ -35,10 +30,7 @@ import static com.diligrp.tax.boot.queue.TaxAutoPush.*;
35 public class TaxReceiver { 30 public class TaxReceiver {
36 31
37 @Resource 32 @Resource
38 - private Map<SystemType, ProcessorChain> processorChainMap;  
39 -  
40 - @Resource  
41 - private TenantTaxService tenantTaxService; 33 + private TaxReceiveService taxReceiveService;
42 34
43 @RabbitListener(bindings = 35 @RabbitListener(bindings =
44 @QueueBinding( 36 @QueueBinding(
@@ -47,34 +39,20 @@ public class TaxReceiver { @@ -47,34 +39,20 @@ public class TaxReceiver {
47 key = NORMAL_ROUTING), 39 key = NORMAL_ROUTING),
48 ackMode = "MANUAL" 40 ackMode = "MANUAL"
49 ) 41 )
  42 + @Transactional
50 public void receiveMessage(Channel channel, Message message) throws IOException { 43 public void receiveMessage(Channel channel, Message message) throws IOException {
51 var content = new String(message.getBody(), StandardCharsets.UTF_8); 44 var content = new String(message.getBody(), StandardCharsets.UTF_8);
52 log.info("tax-agent收到消息:{}", content); 45 log.info("tax-agent收到消息:{}", content);
53 - MessageContext ctx = JsonUtils.fromJsonString(content, MessageContext.class); 46 + MessageContext ctx = null;
54 try { 47 try {
55 - handle(ctx); 48 + ctx = taxReceiveService.messageHandle(content);
  49 + taxReceiveService.recordMapping(ctx);
56 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 50 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
57 log.info("tax-agent消息处理成功:{}", content); 51 log.info("tax-agent消息处理成功:{}", content);
58 } catch (Exception e) { 52 } catch (Exception e) {
59 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 53 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
60 - recordError(e, ctx); 54 + Optional.ofNullable(ctx).ifPresentOrElse(c -> taxReceiveService.recordMappingError(e, c), () -> log.error("tax-agent解析错误:", e));
61 log.error("tax-agent消息处理失败:", e); 55 log.error("tax-agent消息处理失败:", e);
62 } 56 }
63 } 57 }
64 -  
65 - private void handle(MessageContext ctx) {  
66 - ctx.setTenantId(tenantTaxService.getTenantId(ctx.getGroup(), ctx.getEntity()));  
67 - MessageContext messageContext = processorChainMap.get(SystemType.from(ctx.getSystemType())).startProcess(ctx);  
68 - messageContext.setStatus(StatusType.SUCCESS.code);  
69 - //TODO write to db  
70 - }  
71 -  
72 - private void recordError(Exception e, MessageContext ctx) {  
73 - StringWriter sw = new StringWriter();  
74 - PrintWriter pw = new PrintWriter(sw);  
75 - e.printStackTrace(pw);  
76 - ctx.setError(sw.toString());  
77 - ctx.setStatus(StatusType.FAIL.code);  
78 - //TODO write error to db  
79 - }  
80 } 58 }
tax-boot/src/main/java/com/diligrp/tax/boot/service/TaxReceiveService.java 0 → 100644
  1 +package com.diligrp.tax.boot.service;
  2 +
  3 +import com.diligrp.tax.central.context.TenantStorageContext;
  4 +import com.diligrp.tax.central.domain.MessageContext;
  5 +import com.diligrp.tax.central.exception.TaxAgentServiceException;
  6 +import com.diligrp.tax.central.model.TaxPipelineMappingCreate;
  7 +import com.diligrp.tax.central.model.TenantPipeline;
  8 +import com.diligrp.tax.central.process.ProcessorChain;
  9 +import com.diligrp.tax.central.service.ITaxPipelineMappingService;
  10 +import com.diligrp.tax.central.service.ITenantTaxService;
  11 +import com.diligrp.tax.central.type.DocumentType;
  12 +import com.diligrp.tax.central.type.MappingStateType;
  13 +import com.diligrp.tax.central.type.SystemType;
  14 +import com.diligrp.tax.central.type.TaxSystemType;
  15 +import com.diligrp.tax.central.utils.JsonUtils;
  16 +import jakarta.annotation.Resource;
  17 +import org.springframework.stereotype.Service;
  18 +import org.springframework.transaction.annotation.Transactional;
  19 +
  20 +import java.io.PrintWriter;
  21 +import java.io.StringWriter;
  22 +import java.util.Map;
  23 +import java.util.Optional;
  24 +
  25 +/**
  26 + * @Author: zhangmeiyang
  27 + * @CreateTime: 2025-11-07 17:22
  28 + * @Version: todo
  29 + */
  30 +@Service
  31 +public class TaxReceiveService {
  32 +
  33 + @Resource
  34 + private Map<SystemType, ProcessorChain> processorChainMap;
  35 +
  36 + @Resource
  37 + private ITenantTaxService tenantTaxService;
  38 +
  39 + @Resource
  40 + private ITaxPipelineMappingService taxPipelineMappingService;
  41 +
  42 +
  43 + public MessageContext messageHandle(String content) {
  44 + MessageContext ctx = JsonUtils.fromJsonString(content, MessageContext.class);
  45 + DocumentType from = DocumentType.from(ctx.getDocumentType());
  46 + SystemType system = SystemType.from(ctx.getSystemType());
  47 + Optional.of(system).filter(s -> s.documentTypes.contains(from)).orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.BUSINESS_MATCHES_ARE_INCORRECT));
  48 + // 校验业务类型和系统类型
  49 + ctx.setDocumentTypeEnum(from);
  50 + ctx.setSystemTypeEnum(system);
  51 + Optional<TenantPipeline> option = TenantStorageContext.getTenantPipeline(ctx.getTenantId(), system, ctx.getPipelineCode());
  52 + TenantPipeline pipeline = option.orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.NO_MATCHING_SET_OF_ACCOUNTS_FOUND));
  53 + //获取租户账套
  54 + ctx.setTenantPipeline(pipeline);
  55 + //获取租户id
  56 + ctx.setTenantId(tenantTaxService.getTenantId(ctx.getGroup(), ctx.getEntity()));
  57 + return processorChainMap.get(SystemType.from(ctx.getSystemType())).startProcess(ctx);
  58 + }
  59 +
  60 + @Transactional
  61 + public void recordMapping(MessageContext messageContext) {
  62 + TaxPipelineMappingCreate create = new TaxPipelineMappingCreate();
  63 + create.setTenantId(messageContext.getTenantId());
  64 + create.setPipelineId(messageContext.getTenantPipeline().getId());
  65 + create.setDocumentType(messageContext.getDocumentType());
  66 + create.setSystemDataId(messageContext.getSystemDataId());
  67 + create.setState(MappingStateType.SYNCED.value);
  68 + create.setOriginData(messageContext.getMsgBody());
  69 + taxPipelineMappingService.insert(create);
  70 + }
  71 +
  72 + @Transactional
  73 + public void recordMappingError(Exception e, MessageContext ctx) {
  74 + StringWriter sw = new StringWriter();
  75 + PrintWriter pw = new PrintWriter(sw);
  76 + e.printStackTrace(pw);
  77 + TaxPipelineMappingCreate create = new TaxPipelineMappingCreate();
  78 + create.setTenantId(ctx.getTenantId());
  79 + create.setPipelineId(ctx.getTenantPipeline().getId());
  80 + create.setDocumentType(ctx.getDocumentType());
  81 + create.setSystemDataId(ctx.getSystemDataId());
  82 + create.setPipelineDataId(ctx.getPipelineDataId());
  83 + create.setOriginData(ctx.getMsgBody());
  84 + create.setState(MappingStateType.SYNC_FAILED.value);
  85 + create.setErrorMessage(sw.toString());
  86 + taxPipelineMappingService.insert(create);
  87 + }
  88 +
  89 +}
tax-central/src/main/java/com/diligrp/tax/central/domain/MessageContext.java
1 package com.diligrp.tax.central.domain; 1 package com.diligrp.tax.central.domain;
2 2
  3 +import com.diligrp.tax.central.model.TenantPipeline;
  4 +import com.diligrp.tax.central.type.DocumentType;
  5 +import com.diligrp.tax.central.type.SystemType;
3 import lombok.Getter; 6 import lombok.Getter;
4 import lombok.Setter; 7 import lombok.Setter;
5 8
@@ -7,9 +10,18 @@ import lombok.Setter; @@ -7,9 +10,18 @@ import lombok.Setter;
7 @Getter 10 @Getter
8 public class MessageContext { 11 public class MessageContext {
9 /** 12 /**
10 - * 开放 ID 13 + * 账套数据写入id
11 */ 14 */
12 - private String thirdPartyCode; 15 + private String pipelineDataId;
  16 +
  17 + /**
  18 + * 账套数据对接第三方系统id
  19 + */
  20 + private String systemDataId;
  21 + /**
  22 + * 租户管道
  23 + */
  24 + private TenantPipeline tenantPipeline;
13 /** 25 /**
14 * 分组 26 * 分组
15 */ 27 */
@@ -51,11 +63,12 @@ public class MessageContext { @@ -51,11 +63,12 @@ public class MessageContext {
51 */ 63 */
52 private String documentType; 64 private String documentType;
53 /** 65 /**
54 - * 错误 66 + * 文档类型枚举
55 */ 67 */
56 - private String error; 68 + private DocumentType documentTypeEnum;
  69 +
57 /** 70 /**
58 - * 成功标志 71 + * 系统类型枚举
59 */ 72 */
60 - private Integer status; 73 + private SystemType systemTypeEnum;
61 } 74 }
tax-central/src/main/java/com/diligrp/tax/central/domain/document/kingdee/BaseCustomer.java
@@ -18,11 +18,6 @@ public abstract class BaseCustomer extends BaseDocument { @@ -18,11 +18,6 @@ public abstract class BaseCustomer extends BaseDocument {
18 * 系统数据 ID 18 * 系统数据 ID
19 */ 19 */
20 private String systemDataId; 20 private String systemDataId;
21 -// /**  
22 -// * 结算货币(FTRADINGCURRID)  
23 -// */  
24 -// @Converter(value = CustomerCurrencyConverter.class, targetField = "FTRADINGCURRID")  
25 -// protected String currency;  
26 /** 21 /**
27 * 客户 ID(FCUSTID) 22 * 客户 ID(FCUSTID)
28 */ 23 */
tax-central/src/main/java/com/diligrp/tax/central/domain/proxy/kingdee/CustomerProxy.java
@@ -13,5 +13,4 @@ import lombok.Setter; @@ -13,5 +13,4 @@ import lombok.Setter;
13 @Setter 13 @Setter
14 public class CustomerProxy extends BaseProxy { 14 public class CustomerProxy extends BaseProxy {
15 private String thirdPartyId; 15 private String thirdPartyId;
16 - private String thirdPartyCode;  
17 } 16 }
tax-central/src/main/java/com/diligrp/tax/central/model/TaxPipelineMappingCreate.java
@@ -50,4 +50,9 @@ public class TaxPipelineMappingCreate { @@ -50,4 +50,9 @@ public class TaxPipelineMappingCreate {
50 */ 50 */
51 @NotNull(message = "同步状态不能为空") 51 @NotNull(message = "同步状态不能为空")
52 private Integer state; 52 private Integer state;
  53 +
  54 + /**
  55 + * 错误信息
  56 + */
  57 + private String errorMessage;
53 } 58 }
tax-central/src/main/java/com/diligrp/tax/central/model/TenantTaxPipelineMapping.java
@@ -33,6 +33,10 @@ public class TenantTaxPipelineMapping { @@ -33,6 +33,10 @@ public class TenantTaxPipelineMapping {
33 */ 33 */
34 private String originData; 34 private String originData;
35 /** 35 /**
  36 + * 错误信息
  37 + */
  38 + private String errorMessage;
  39 + /**
36 * 州 40 * 州
37 */ 41 */
38 private Integer state; 42 private Integer state;
tax-central/src/main/java/com/diligrp/tax/central/service/ITaxPipelineMappingService.java
1 package com.diligrp.tax.central.service; 1 package com.diligrp.tax.central.service;
2 2
  3 +import com.diligrp.tax.central.domain.MessageContext;
3 import com.diligrp.tax.central.model.TaxPipelineMappingCreate; 4 import com.diligrp.tax.central.model.TaxPipelineMappingCreate;
4 import com.diligrp.tax.central.model.TenantTaxPipelineMapping; 5 import com.diligrp.tax.central.model.TenantTaxPipelineMapping;
5 6
tax-storage/src/main/java/com/diligrp/tax/storage/type/MappingStateType.java renamed to tax-central/src/main/java/com/diligrp/tax/central/type/MappingStateType.java
1 -package com.diligrp.tax.storage.type; 1 +package com.diligrp.tax.central.type;
2 2
3 public enum MappingStateType { 3 public enum MappingStateType {
4 SYNCED(0, "已同步"), 4 SYNCED(0, "已同步"),
tax-central/src/main/java/com/diligrp/tax/central/type/TaxSystemType.java
@@ -18,6 +18,7 @@ public enum TaxSystemType { @@ -18,6 +18,7 @@ public enum TaxSystemType {
18 INVALID_HTTP_REQUEST_PARAMS(5007, "无效的http请求参数"), 18 INVALID_HTTP_REQUEST_PARAMS(5007, "无效的http请求参数"),
19 NO_MATCHING_SET_OF_ACCOUNTS_FOUND(5008, "未找到匹配账套"), 19 NO_MATCHING_SET_OF_ACCOUNTS_FOUND(5008, "未找到匹配账套"),
20 NO_TENANT_INFORMATION_FOUND(5009, "未找到匹配账套"), 20 NO_TENANT_INFORMATION_FOUND(5009, "未找到匹配账套"),
  21 + PARAMETER_IS_NOT_PARSED_CORRECTLY(5010, "参数解析不正确"),
21 ; 22 ;
22 public final int code; 23 public final int code;
23 public final String message; 24 public final String message;
tax-doc/src/main/java/com/diligrp/tax/doc/demarcate/kingdee/CustomerBuilder.java
@@ -38,7 +38,7 @@ public class CustomerBuilder extends Builder&lt;StandardCustomer&gt; { @@ -38,7 +38,7 @@ public class CustomerBuilder extends Builder&lt;StandardCustomer&gt; {
38 public StandardCustomer build(String body, TenantPipeline pipeline) { 38 public StandardCustomer build(String body, TenantPipeline pipeline) {
39 StandardCustomer customer = JsonUtils.fromJsonString(body, StandardCustomer.class); 39 StandardCustomer customer = JsonUtils.fromJsonString(body, StandardCustomer.class);
40 Optional.ofNullable(customer.getContacts()).orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.MISSING_BUSINESS_INFORMATION, "请完善联系人信息")); 40 Optional.ofNullable(customer.getContacts()).orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.MISSING_BUSINESS_INFORMATION, "请完善联系人信息"));
41 -// Optional.ofNullable(customer.getSystemDataId()).orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.MISSING_BUSINESS_INFORMATION, "请填写系统数据ID")); 41 + Optional.ofNullable(customer.getSystemDataId()).orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.MISSING_BUSINESS_INFORMATION, "请填写系统数据ID"));
42 List<TenantPipelineConfig> list = pipeline.getTenantPipelineConfigs().get(markDocument()); 42 List<TenantPipelineConfig> list = pipeline.getTenantPipelineConfigs().get(markDocument());
43 Optional.ofNullable(list).ifPresent(ts -> configureDefaultData(customer, ts)); 43 Optional.ofNullable(list).ifPresent(ts -> configureDefaultData(customer, ts));
44 //TODO 查询数据库的客户信息 如果存在 赋值id 44 //TODO 查询数据库的客户信息 如果存在 赋值id
tax-doc/src/main/java/com/diligrp/tax/doc/process/kingdee/InitializeProcessor.java
1 package com.diligrp.tax.doc.process.kingdee; 1 package com.diligrp.tax.doc.process.kingdee;
2 2
3 -import com.diligrp.tax.central.context.TenantStorageContext;  
4 import com.diligrp.tax.central.domain.MessageContext; 3 import com.diligrp.tax.central.domain.MessageContext;
5 -import com.diligrp.tax.central.exception.TaxAgentServiceException;  
6 -import com.diligrp.tax.central.model.TenantPipeline;  
7 import com.diligrp.tax.central.process.AbstractProcessor; 4 import com.diligrp.tax.central.process.AbstractProcessor;
8 -import com.diligrp.tax.central.type.DocumentType;  
9 -import com.diligrp.tax.central.type.SystemType;  
10 -import com.diligrp.tax.central.type.TaxSystemType;  
11 import com.diligrp.tax.doc.context.DocumentContext; 5 import com.diligrp.tax.doc.context.DocumentContext;
12 import com.diligrp.tax.doc.demarcate.Builder; 6 import com.diligrp.tax.doc.demarcate.Builder;
13 import lombok.extern.slf4j.Slf4j; 7 import lombok.extern.slf4j.Slf4j;
14 import org.springframework.core.annotation.Order; 8 import org.springframework.core.annotation.Order;
15 import org.springframework.stereotype.Component; 9 import org.springframework.stereotype.Component;
16 10
17 -import java.util.Optional;  
18 -  
19 11
20 /** 12 /**
21 * 初始化处理器 13 * 初始化处理器
@@ -30,15 +22,8 @@ public class InitializeProcessor extends AbstractProcessor { @@ -30,15 +22,8 @@ public class InitializeProcessor extends AbstractProcessor {
30 22
31 @Override 23 @Override
32 public MessageContext process(MessageContext messageContext) { 24 public MessageContext process(MessageContext messageContext) {
33 - DocumentType from = DocumentType.from(messageContext.getDocumentType());  
34 - SystemType system = SystemType.from(messageContext.getSystemType());  
35 - if (!system.documentTypes.contains(from)) {  
36 - throw new TaxAgentServiceException(TaxSystemType.BUSINESS_MATCHES_ARE_INCORRECT);  
37 - }  
38 - Optional<TenantPipeline> option = TenantStorageContext.getTenantPipeline(messageContext.getTenantId(), system, messageContext.getPipelineCode());  
39 - TenantPipeline pipeline = option.orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.NO_MATCHING_SET_OF_ACCOUNTS_FOUND));  
40 - Builder<?> builder = DocumentContext.CONTEXT.get(from);  
41 - messageContext.setDocumentObject(builder.build(messageContext.getMsgBody(),pipeline)); 25 + Builder<?> builder = DocumentContext.CONTEXT.get(messageContext.getDocumentTypeEnum());
  26 + messageContext.setDocumentObject(builder.build(messageContext.getMsgBody(), messageContext.getTenantPipeline()));
42 return messageContext; 27 return messageContext;
43 } 28 }
44 } 29 }
tax-map/src/main/java/com/diligrp/tax/mapping/process/kingdee/MappingProcessor.java
@@ -28,12 +28,7 @@ public class MappingProcessor extends AbstractProcessor { @@ -28,12 +28,7 @@ public class MappingProcessor extends AbstractProcessor {
28 28
29 @Override 29 @Override
30 public MessageContext process(MessageContext messageContext) { 30 public MessageContext process(MessageContext messageContext) {
31 - DocumentType from = DocumentType.from(messageContext.getDocumentType());  
32 - SystemType system = SystemType.from(messageContext.getSystemType());  
33 - if (!system.documentTypes.contains(from)) {  
34 - throw new TaxAgentServiceException(TaxSystemType.BUSINESS_MATCHES_ARE_INCORRECT);  
35 - }  
36 - Transformer<?> transformer = MappingContext.CONTEXT.get(from); 31 + Transformer<?> transformer = MappingContext.CONTEXT.get(messageContext.getDocumentTypeEnum());
37 BaseMapping transform = transformer.transform(messageContext.getDocumentObject()); 32 BaseMapping transform = transformer.transform(messageContext.getDocumentObject());
38 messageContext.setMappingObject(transform); 33 messageContext.setMappingObject(transform);
39 Map<String,Object> map = JsonUtils.convertValue(transform, new TypeReference<>() {}); 34 Map<String,Object> map = JsonUtils.convertValue(transform, new TypeReference<>() {});
tax-proxy/src/main/java/com/diligrp/tax/proxy/demarcate/kingdee/CustomerSender.java
@@ -50,7 +50,6 @@ public class CustomerSender extends Sender&lt;CustomerProxy&gt; { @@ -50,7 +50,6 @@ public class CustomerSender extends Sender&lt;CustomerProxy&gt; {
50 SuccessEntity first = successEntity.getFirst(); 50 SuccessEntity first = successEntity.getFirst();
51 var proxy = new CustomerProxy(); 51 var proxy = new CustomerProxy();
52 proxy.setThirdPartyId(first.getId()); 52 proxy.setThirdPartyId(first.getId());
53 - proxy.setThirdPartyCode(first.getNumber());  
54 return proxy; 53 return proxy;
55 } 54 }
56 55
tax-proxy/src/main/java/com/diligrp/tax/proxy/process/kingdee/ProxyProcessor.java
1 package com.diligrp.tax.proxy.process.kingdee; 1 package com.diligrp.tax.proxy.process.kingdee;
2 2
3 import com.diligrp.tax.central.context.ConnectionContext; 3 import com.diligrp.tax.central.context.ConnectionContext;
4 -import com.diligrp.tax.central.context.TenantStorageContext;  
5 import com.diligrp.tax.central.domain.BaseProxy; 4 import com.diligrp.tax.central.domain.BaseProxy;
6 import com.diligrp.tax.central.domain.MessageContext; 5 import com.diligrp.tax.central.domain.MessageContext;
7 -import com.diligrp.tax.central.exception.TaxAgentServiceException;  
8 import com.diligrp.tax.central.manager.AbstractConnectionManager; 6 import com.diligrp.tax.central.manager.AbstractConnectionManager;
9 -import com.diligrp.tax.central.model.TenantPipeline;  
10 import com.diligrp.tax.central.process.AbstractProcessor; 7 import com.diligrp.tax.central.process.AbstractProcessor;
11 -import com.diligrp.tax.central.type.DocumentType;  
12 -import com.diligrp.tax.central.type.SystemType;  
13 -import com.diligrp.tax.central.type.TaxSystemType;  
14 import com.diligrp.tax.proxy.context.ProxyContext; 8 import com.diligrp.tax.proxy.context.ProxyContext;
15 import com.kingdee.bos.webapi.entity.IdentifyInfo; 9 import com.kingdee.bos.webapi.entity.IdentifyInfo;
16 -import com.kingdee.bos.webapi.sdk.K3CloudApi;  
17 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
18 import org.springframework.core.annotation.Order; 11 import org.springframework.core.annotation.Order;
19 import org.springframework.stereotype.Component; 12 import org.springframework.stereotype.Component;
20 13
21 -import java.util.Optional;  
22 -  
23 /** 14 /**
24 * 代理发送处理器 15 * 代理发送处理器
25 * 将Map<String, Object>发送到目标系统 16 * 将Map<String, Object>发送到目标系统
@@ -31,13 +22,9 @@ public class ProxyProcessor extends AbstractProcessor { @@ -31,13 +22,9 @@ public class ProxyProcessor extends AbstractProcessor {
31 22
32 @Override 23 @Override
33 public MessageContext process(MessageContext messageContext) { 24 public MessageContext process(MessageContext messageContext) {
34 - DocumentType from = DocumentType.from(messageContext.getDocumentType());  
35 - SystemType systemType = SystemType.from(messageContext.getSystemType());  
36 - Optional<TenantPipeline> option = TenantStorageContext.getTenantPipeline(messageContext.getTenantId(), systemType, messageContext.getPipelineCode());  
37 - TenantPipeline pipeline = option.orElseThrow(() -> new TaxAgentServiceException(TaxSystemType.NO_MATCHING_SET_OF_ACCOUNTS_FOUND));  
38 - AbstractConnectionManager<?> abstractConnectionManager = ConnectionContext.CONNECTION_MAP.get(systemType);  
39 - IdentifyInfo identifyInfo = (IdentifyInfo) abstractConnectionManager.getConnection(pipeline);  
40 - BaseProxy send = ProxyContext.CONTEXT.get(from).send(messageContext.getMappingObject(), identifyInfo); 25 + AbstractConnectionManager<?> abstractConnectionManager = ConnectionContext.CONNECTION_MAP.get(messageContext.getSystemTypeEnum());
  26 + IdentifyInfo identifyInfo = (IdentifyInfo) abstractConnectionManager.getConnection(messageContext.getTenantPipeline());
  27 + BaseProxy send = ProxyContext.CONTEXT.get(messageContext.getDocumentTypeEnum()).send(messageContext.getMappingObject(), identifyInfo);
41 messageContext.setProxyObject(send); 28 messageContext.setProxyObject(send);
42 return messageContext; 29 return messageContext;
43 } 30 }
tax-storage/src/main/java/com/diligrp/tax/storage/domain/TaxPipelineMapping.java
@@ -50,6 +50,11 @@ public class TaxPipelineMapping { @@ -50,6 +50,11 @@ public class TaxPipelineMapping {
50 private Integer state; 50 private Integer state;
51 51
52 /** 52 /**
  53 + * 错误信息
  54 + */
  55 + private String errorMessage;
  56 +
  57 + /**
53 * 创建时间 58 * 创建时间
54 */ 59 */
55 private LocalDateTime createdTime; 60 private LocalDateTime createdTime;
tax-storage/src/main/java/com/diligrp/tax/storage/service/DynamicTaxPipelineMappingService.java
@@ -11,7 +11,7 @@ import com.diligrp.tax.central.utils.JsonUtils; @@ -11,7 +11,7 @@ import com.diligrp.tax.central.utils.JsonUtils;
11 import com.diligrp.tax.storage.domain.TaxPipelineMapping; 11 import com.diligrp.tax.storage.domain.TaxPipelineMapping;
12 import com.diligrp.tax.storage.model.co.TaxPipelineMappingCO; 12 import com.diligrp.tax.storage.model.co.TaxPipelineMappingCO;
13 import com.diligrp.tax.storage.repo.TaxPipelineMappingRepository; 13 import com.diligrp.tax.storage.repo.TaxPipelineMappingRepository;
14 -import com.diligrp.tax.storage.type.MappingStateType; 14 +import com.diligrp.tax.central.type.MappingStateType;
15 import jakarta.annotation.Resource; 15 import jakarta.annotation.Resource;
16 import org.springframework.stereotype.Component; 16 import org.springframework.stereotype.Component;
17 import org.springframework.transaction.annotation.Transactional; 17 import org.springframework.transaction.annotation.Transactional;
@@ -86,7 +86,6 @@ public class DynamicTaxPipelineMappingService implements ITaxPipelineMappingServ @@ -86,7 +86,6 @@ public class DynamicTaxPipelineMappingService implements ITaxPipelineMappingServ
86 public void insert(@Validated TaxPipelineMappingCreate taxPipelineMappingCreate) { 86 public void insert(@Validated TaxPipelineMappingCreate taxPipelineMappingCreate) {
87 DocumentType.validateDocumentType(taxPipelineMappingCreate.getDocumentType()); 87 DocumentType.validateDocumentType(taxPipelineMappingCreate.getDocumentType());
88 TaxPipelineMapping taxPipelineMapping = JsonUtils.convertValue(taxPipelineMappingCreate, TaxPipelineMapping.class); 88 TaxPipelineMapping taxPipelineMapping = JsonUtils.convertValue(taxPipelineMappingCreate, TaxPipelineMapping.class);
89 - taxPipelineMapping.setState(MappingStateType.SYNCED.value);  
90 taxPipelineMappingRepository.insert(taxPipelineMapping); 89 taxPipelineMappingRepository.insert(taxPipelineMapping);
91 } 90 }
92 } 91 }
tax-storage/src/main/resources/com/diligrp/tax/storage/repo/TaxPipelineMappingRepository.xml
@@ -8,14 +8,16 @@ @@ -8,14 +8,16 @@
8 , system_data_id 8 , system_data_id
9 , pipeline_data_id 9 , pipeline_data_id
10 , origin_data 10 , origin_data
11 - , state) 11 + , state
  12 + , error_message)
12 VALUES 13 VALUES
13 ( #{pipelineId} 14 ( #{pipelineId}
14 , #{documentType} 15 , #{documentType}
15 , #{systemDataId} 16 , #{systemDataId}
16 , #{pipelineDataId} 17 , #{pipelineDataId}
17 , #{originData} 18 , #{originData}
18 - , #{state}) 19 + , #{state}
  20 + , #{errorMessage})
19 </insert> 21 </insert>
20 <update id="update"> 22 <update id="update">
21 UPDATE 23 UPDATE
@@ -34,6 +36,7 @@ @@ -34,6 +36,7 @@
34 , pipeline_data_id 36 , pipeline_data_id
35 , origin_data 37 , origin_data
36 , state 38 , state
  39 + , error_message
37 , created_Time 40 , created_Time
38 , modified_Time 41 , modified_Time
39 FROM 42 FROM
@@ -55,6 +58,7 @@ @@ -55,6 +58,7 @@
55 , pipeline_data_id 58 , pipeline_data_id
56 , origin_data 59 , origin_data
57 , state 60 , state
  61 + , error_message
58 , created_Time 62 , created_Time
59 , modified_Time 63 , modified_Time
60 FROM 64 FROM
@@ -70,15 +74,16 @@ @@ -70,15 +74,16 @@
70 <update id="createTenantMappingTable"> 74 <update id="createTenantMappingTable">
71 CREATE TABLE ${tableName} 75 CREATE TABLE ${tableName}
72 ( 76 (
73 - `id` bigint unsigned NOT NULL AUTO_INCREMENT,  
74 - `pipeline_id` bigint unsigned NOT NULL COMMENT '账套ID',  
75 - `document_type` varchar(40) NOT NULL COMMENT '业务单据类型',  
76 - `system_data_id` varchar(50) NOT NULL COMMENT '系统数据ID',  
77 - `pipeline_data_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '账套数据ID',  
78 - `origin_data` json NOT NULL COMMENT '原始数据',  
79 - `state` tinyint NOT NULL COMMENT '状态 1=同步成功,2=同步失败',  
80 - `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  
81 - `modified_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', 77 + `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  78 + `pipeline_id` bigint unsigned NOT NULL COMMENT '账套ID',
  79 + `document_type` varchar(40) NOT NULL COMMENT '业务单据类型',
  80 + `system_data_id` varchar(50) NOT NULL COMMENT '系统数据ID',
  81 + `pipeline_data_id` varchar(50) NOT NULL COMMENT '账套数据ID',
  82 + `origin_data` json NOT NULL COMMENT '原始数据',
  83 + `error_message` text NULL COMMENT '错误信息',
  84 + `state` tinyint NOT NULL COMMENT '状态 0=已同步,1=同步失败,2=同步重试',
  85 + `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  86 + `modified_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
82 PRIMARY KEY (`id`) USING BTREE, 87 PRIMARY KEY (`id`) USING BTREE,
83 UNIQUE KEY `uk_pipeline_id_document_type_system_data_id` (`pipeline_id`, `document_type`, `system_data_id`) USING BTREE, 88 UNIQUE KEY `uk_pipeline_id_document_type_system_data_id` (`pipeline_id`, `document_type`, `system_data_id`) USING BTREE,
84 KEY `idx_pipeline_id_state` (`pipeline_id`, `state`) USING BTREE 89 KEY `idx_pipeline_id_state` (`pipeline_id`, `state`) USING BTREE