首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

规则引擎设计实战-从零构建分布式规则引擎系统

  • 25-04-23 18:18
  • 2364
  • 13333
juejin.cn

一、需求深度解析(需求分析篇)


1. 业务场景拆解

业务流程图(Mermaid流程图)

提交节点元数据
同步调用
异步调用
获取最终结果
条件满足
条件不满足
超时/重试
应用注册节点
规则引擎
执行流程
调用节点Notify URL
调用异步Monitor URL
等待返回结果
启动监控线程轮询
更新流程上下文
检查下游连线条件
触发下一节点
挂起当前流程
流程结束回调

2. 核心功能需求清单

模块功能点
节点管理1. 节点注册接口(REST API)
2. 节点元数据存储(名称/URL/参数定义)
流程编排1. 可视化流程配置
2. 条件表达式动态解析(支持上下文变量)
执行引擎1. 同步/异步执行器
2. 流程状态机管理
3. 上下文参数传递
异步监控1. 轮询线程池管理
2. 回调结果处理
3. 超时熔断机制
生命周期管理1. 流程实例持久化
2. 节点重试策略
3. 强制终止机制

3. 非功能需求分析

维度具体要求
性能单流程实例延迟 ≤500ms(同步)
支持1000+并行流程实例
扩展性可插拔的表达式引擎(支持Groovy/SpEL等)
分布式部署能力
可靠性节点调用失败自动重试(3次策略)
流程状态自动持久化(Checkpoint机制)
安全性节点URL白名单校验
表达式执行沙箱隔离

4. 关键问题抽象

增强版类图设计

source
1
0..*
contains
1
1..*
contains
1
1..*
executes
Node
+String nodeId
+String name
+URI notifyUrl
+URI asyncMonitorUrl
+Map> inputParams
+Map> outputParams
+boolean allowRetry
+validate() : boolean
Edge
+String edgeId
+String conditionExpression
+Node sourceNode
+Node targetNode
+evaluate(Context context) : boolean
Workflow
+String workflowId
+List nodes
+List edges
+addNode(Node node)
+connect(Node source, Node target, String condition)
WorkflowInstance
+String instanceId
+Workflow workflow
+Map context
+State currentState
+persistState()

类职责说明

  • Node:定义执行单元元数据,包含同步/异步调用方式、输入输出参数约束
  • Edge:封装流转逻辑,通过表达式引擎(如Spring EL)动态计算条件
  • Workflow:静态流程模板,维护节点与连线的拓扑关系
  • WorkflowInstance:动态流程实例,保存运行时上下文和状态

5. 典型问题场景

  1. 异步回调丢失问题

    • 设计异步任务ID生成规则(雪花算法)
    • 采用Redis存储任务状态+超时补偿机制
  2. 上下文污染问题

    • 使用ThreadLocal隔离流程实例上下文
    • 深拷贝技术保证参数传递安全
  3. 死循环检测

    java
    代码解读
    复制代码
    // 环路检测算法示例 public class CycleDetector { public static boolean hasCycle(Workflow workflow) { Map visited = new HashMap<>(); for (Node node : workflow.getNodes()) { if (detectCycle(node, visited)) return true; } return false; } private static boolean detectCycle(Node node, Map stack) { if (stack.containsKey(node)) return stack.get(node); if (visited.contains(node)) return false; stack.put(node, true); for (Edge edge : node.getOutEdges()) { if (detectCycle(edge.getTarget(), stack)) return true; } stack.put(node, false); return false; } }

二、架构设计方法论(架构设计篇)


1. 分层架构设计详解

接入层
核心引擎层
执行层
存储层
监控层
1.1 接入层设计

核心功能:

  • 节点注册API(支持JSON/YAML配置)
  • 流程配置可视化接口
  • 权限校验与流量控制

技术实现:

java
代码解读
复制代码
// Spring Boot 节点注册示例 @RestController @RequestMapping("/api/nodes") public class NodeController { @PostMapping public ResponseEntity registerNode(@Valid @RequestBody NodeConfig config) { // 1. 校验URL白名单 // 2. 持久化节点元数据 // 3. 返回注册成功响应 } }
1.2 核心引擎层设计

核心组件:

WorkflowScheduler
+submit(WorkflowInstance instance)
+pause(String instanceId)
+resume(String instanceId)
StateMachine
+transition(Event event)
+getCurrentState()
ContextManager
+createContext()
+saveContext()
+recoverContext()
1.3 执行层设计

双模式执行器架构:

是
否
执行请求
同步?
同步执行器
异步执行器
直接返回结果
提交任务到线程池
生成监控任务ID
1.4 存储层设计

多级存储策略:

存储类型技术方案数据示例
元数据存储MySQL + MyBatis节点配置/流程模板
运行时状态Redis(Hash结构)WorkflowInstance JSON
日志存储Elasticsearch + Logstash执行日志/错误追踪
1.5 监控层设计

异步监控三阶段模型:

