1. 上期回顾在第六期中我们分析了云端广播与交易确认机制。可以简单概括为融合终端生成交易 ↓ 写入本地 DAG 账本 ↓ 广播给 cloud 和其他 fusion ↓ cloud 插入全局账本 ↓ cloud 根据累计权重产生确认动作 ↓ 确认动作同步回各融合终端到这里为止我们已经看懂了项目的核心仿真逻辑设备注册、遥测上报、DAG 父节点选择、云端确认。这一期继续分析一个后端项目很重要的部分数据如何保存查询结果如何缓存服务重启后系统如何恢复2. 本期学习目标本期重点解决四个问题1. 项目中哪些数据会写入 MySQL 2. DAG 账本为什么采用“快照式持久化” 3. Redis 缓存了哪些接口结果 4. 写入操作后缓存为什么要失效本期重点涉及这些类LedgerStateStore DeviceSessionStore AsyncLedgerPersistenceCoordinator TransactionPersistenceMapper LedgerTransactionEntity DeviceSessionEntity RedisCacheConfig CacheConstant SimulationQueryServiceImpl SimulationCommandServiceImpl SimulationRuntimeContext3. MySQL 和 Redis 在项目中的分工这个项目不是纯内存仿真而是同时使用了 MySQL 和 Redis。可以简单理解为MySQL保存系统状态支持服务重启后恢复 Redis缓存查询结果提高高频读接口响应速度MySQL 主要保存两类数据ledger_transactions账本交易快照 device_sessions设备会话信息Redis 主要缓存查询接口结果例如health topology cloud ledger fusion list fusion ledger device list这两个组件的作用并不相同MySQL 解决“数据不能丢”的问题 Redis 解决“查询不要每次都重新组装”的问题。4. 账本持久化的核心类LedgerStateStore账本持久化主要由LedgerStateStore完成。这个类主要提供三个方法hasTerminalSnapshot(terminalId) loadLedger(terminalId) replaceLedger(terminalId, transactions)它的作用可以理解为检查某个 terminal 是否已有账本快照从 MySQL 加载某个 terminal 的账本用新的交易集合替换某个 terminal 的账本快照。其中replaceLedger()的逻辑是先删除某个 terminal 原有的账本记录再把当前内存中的交易集合重新保存进去。这个方法上使用了Transactional说明删除旧快照和保存新快照属于同一个事务过程。也就是说这个项目不是每生成一笔交易就单独插入一条增量记录而是采用了“账本快照替换”的方式。5. 什么是账本快照式持久化快照式持久化可以这样理解某个 terminal 当前账本中有多少交易 就把这些交易整体作为该 terminal 的最新账本状态保存到 MySQL。例如fusion1当前账本中有G1、G2、R1、B1、B2那么持久化时MySQL 中fusion1对应的旧交易记录会被删除然后保存这 5 条最新交易记录。它的流程类似读取内存账本 ↓ 复制交易集合 ↓ 删除 MySQL 中该 terminal 的旧快照 ↓ 保存新的交易集合 ↓ flush 到数据库这种方式的好处是逻辑简单恢复时也简单每个 terminal 只需要加载自己最新保存的一组交易即可。缺点也很明显如果账本很大每次替换整个快照的开销会比较高。因此这个实现更适合仿真系统或中小规模实验。如果是生产级系统后续可能要改成增量持久化或事件日志式持久化。6. 账本交易表ledger_transactionsLedgerTransactionEntity对应 MySQL 中的ledger_transactions表。这个实体类中包含了大量交易字段例如terminal_id tx_id tx_type source_terminal_id source_terminal_pubkey parent_ids_json timestamp_value created_round_value payload_hash terminal_signature device_id device_pubkey target_terminal_id attributes_json payload_json cumulative_weight_value status_value confirmed_at soft_prune_notified_at soft_pruned_at hard_pruned_at其中表上有一个唯一约束terminal_id tx_id这说明同一笔交易可以存在于不同 terminal 的账本中但在同一个 terminal 内不能重复保存同一个交易 ID。这个设计和项目的多节点账本模型是一致的cloud 有自己的账本快照 fusion1 有自己的账本快照 fusion2 有自己的账本快照 fusion3 有自己的账本快照。同一个tx_id可以出现在多个 terminal 的账本中因为它们分别代表不同节点看到的账本状态。7. TransactionPersistenceMapper领域对象和数据库实体的转换Transaction是核心领域对象LedgerTransactionEntity是数据库实体。二者之间的转换由TransactionPersistenceMapper完成。它主要提供两个方法toEntity(terminalId, transaction) toTransaction(entity)toEntity()负责把内存中的Transaction转成可以保存到数据库的实体toTransaction()负责从数据库实体恢复成内存中的Transaction。这里有一个细节parentIds、attributes、payload、tombstone等复杂字段不是拆成多张表而是被序列化成 JSON 字符串保存。例如parentIds → parent_ids_json attributes → attributes_json payload → payload_json tombstone → tombstone_json这样做让表结构比较简单也方便保存交易中的动态字段。尤其是payload本身就是一个 Map 结构直接保存为 JSON 比拆成很多列更灵活。8. 异步账本持久化AsyncLedgerPersistenceCoordinator如果每次注册设备或提交遥测数据后都在请求线程里同步写 MySQL那么接口响应会被数据库写入阻塞。所以项目引入了AsyncLedgerPersistenceCoordinator。这个类内部创建了一个单线程执行器ledger-persistence当系统需要保存账本时会调用persistLedgers(ledgerSnapshots)它不会在当前请求线程里直接写数据库而是把持久化任务提交给后台执行器。简化流程如下注册设备 / 提交遥测 ↓ 内存账本先更新 ↓ 收集 cloud 和各 fusion 的账本快照 ↓ 提交异步持久化任务 ↓ 请求可以继续返回 ↓ 后台线程慢慢写 MySQL这就是 README 中提到的“异步 MySQL 快照写入”。9. 异步持久化保存了哪些状态在SimulationRuntimeContext中持久化时会调用collectLedgerSnapshots()它会分别收集cloud 的交易集合 fusion1 的交易集合 fusion2 的交易集合 fusion3 的交易集合然后交给ledgerPersistenceCoordinator.persistLedgers(...)后台线程会遍历这些账本快照并调用LedgerStateStore.replaceLedger()保存每个 terminal 的账本。因此一次持久化并不是只保存当前产生交易的那个融合终端而是保存所有账本状态。可以理解为一次状态变化后 系统保存 cloud 所有 fusion 的最新账本视图。10. 异步持久化的状态监控AsyncLedgerPersistenceCoordinator还维护了一些统计字段submittedBatchCount completedBatchCount lastError latestTask并通过summary()返回mode async_mysql_snapshot submitted_batches completed_batches last_error这些信息会出现在系统的 storage summary 中用于查看异步持久化是否正常。可以理解为submitted_batches提交了多少次异步保存任务 completed_batches完成了多少次异步保存任务 last_error最近一次持久化错误如果测试时发现数据没有及时写入 MySQL可以优先看这些指标。11. 服务关闭前的 flush异步写入有一个常见风险任务还没写完服务就关闭了。为了解决这个问题AsyncLedgerPersistenceCoordinator中定义了shutdown()方法并使用了PreDestroy。关闭前它会调用flushLatestTask()也就是等待最近一次异步持久化任务完成然后再关闭执行器。这个设计可以降低服务关闭时丢失最新账本快照的风险。不过它也不是完整的数据库事务日志机制。它更适合当前仿真项目的需求保证大多数情况下服务关闭前能把最新快照写完。12. 设备会话持久化DeviceSessionStore除了账本交易项目还需要保存设备会话。为什么要保存设备会话因为设备注册后系统后续提交遥测数据时需要知道这个 deviceId 是否存在 它属于哪个 terminal 它的设备名称是什么 它的签名私钥和公钥是什么 它是否是 bootstrap 身份。这些信息由DeviceSessionStore保存。它提供两个主要方法loadAll() save(snapshot)save()会把设备会话快照保存成DeviceSessionEntityloadAll()会从数据库中读取所有设备会话并转成DeviceSessionSnapshot。13. 设备会话表device_sessionsDeviceSessionEntity对应数据库中的device_sessions表。其中保存了device_id device_name terminal_id sign_privkey sign_pubkey bootstrap_identity表上有一个唯一约束device_id也就是说一个设备 ID 在设备会话表中只能出现一次。这里需要注意一个安全问题该项目为了仿真方便把设备私钥sign_privkey也保存到了数据库中。这样服务重启后模拟设备仍然可以继续签名并提交遥测数据。从真实系统角度看设备私钥一般不应该由服务端保存但在这个项目中设备是由后端模拟出来的所以保存私钥是为了维持仿真连续性。14. 服务启动时如何恢复状态SimulationRuntimeContext初始化时会检查 cloud 是否已有账本快照cloudSnapshotRestored ledgerStateStore.hasTerminalSnapshot(cloud)如果存在快照说明之前系统运行过就会执行恢复流程restoreLedger(cloud, cloud) restoreLedger(fusion1, fusion1) restoreLedger(fusion2, fusion2) restoreLedger(fusion3, fusion3) restoreDeviceSessions()如果没有快照则说明是第一次启动系统会执行初始化同步syncInitialLedgers()也就是把初始 genesis 账本状态保存到 MySQL。所以项目启动流程可以理解为检查 MySQL 是否有 cloud 快照 ↓ 有从 MySQL 恢复 cloud 和 fusion 账本再恢复设备会话 ↓ 没有创建初始账本并写入初始快照15. 恢复账本时发生了什么恢复账本时SimulationRuntimeContext会调用LedgerStateStore.loadLedger(terminalId)从 MySQL 中按时间和交易 ID 顺序加载交易。然后对 cloud 和 fusion 分别恢复。cloud 恢复时会把非 genesis 交易插入 cloud 的 DAG 账本并写入 archivefusion 恢复时会把非 genesis 交易插入对应融合终端的 DAG 账本。可以简化成MySQL ledger_transactions ↓ TransactionPersistenceMapper.toTransaction() ↓ 恢复成 Transaction 对象 ↓ 插入 cloud / fusion 的 DagLedger这样服务重启后系统可以恢复之前的账本状态而不是每次都从空账本开始。16. Redis 缓存配置RedisCacheConfigRedis 缓存由RedisCacheConfig配置。它创建了一个CacheManager并指定key 使用 StringRedisSerializer value 使用 GenericJackson2JsonRedisSerializer 不缓存 null 值 默认过期时间 60 秒同时不同缓存项设置了不同 TTL。例如simulationHealth15 秒 simulationTopology30 秒 cloudLedger30 秒 fusionList60 秒 fusionLedger30 秒 deviceList30 秒这说明项目不是所有接口都用同一个缓存时间而是根据数据变化频率做了区分。17. 缓存名称集中管理CacheConstant缓存名没有散落在代码中而是集中定义在CacheConstant中simulationHealth simulationTopology cloudLedger fusionList fusionLedger deviceList这样做的好处是避免字符串写错方便统一修改缓存名查询缓存和写入清理缓存时可以共用同一组常量。CacheConstant是一个典型的小型常量类。18. 查询接口如何使用 Redis 缓存查询接口的缓存主要在SimulationQueryServiceImpl中。例如health() 使用 simulationHealth 缓存 topology() 使用 simulationTopology 缓存 cloudLedger() 使用 cloudLedger 缓存 listFusions() 使用 fusionList 缓存 fusionLedger(terminalId) 使用 fusionLedger 缓存并以 terminalId 作为 key listDevices() 使用 deviceList 缓存也就是说当用户多次请求相同查询接口时系统不一定每次都重新组装数据而是可以直接从 Redis 读取缓存结果。查询流程可以表示为GET /api/cloud/ledger ↓ 检查 Redis 是否有 cloudLedger 缓存 ↓ 有直接返回缓存 ↓ 没有调用 runtimeContext.cloudLedger() ↓ 组装 LedgerViewVO ↓ 写入 Redis ↓ 返回结果19. 写入操作为什么要清理缓存既然查询接口用了缓存那么写入操作后就必须清理缓存。例如注册设备后设备列表变了拓扑结构变了融合终端账本变了cloud 账本可能也变了health 里的设备数量也变了。如果不清理缓存用户可能会看到旧数据。因此SimulationCommandServiceImpl在注册设备和提交遥测数据时都使用了Caching(evict {...})一次性清理多个缓存simulationHealth simulationTopology cloudLedger fusionList fusionLedger deviceList这说明项目采用的是查询时缓存 写入后失效。而不是手动更新每个缓存。20. 注册设备时的持久化与缓存流程以设备注册为例完整流程可以总结为POST /api/fusions/fusion1/devices/register ↓ SimulationCommandServiceImpl.registerDevice() ↓ 清理相关 Redis 缓存 ↓ SimulationRuntimeContext.registerDevice() ↓ 创建设备模拟器 ↓ fusion1 生成 register 交易 ↓ 保存 DeviceSession 到 MySQL ↓ 广播 register 交易到 cloud 和其他 fusion ↓ 如果 autoConfirmtrue执行注册确认 ↓ collectLedgerSnapshots() ↓ AsyncLedgerPersistenceCoordinator.persistLedgers() ↓ 后台线程保存 cloud 所有 fusion 的账本快照 ↓ 返回注册结果这里同时发生了两类数据变化device_sessions 增加设备会话 ledger_transactions 更新各 terminal 账本快照。21. 提交遥测时的持久化与缓存流程遥测上报时流程类似POST /api/fusions/fusion1/devices/{deviceId}/telemetry ↓ SimulationCommandServiceImpl.submitTelemetry() ↓ 清理相关 Redis 缓存 ↓ SimulationRuntimeContext.submitTelemetry() ↓ 检查设备会话 ↓ 设备签名 ↓ 融合终端验签 ↓ 生成 business 交易 ↓ 广播到 cloud 和其他 fusion ↓ collectLedgerSnapshots() ↓ 异步保存账本快照到 MySQL ↓ 返回遥测提交结果和注册不同遥测上报一般不会新增设备会话但会更新账本交易状态所以仍然需要持久化账本快照也需要清理缓存。22. 整体数据流图可以把 MySQL 和 Redis 的整体作用画成下面这样写入请求 POST register / telemetry ↓ 修改内存状态 ↓ 更新 DagLedger / DeviceSession ↓ 清理 Redis 查询缓存 ↓ 异步写 MySQL 快照 ↓ 返回结果 查询请求 GET health / topology / ledger / devices ↓ 优先查 Redis ↓ 缓存命中直接返回 ↓ 缓存未命中读取内存状态 ↓ 组装 VO ↓ 写入 Redis ↓ 返回结果 服务重启 ↓ 从 MySQL 读取 ledger_transactions ↓ 恢复 cloud / fusion 账本 ↓ 从 MySQL 读取 device_sessions ↓ 恢复设备会话这个图基本覆盖了项目的数据管理逻辑。23. 当前实现的特点这一部分实现有几个明显特点。第一账本以 terminal 为单位保存快照。cloud、fusion1、fusion2、fusion3 各自都有自己的账本视图MySQL 里用terminal_id区分。第二账本持久化是异步的。写请求不会长时间等待 MySQL 保存所有账本快照减少了接口阻塞。第三设备会话是同步保存的。注册设备后设备会话会立即保存到 MySQL保证后续可以恢复设备模拟器。第四Redis 只缓存查询结果。写入操作不会依赖 Redis 保存核心状态核心状态仍然以内存和 MySQL 为主。第五写入后统一清理多个缓存。这样可以避免用户在注册或遥测提交后看到旧的 topology、ledger 或 device list。24. 这部分可以如何写进论文或项目说明如果把这部分对应到论文或系统设计说明中可以写成为保证仿真系统在服务重启后的状态恢复能力系统将云端节点与各融合终端的 DAG 账本状态以终端快照形式持久化至 MySQL。每次设备注册或业务数据提交后系统收集云端及各融合终端的账本副本并通过异步持久化线程将其写入数据库从而降低数据库写入对在线请求处理的阻塞。同时系统将设备会话信息独立保存用于恢复设备标识、密钥和接入域等仿真状态。在查询侧系统利用 Redis 对健康检查、拓扑结构、账本视图和设备列表等高频读接口进行短时缓存并在注册或遥测写入后统一失效相关缓存以保证查询结果与最新账本状态一致。如果写得更简洁可以写成系统采用 MySQL 保存账本快照和设备会话并利用 Redis 缓存高频查询结果。写入请求更新内存账本后异步落库同时清理相关缓存查询请求优先读取缓存未命中时再从运行时状态组装结果从而兼顾状态可恢复性与查询效率。25. 本期小结这一期分析了 MySQL 持久化与 Redis 缓存机制。可以总结为五点第一MySQL 主要保存ledger_transactions和device_sessions。第二账本采用按 terminal 区分的快照式持久化。第三账本快照写入由AsyncLedgerPersistenceCoordinator异步完成减少请求阻塞。第四服务重启时系统会从 MySQL 恢复 cloud、fusion 账本和设备会话。第五Redis 用于缓存高频查询接口写入操作后会清理相关缓存避免返回旧数据。