引擎监控服务应用系统注册监控任务(TaskID,CallbackURL)定时轮询MonitorURL返回处理状态状态更新通知触发后续节点引擎监控服务应用系统

2. 技术选型深度解析

基础框架
Spring Boot 2.7
异步调度
Netty 4.1
Redis Stream
状态存储
Redis RDB+AOF
MySQL 8.0
表达式引擎
Spring EL
监控
Prometheus
部署
Docker+K8s
2.1 关键技术对比
技术点方案选择优势替代方案
异步通信Netty高吞吐量(10W+ QPS)RocketMQ
表达式解析Spring EL与Spring生态无缝集成Groovy
状态存储Redis+MySQL热数据内存加速+冷数据持久化MongoDB
定时调度Redis Stream分布式场景下的可靠消息队列RabbitMQ Delayed
2.2 关键技术实现示例

Netty异步调用处理器:

java
代码解读
复制代码
public class AsyncRequestHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) { // 1. 解析请求参数 // 2. 提交到业务线程池 // 3. 返回202 Accepted响应 ctx.writeAndFlush(new DefaultFullHttpResponse( HTTP_1_1, HttpResponseStatus.ACCEPTED )); } }

Spring EL表达式解析:

java
代码解读
复制代码
public class ConditionEvaluator { private final ExpressionParser parser = new SpelExpressionParser(); public boolean evaluate(String expr, EvaluationContext context) { return parser.parseExpression(expr) .getValue(context, Boolean.class); } }

3. 扩展性设计

插件化架构设计:

«interface»
Plugin
+init()
+execute(Context ctx)
+destroy()
EmailPlugin
+sendNotification()
LogPlugin
+auditLogging()

4. 性能优化要点

  1. 线程池隔离:

    java
    代码解读
    复制代码
    // 不同业务使用独立线程池 ThreadPoolTaskExecutor asyncExecutor = new ThreadPoolTaskExecutor(); asyncExecutor.setCorePoolSize(20); asyncExecutor.setQueueCapacity(100); asyncExecutor.setThreadNamePrefix("Async-");
  2. 上下文缓存:

    java
    代码解读
    复制代码
    @Component public class ContextCache { private final LoadingCache cache = CacheBuilder.newBuilder() .maximumSize(1000) .expireAfterAccess(10, TimeUnit.MINUTES) .build(new CacheLoader<>() { public WorkflowContext load(String key) { return redisTemplate.opsForValue().get(key); } }); }

三、核心模块详细设计(详细设计篇)


1. 节点注册模块深度设计

1.1 接口规范设计

REST API 设计规范:

java
代码解读
复制代码
// 节点注册接口示例(Spring Boot实现) @PostMapping("/nodes") @Operation(summary = "注册新节点") public ResponseEntity registerNode( @io.swagger.v3.oas.annotations.parameters.RequestBody( description = "节点配置信息", required = true, content = @Content(schema = @Schema(implementation = NodeConfig.class)) @Valid @RequestBody NodeConfig config) { // 参数校验增强版 if (nodeService.exists(config.getNodeId())) { throw new BusinessException("节点ID已存在"); } validateUrlWhitelist(config.getNotifyUrl()); // 存储逻辑 NodeEntity entity = nodeConverter.toEntity(config); nodeRepository.save(entity); return ResponseEntity.ok(ApiResponse.success("注册成功")); } // URL白名单校验实现 private void validateUrlWhitelist(String url) { List allowedDomains = Arrays.asList("trusted.com", "internal.net"); if (!allowedDomains.contains(UrlUtils.extractDomain(url))) { throw new SecurityException("非授信服务地址"); } }
1.2 元数据存储优化

MySQL表结构设计:

sql
代码解读
复制代码
CREATE TABLE tb_node_config ( node_id VARCHAR(64) PRIMARY KEY, name VARCHAR(255) NOT NULL, notify_url VARCHAR(512) NOT NULL, async_monitor_url VARCHAR(512), input_params JSON COMMENT '{"param1":"String","param2":"Integer"}', output_params JSON, allow_retry TINYINT(1) DEFAULT 0, created_time DATETIME DEFAULT CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE INDEX idx_node_name ON tb_node_config(name);

参数序列化策略:

java
代码解读
复制代码
// 使用Jackson自定义序列化 public class ParamTypeSerializer extends JsonSerializer> { @Override public void serialize(Class value, JsonGenerator gen, SerializerProvider provider) throws IOException { gen.writeString(value.getSimpleName()); } } // 实体类注解配置 @JsonSerialize(keyUsing = ParamTypeSerializer.class) private Map> inputParams;

2. 流程引擎核心设计

2.1 增强型状态机设计
创建实例
onStart()
同步执行
处理完成
异步执行
收到回调
超时未响应
允许重试
重试成功
重试耗尽
流程完成
执行异常
终止流程
PENDING
RUNNING
SYNC_PROCESSING
ASYNC_WAITING
TIMEOUT
RETRYING
FAILED
COMPLETED

状态机实现代码:

java
代码解读
复制代码
public enum WorkflowState { PENDING, RUNNING, SYNC_PROCESSING, ASYNC_WAITING, COMPLETED, FAILED, TIMEOUT, RETRYING } public enum WorkflowEvent { START, ASYNC_CALL, CALLBACK, TIMEOUT, RETRY, FINISH, ERROR } @Configuration @EnableStateMachineFactory public class StateMachineConfig extends EnumStateMachineConfigurerAdapter { @Override public void configure(StateMachineStateConfigurer states) throws Exception { states.withStates() .initial(WorkflowState.PENDING) .states(EnumSet.allOf(WorkflowState.class)); } @Override public void configure(StateMachineTransitionConfigurer transitions) throws Exception { transitions .withExternal() .source(WorkflowState.PENDING).target(WorkflowState.RUNNING) .event(WorkflowEvent.START) .and() .withExternal() .source(WorkflowState.RUNNING).target(WorkflowState.ASYNC_WAITING) .event(WorkflowEvent.ASYNC_CALL) .action(asyncCallAction()) // 其他状态转换配置... } }
2.2 上下文管理策略

线程安全的上下文容器:

java
代码解读
复制代码
public class WorkflowContext implements Serializable { private String instanceId; private ConcurrentHashMap variables = new ConcurrentHashMap<>(); private AtomicInteger retryCount = new AtomicInteger(0); // 使用CAS保证原子操作 public void updateVariable(String key, BiFunction updateFunc) { variables.compute(key, (k, v) -> updateFunc.apply(v)); } } // 使用ThreadLocal隔离实例 public class ContextHolder { private static final ThreadLocal holder = new ThreadLocal<>(); public static void setContext(WorkflowContext context) { holder.set(context); } public static WorkflowContext getContext() { return holder.get(); } public static void clear() { holder.remove(); } }

Redis存储结构设计:

shell
代码解读
复制代码
# 流程实例存储 HSET workflow:instances ${instanceId} ${JSON序列化上下文} # 状态变更日志 ZADD workflow:logs:${instanceId} ${timestamp} "状态从RUNNING变为ASYNC_WAITING"

3. 表达式引擎实现方案

3.1 技术方案对比
引擎优势劣势适用场景
Spring EL与Spring生态无缝集成,安全性高语法相对简单,功能有限简单条件判断
Groovy动态脚本能力强大,灵活性高需要沙箱防护,性能开销较大复杂业务规则
JavaScript前端友好,学习成本低安全风险高,需严格隔离需与前端共享逻辑的场景
Aviator高性能,轻量级语法差异较大,社区生态弱高并发简单表达式
3.2 安全变量注入实现

上下文变量过滤器:

java
代码解读
复制代码
public class SafeVariableResolver implements EvaluationContext { private final Map variables; public SafeVariableResolver(Map safeVars) { this.variables = Collections.unmodifiableMap(safeVars); } @Override public Object lookupVariable(String name) { if (!variables.containsKey(name)) { throw new ExpressionException("禁止访问未声明的变量: " + name); } return variables.get(name); } } // 使用示例 EvaluationContext context = new SafeVariableResolver( ImmutableMap.of("A", 100, "B", 200) ); expression.getValue(context);

表达式执行沙箱(Groovy引擎示例):

java
代码解读
复制代码
public class GroovySandbox { private static final CompilerConfiguration config = new CompilerConfiguration(); static { config.addCompilationCustomizers( new ImportCustomizer().addStaticStars("java.lang.Math"), new SecureASTCustomizer().addAllowedMethods(Matcher.REGEX_PATTERN) ); config.setSecure(true); } public static Object eval(String script, Map params) { Binding binding = new Binding(params); GroovyShell shell = new GroovyShell(binding, config); return shell.evaluate(script); } }
3.3 性能优化方案

表达式编译缓存:

java
代码解读
复制代码
public class ExpressionCache { private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); public static boolean evaluate(String expr, Context ctx) { Expression expression = cache.computeIfAbsent(expr, e -> parser.parseExpression(e) ); return expression.getValue(ctx, Boolean.class); } }

敏感操作监控:

java
代码解读
复制代码
@Aspect @Component public class ExpressionMonitor { @Around("execution(* com.engine.evaluator.*.*(..))") public Object monitor(ProceedingJoinPoint pjp) throws Throwable { long start = System.currentTimeMillis(); try { return pjp.proceed(); } finally { long cost = System.currentTimeMillis() - start; Metrics.record("expression_eval_time", cost); if (cost > 1000) { log.warn("表达式执行超时: {}", pjp.getArgs()[0]); } } } }

4. 异常处理设计

自定义异常体系:

«abstract»
EngineException
+String errorCode
+String message
NodeTimeoutException
+String nodeId
+Duration timeout
ExpressionEvalException
+String expression
+Map context
CircularDependencyException

重试策略实现:

java
代码解读
复制代码
@Bean public RetryTemplate retryTemplate() { return RetryTemplate.builder() .maxAttempts(3) .exponentialBackoff(1000, 2, 5000) .retryOn(RemoteAccessException.class) .traversingCauses() .build(); } // 在异步执行器中应用 public class AsyncExecutor { @Retryable(value = TimeoutException.class, backoff = @Backoff(delay = 1000, multiplier = 2)) public void executeWithRetry(Node node) { // 调用远程服务 } }

四、代码实现与演示(落地实现篇)


1. 基础框架搭建

1.1 Spring Boot项目初始化
bash
代码解读
复制代码
# 使用Spring Initializr生成项目 curl https://start.spring.io/starter.zip \ -d dependencies=web,data-jpa,redis,validation,actuator \ -d packageName=com.engine \ -d name=rule-engine \ -d javaVersion=17 \ -o rule-engine.zip
1.2 分层结构代码示例
java
代码解读
复制代码
// 领域对象定义 public class WorkflowInstance { @Id private String instanceId; @Embedded private WorkflowContext context; @Enumerated(EnumType.STRING) private WorkflowState state; } // 核心引擎接口 public interface WorkflowEngine { void start(Workflow workflow); void resume(String instanceId); void pause(String instanceId); } // 异步执行器组件 @Async public class AsyncExecutor { @Autowired private TaskMonitor taskMonitor; public CompletableFuture executeAsync(Node node, WorkflowContext context) { return CompletableFuture.runAsync(() -> { // 异步执行逻辑 }, taskMonitor.getAsyncThreadPool()); } }

2. 增强型流程执行实现

2.1 责任链模式优化实现
java
代码解读
复制代码
public abstract class NodeHandler { private NodeHandler next; public void setNext(NodeHandler next) { this.next = next; } public void handle(Node node, WorkflowContext context) { if (canHandle(node)) { process(node, context); } if (next != null) { next.handle(node, context); } } protected abstract boolean canHandle(Node node); protected abstract void process(Node node, WorkflowContext context); } // 同步节点处理器 @Component public class SyncHandler extends NodeHandler { @Override protected boolean canHandle(Node node) { return !node.isAsync(); } @Override protected void process(Node node, WorkflowContext context) { try { Object result = restTemplate.postForObject( node.getNotifyUrl(), context.getParams(), Object.class ); context.updateOutput(node.getNodeId(), result); } catch (RestClientException e) { throw new NodeExecutionException("同步节点执行失败", e); } } }
2.2 流程执行核心逻辑
java
代码解读
复制代码
public class WorkflowExecutor { @Autowired private List handlers; public void execute(WorkflowContext context) { buildHandlerChain(); List path = context.getExecutionPath(); for (Node node : path) { if (!checkPreconditions(node, context)) { handleBlocking(context); return; } executeNode(node, context); } } private void buildHandlerChain() { NodeHandler chain = new DefaultHandler(); for (NodeHandler handler : handlers) { chain.setNext(handler); chain = handler; } } private void executeNode(Node node, WorkflowContext context) { context.beforeExecute(node); try { chain.handle(node, context); context.afterExecute(node); } catch (Exception e) { context.markFailed(node, e); } } }

3. 异步监控深度实现

3.1 定时轮询设计
java
代码解读
复制代码
@Configuration @EnableScheduling public class AsyncMonitorConfig { @Bean public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.setThreadNamePrefix("AsyncMonitor-"); return scheduler; } } @Component public class AsyncTaskMonitor { @Autowired private RedisTemplate redisTemplate; @Scheduled(fixedRate = 5000) public void pollAsyncTasks() { Set taskIds = redisTemplate.opsForZSet() .rangeByScore("async:tasks", 0, System.currentTimeMillis()); taskIds.forEach(taskId -> { String callbackUrl = redisTemplate.opsForValue().get(taskId); checkTaskStatus(taskId, callbackUrl); }); } private void checkTaskStatus(String taskId, String callbackUrl) { // 调用回调接口并处理响应 } }
3.2 回调处理机制
java
代码解读
复制代码
@RestController @RequestMapping("/callback") public class CallbackController { @PostMapping("/{taskId}") public ResponseEntity handleCallback( @PathVariable String taskId, @RequestBody CallbackResult result) { WorkflowContext context = contextService.recoverContext(taskId); if (result.isSuccess()) { context.updateVariable("output", result.getData()); workflowEngine.resume(context.getInstanceId()); } else { workflowEngine.retry(context.getInstanceId()); } return ResponseEntity.accepted().build(); } }
3.3 异步任务状态管理
java
代码解读
复制代码
public class AsyncTaskManager { private static final String ASYNC_TASK_PREFIX = "async:task:"; public String registerAsyncTask(Node node, WorkflowContext context) { String taskId = generateTaskId(node); Map taskData = Map.of( "callbackUrl", node.getAsyncMonitorUrl(), "context", serializeContext(context) ); redisTemplate.opsForHash().putAll(ASYNC_TASK_PREFIX + taskId, taskData); redisTemplate.expire(ASYNC_TASK_PREFIX + taskId, 1, TimeUnit.HOURS); return taskId; } private String generateTaskId(Node node) { return node.getNodeId() + "-" + UUID.randomUUID(); } }

4. 异常处理增强实现

4.1 全局异常处理器
java
代码解读
复制代码
@ControllerAdvice public class GlobalExceptionHandler { @ExceptionHandler(NodeExecutionException.class) public ResponseEntity handleNodeError(NodeExecutionException ex) { ErrorResponse response = new ErrorResponse( "NODE_EXECUTION_ERROR", ex.getMessage(), Map.of("nodeId", ex.getNodeId()) ); return ResponseEntity.status(503).body(response); } @ExceptionHandler(ExpressionEvalException.class) public ResponseEntity handleExpressionError(ExpressionEvalException ex) { ErrorResponse response = new ErrorResponse( "EXPRESSION_ERROR", "条件表达式计算错误", Map.of("expression", ex.getExpression()) ); return ResponseEntity.badRequest().body(response); } }
4.2 熔断机制实现
java
代码解读
复制代码
@CircuitBreaker(name = "nodeService", fallbackMethod = "fallbackExecute") public Object executeNodeWithCircuitBreaker(Node node, WorkflowContext context) { return nodeService.execute(node, context); } private Object fallbackExecute(Node node, WorkflowContext context, Throwable t) { log.error("节点服务熔断降级", t); context.markDegraded(node); return DEFAULT_FALLBACK_VALUE; }

5. 完整执行流程图解

ClientEngineNodeANodeBRedisloop[定时轮询]启动流程持久化初始状态同步调用即时响应更新上下文异步调用返回202 Accepted注册监控任务查询处理状态处理中最终回调更新最终状态流程完成通知ClientEngineNodeANodeBRedis

五、测试与优化(验证篇)


1. 单元测试策略

核心测试场景:

节点注册校验
空URL检测
参数类型冲突
条件表达式
非法语法拦截
变量不存在防护
流程引擎
环路检测
状态流转验证
1.1 边界条件测试用例

示例1:空节点检测

java
代码解读
复制代码
@Test void shouldThrowExceptionWhenRegisterEmptyNode() { NodeConfig config = new NodeConfig(); config.setNodeId("test-node"); assertThrows(ConstraintViolationException.class, () -> nodeService.register(config)); }

示例2:极端参数测试

java
代码解读
复制代码
@ParameterizedTest @ValueSource(strings = {"A>100", "B == null", "obj.field[0] < 5"}) void shouldEvaluateComplexExpressions(String expr) { EvaluationContext context = createTestContext(); assertDoesNotThrow(() -> evaluator.evaluate(expr, context)); }
1.2 测试覆盖率优化

Jacoco配置示例:

xml
代码解读
复制代码
<plugin> <groupId>org.jacocogroupId> <artifactId>jacoco-maven-pluginartifactId> <configuration> <excludes> <exclude>**/config/**exclude> <exclude>**/model/**exclude> excludes> configuration> plugin>

覆盖率报告:

shell
代码解读
复制代码
# 生成报告 mvn jacoco:report # 查看覆盖率 Class Coverage: 92% Method Coverage: 85% Line Coverage: 80%

2. 压力测试方案

测试场景设计:

场景并发量节点类型预期指标
纯同步流程500 TPS快速响应(<10ms)成功率 >99.9%
混合型流程200 TPS含50%异步节点平均延迟 <1s
极限压力测试1000 TPS高延迟节点(2s)系统不崩溃
2.1 JMeter测试脚本

测试计划结构:

xml
代码解读
复制代码
<TestPlan> <ThreadGroup> <numThreads>100numThreads> <rampUp>60rampUp> <LoopController> <loops>100loops> LoopController> <HTTPSampler> <method>POSTmethod> <path>/api/workflow/startpath> <body>{"workflowId":"stress-test"}body> HTTPSampler> <ResponseAssertion> <testField>Response CodetestField> <testType>2testType> <testValue>202testValue> ResponseAssertion> ThreadGroup> TestPlan>

关键监控指标:

java
代码解读
复制代码
// 自定义监控指标 public class EngineMetrics { static final Counter executedNodes = Metrics.counter("engine.nodes.executed"); static final Timer asyncLatency = Metrics.timer("engine.async.latency"); public static void recordAsyncTime(Duration duration) { asyncLatency.record(duration); } }
2.2 分布式压力测试
bash
代码解读
复制代码
# 启动JMeter集群 jmeter -n -t test-plan.jmx -R 192.168.1.101,192.168.1.102 # 实时监控命令 watch -n 1 "curl -s http://localhost:8080/actuator/metrics/engine.nodes.executed | jq"

3. 性能优化技巧

3.1 线程池调优参数

最佳实践配置:

yaml
代码解读
复制代码
# application.yml executor: core-pool-size: ${CPU_CORES * 2} max-pool-size: ${CPU_CORES * 4} queue-capacity: 1000 keep-alive: 60s allow-core-thread-timeout: true

动态调整实现:

java
代码解读
复制代码
@Scheduled(fixedRate = 5000) public void adjustThreadPool() { int activeCount = threadPool.getActiveCount(); if (activeCount > threadPool.getMaximumPoolSize() * 0.8) { threadPool.setMaxPoolSize(threadPool.getMaxPoolSize() + 10); } }
3.2 上下文复用策略

对象池实现:

java
代码解读
复制代码
public class ContextPool { private final GenericObjectPool pool; public ContextPool() { pool = new GenericObjectPool<>(new BasePooledObjectFactory<>() { @Override public WorkflowContext create() { return new WorkflowContext(); } @Override public void passivateObject(PooledObject p) { p.getObject().clear(); } }); } public WorkflowContext borrow() throws Exception { return pool.borrowObject(); } }

缓存优化示例:

java
代码解读
复制代码
@Cacheable(cacheNames = "expressionCache", key = "#expr") public Expression compileExpression(String expr) { return parser.parseExpression(expr); }
3.3 其他关键优化点
优化方向具体措施
网络通信使用HTTP连接池(最大连接数500,每路由最大连接50)
序列化采用Protobuf替换JSON(体积减少60%,解析速度提升3倍)
数据库启用批量插入(batch_size=500) + 二级缓存
垃圾回收G1GC参数优化(-XX:MaxGCPauseMillis=200)

4. 性能对比数据

优化前后对比(单机8核16G):

场景优化前TPS优化后TPS提升幅度
简单同步流程12002400100%
复杂异步流程300750150%
高并发场景800(失败率15%)1500(失败率0.5%)87.5%

5. 持续优化建议

  1. 火焰图分析:

    bash
    代码解读
    复制代码
    # 生成性能分析数据 async-profiler/profiler.sh -d 60 -f flamegraph.html
  2. GC日志分析:

    bash
    代码解读
    复制代码
    java -Xlog:gc*=debug:file=gc.log -jar app.jar
  3. 连接池监控:

    java
    代码解读
    复制代码
    // Druid监控配置 @Bean public ServletRegistrationBean druidServlet() { return new ServletRegistrationBean<>(new StatViewServlet(), "/druid/*"); }

六、扩展与展望(进阶篇)


1. 分布式扩展方案深度实现

1.1 增强型分布式锁设计
InstanceARedisInstanceBloop[业务处理]SET lock:001 UUID EX 30 NXOKSET lock:001 UUID2 EX 30 NXnil执行流程操作EVAL解锁Lua脚本InstanceARedisInstanceB

Redisson实现方案:

java
代码解读
复制代码
public class DistributedLockService { private final RedissonClient redisson; public void executeWithLock(String lockKey, Runnable task) { RLock lock = redisson.getLock(lockKey); try { if (lock.tryLock(5, 30, TimeUnit.SECONDS)) { task.run(); } } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } } // 流程引擎调用示例 distributedLockService.executeWithLock("wf:"+instanceId, () -> { WorkflowContext context = loadContext(instanceId); engine.process(context); saveContext(context); });
1.2 流程分片存储设计
java
代码解读
复制代码
// 基于一致性哈希的分片策略 public class ShardingStrategy { private static final int VIRTUAL_NODES = 160; public String getShard(String key) { TreeMap hashRing = buildHashRing(); long hash = hash(key); SortedMap tailMap = hashRing.tailMap(hash); return tailMap.isEmpty() ? hashRing.firstEntry().getValue() : tailMap.get(tailMap.firstKey()); } private TreeMap buildHashRing() { // 构建虚拟节点环 } } // Redis分片连接配置 @Bean public RedisConnectionFactory shardedConnectionFactory() { List nodes = Arrays.asList( new RedisNode("192.168.1.101", 6379), new RedisNode("192.168.1.102", 6380) ); RedisClusterConfiguration config = new RedisClusterConfiguration(nodes); return new JedisConnectionFactory(config); }
1.3 分布式事务补偿方案
Begin
Error
Compensate
Fail
INITIAL
PROCESSING
COMPENSATING
ROLLBACK_SUCCESS
ROLLBACK_FAILED
ALARM

2. 可视化配置界面实现

2.1 前端架构设计
Vue3
状态管理Pinia
可视化库G6
组件库ElementPlus
流程配置Store
画布渲染
表单配置

典型组件实现:

vue
代码解读
复制代码
2.2 前后端交互规范
typescript
代码解读
复制代码
// API接口定义 interface FlowAPI { POST /api/flows: { body: FlowConfig response: { flowId: string } } GET /api/flows/{id}: { response: FlowConfig } } // WebSocket消息协议 interface WsMessage { type: 'SYNC_UPDATE' | 'COLLAB_EDIT' payload: Partial<FlowConfig> }
2.3 可视化调试支持
javascript
代码解读
复制代码
// 调试器实现原理 class Debugger { constructor(flow) { this.breakpoints = new Set() this.executionTrace = [] } stepOver() { this.engine.executeNextStep() this.updateTrace() } watchVariables(vars) { return Proxy(this.context.variables, { set: (target, prop, value) => { this.logChange(prop, value) return Reflect.set(target, prop, value) } }) } }

3. 规则版本控制方案

3.1 Git版本管理集成
UIAPIGitLab提交新规则git commit -am "feat: new rule"返回commit SHA显示版本号UIAPIGitLab

JGit实现示例:

java
代码解读
复制代码
public class GitService { private final Git git; public String commitChange(String message) { git.add().addFilepattern(".").call(); RevCommit commit = git.commit() .setMessage(message) .setAuthor("engine", "[email protected]") .call(); return commit.getId().name(); } public void rollback(String commitId) { git.reset().setMode(ResetCommand.ResetType.HARD).setRef(commitId).call(); } }
3.2 版本对比算法
java
代码解读
复制代码
public class DiffEngine { public List compare(FlowConfig v1, FlowConfig v2) { return DiffBuilder.compare(Input.fromJson(v1)) .withTest(Input.fromJson(v2)) .withNodeFilter(node -> !node.getName().equals("metadata")) .build() .getDifferences(); } } // 使用示例 List changes = diffEngine.compare(oldVersion, newVersion); changes.forEach(change -> { System.out.println(change.getType() + " " + change.getPath()); });
3.3 版本发布策略
yaml
代码解读
复制代码
# 发布流水线配置示例 stages: - test - canary - production release_rules: - match: feature/* env: staging - match: release/* env: production

4. 未来演进方向

4.1 云原生支持
Kubernetes
Operator模式
HPA自动扩缩
Service Mesh
分布式追踪
4.2 智能规则推荐
python
代码解读
复制代码
# 基于历史数据的规则优化建议 def analyze_rules(): df = load_execution_logs() cluster = DBSCAN(eps=0.5).fit(df[['duration','error_rate']]) return cluster.labels_
4.3 区块链存证
solidity
代码解读
复制代码
// 智能合约示例 contract FlowAudit { struct Version { string hash; uint timestamp; } mapping(string => Version[]) public versions; function recordVersion(string memory flowId, string memory hash) public { versions[flowId].push(Version(hash, block.timestamp)); } }

七、流程引擎状态机深度实现方案


1. 状态机核心设计

1.1 状态/事件定义
java
代码解读
复制代码
// 状态枚举(含超时状态) public enum WorkflowState { CREATED, // 初始状态 READY, // 就绪状态 RUNNING, // 执行中 ASYNC_WAITING, // 等待异步回调 SUSPENDED, // 人工挂起 COMPLETED, // 成功结束 FAILED, // 失败结束 RETRYING // 重试中 } // 事件枚举(含超时事件) public enum WorkflowEvent { START, // 启动流程 NODE_COMPLETE, // 节点完成 ASYNC_CALLBACK, // 异步回调 MANUAL_RETRY, // 手动重试 TIMEOUT, // 超时事件 FAILURE, // 执行失败 FORCE_COMPLETE // 强制完成 }
1.2 状态转移配置
java
代码解读
复制代码
@Configuration @EnableStateMachineFactory public class StateMachineConfig extends StateMachineConfigurerAdapter { @Override public void configure(StateMachineStateConfigurer states) throws Exception { states .withStates() .initial(WorkflowState.CREATED) .state(WorkflowState.READY, entryAction(), exitAction()) .state(WorkflowState.RUNNING, new Action(){/* 进入运行状态逻辑 */}) .state(WorkflowState.ASYNC_WAITING, asyncWaitAction()) .end(WorkflowState.COMPLETED) .end(WorkflowState.FAILED); } @Override public void configure(StateMachineTransitionConfigurer transitions) throws Exception { transitions // 启动流程 .withExternal() .source(WorkflowState.CREATED) .target(WorkflowState.READY) .event(WorkflowEvent.START) .action(startAction()) // 节点异步调用 .and().withExternal() .source(WorkflowState.RUNNING) .target(WorkflowState.ASYNC_WAITING) .event(WorkflowEvent.NODE_COMPLETE) .guard(asyncConditionGuard()) // 异步回调处理 .and().withExternal() .source(WorkflowState.ASYNC_WAITING) .target(WorkflowState.RUNNING) .event(WorkflowEvent.ASYNC_CALLBACK) .action(callbackAction()) // 超时转移 .and().withInternal() .source(WorkflowState.ASYNC_WAITING) .action(timeoutAction()) .timerOnce(30000) // 30秒超时 // 重试机制 .and().withExternal() .source(WorkflowState.FAILED) .target(WorkflowState.RETRYING) .event(WorkflowEvent.MANUAL_RETRY) .action(retryAction()); } }

2. 状态持久化实现

2.1 状态存储结构
java
代码解读
复制代码
@RedisHash("WorkflowStateMachine") public class StateMachineContext { @Id private String machineId; private WorkflowState currentState; private Map contextData; private LocalDateTime lastUpdated; } // 自定义Repository实现 public interface StateMachineRepository extends CrudRepository { @Query("{ 'currentState' : ?0 }") List findByState(WorkflowState state); }
2.2 持久化拦截器
java
代码解读
复制代码
public class PersistStateMachineInterceptor extends StateMachineInterceptorAdapter { @Override public void preStateChange(State state, Message message, Transition transition, StateMachine stateMachine) { // 保存当前状态到Redis StateMachineContext context = new StateMachineContext(); context.setMachineId(stateMachine.getId()); context.setCurrentState(state.getId()); context.setContextData(stateMachine.getExtendedState().getVariables()); repository.save(context); } }

3. 关键业务逻辑实现

3.1 异步回调处理
java
代码解读
复制代码
public class AsyncCallbackHandler { @Autowired private StateMachineService stateMachineService; @PostMapping("/callback/{machineId}") public DeferredResult handleCallback( @PathVariable String machineId, @RequestBody CallbackResult result) { DeferredResult deferredResult = new DeferredResult<>(30000L); stateMachineService.acquireLock(machineId, () -> { StateMachine sm = stateMachineService.getStateMachine(machineId); if (result.isSuccess()) { sm.getExtendedState().getVariables().putAll(result.getData()); sm.sendEvent(WorkflowEvent.ASYNC_CALLBACK); } else { sm.sendEvent(WorkflowEvent.FAILURE); } deferredResult.setResult("PROCESSED"); }); return deferredResult; } }
3.2 超时补偿机制
java
代码解读
复制代码
@Component public class TimeoutMonitor { @Scheduled(fixedRate = 5000) public void checkTimeoutInstances() { List waitingInstances = repository.findByState(WorkflowState.ASYNC_WAITING); waitingInstances.stream() .filter(ctx -> ctx.getLastUpdated() .isBefore(LocalDateTime.now().minusSeconds(30))) .forEach(ctx -> { StateMachine sm = stateMachineService.getStateMachine(ctx.getMachineId()); sm.sendEvent(WorkflowEvent.TIMEOUT); }); } }

4. 完整状态转移图

START
startProcessing()
asyncCall()
ASYNC_CALLBACK
TIMEOUT
finish()
error()
MANUAL_RETRY
retrySuccess()
retryFailed()
terminate()
CREATED
READY
RUNNING
ASYNC_WAITING
FAILED
COMPLETED
RETRYING

5. 测试验证方案

5.1 状态转移测试用例
java
代码解读
复制代码
@SpringBootTest public class StateMachineTests { @Autowired private StateMachineFactory factory; @Test void testNormalFlow() { StateMachine sm = factory.getStateMachine(); sm.start(); sm.sendEvent(WorkflowEvent.START); assertThat(sm.getState().getId()).isEqualTo(WorkflowState.READY); sm.sendEvent(WorkflowEvent.NODE_COMPLETE); assertThat(sm.getState().getId()).isEqualTo(WorkflowState.ASYNC_WAITING); sm.sendEvent(WorkflowEvent.ASYNC_CALLBACK); assertThat(sm.getState().getId()).isEqualTo(WorkflowState.RUNNING); } @Test void testTimeoutRecovery() { // 模拟超时场景 StateMachineContext ctx = new StateMachineContext(); ctx.setCurrentState(WorkflowState.ASYNC_WAITING); ctx.setLastUpdated(LocalDateTime.now().minusMinutes(1)); repository.save(ctx); timeoutMonitor.checkTimeoutInstances(); StateMachine sm = stateMachineService.getStateMachine(ctx.getMachineId()); assertThat(sm.getState().getId()).isEqualTo(WorkflowState.FAILED); } }

6. 生产级增强功能

6.1 分布式锁集成
java
代码解读
复制代码
public class DistributedLockAwareStateMachine extends DefaultStateMachine { private final RedissonClient redisson; @Override public void sendEvent(Message event) { RLock lock = redisson.getLock(getId()); try { lock.lock(); super.sendEvent(event); } finally { lock.unlock(); } } }
6.2 监控埋点
java
代码解读
复制代码
@Aspect @Component public class StateMachineMonitor { @Around("execution(* org.springframework.statemachine.StateMachine.sendEvent(..))") public Object monitorEvent(ProceedingJoinPoint pjp) throws Throwable { long start = System.currentTimeMillis(); String event = ((Message)pjp.getArgs()[0]).getPayload().toString(); String machineId = ((StateMachine)pjp.getTarget()).getId(); try { return pjp.proceed(); } finally { Metrics.timer("state.event.duration") .tag("event", event) .record(System.currentTimeMillis() - start); } } }

7.实现建议

  1. 版本选择:推荐使用Spring State Machine 3.0+,支持响应式编程模型
  2. 调试工具:集成State Machine Visualizer(SMV)进行运行时状态跟踪
  3. 灾备方案:定期将Redis中的状态快照持久化到MySQL
  4. 性能优化:对高频状态转移路径进行缓存预热
注:本文转载自juejin.cn的重庆穿山甲的文章"https://juejin.cn/post/7478234774046883855"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2491) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

103
后端
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top