swoole方案 智能电表/水表流式数据采集网关
2G网络问题 粘包TCP把两个包粘成一个给你一次 recv 收到两个完整包 断流一个包被切成两次才到第一次只有一半 Swoole 两种方案 open_length_check → 包头有长度字段Swoole自动攒够再交给你 ← 电表用这个 open_eof_split → 包末尾有结束标记比如 \r\n 三行配置粘包/断包全解决不用自己写缓冲区 协议格式11字节头数据体CRC16[Magic:2][Type:1][MeterID:4][Seq:2][BodyLen:2]|[Body:N]|[CRC16:2]0xABCD类型 表具编号 序号N2含CRC 读数JSON校验和---代码 composerrequirenelexa/buffer?php// composer require nelexa/bufferrequire__DIR__./vendor/autoload.php;useNelexa\Buffer\StringBuffer;constMAGIC0xABCD;constHDR11;// 固定头长度constT_REG0x01;// 注册constT_DATA0x02;// 上报读数constT_HB0x03;// 心跳constT_ACK0x04;// 确认// CRC16-Modbus电表行业标准校验functioncrc16(string$d):int{$c0xFFFF;for($i0;$istrlen($d);$i){$c^ord($d[$i]);for($j0;$j8;$j)$c($c1)?($c1)^0xA001:$c1;}return$c;}// nelexa/buffer 负责二进制读写functionencode(int$type,int$mid,int$seq,string$body):string{$bnewStringBuffer();$b-writeUnsignedShort(MAGIC);$b-writeUnsignedByte($type);$b-writeUnsignedInt($mid);$b-writeUnsignedShort($seq);$b-writeUnsignedShort(strlen($body)2);// BodyLen 数据 2字节CRCif($body!)$b-write($body);$b-writeUnsignedShort(crc16(substr((string)$b,2)));// CRC从Type开始算return(string)$b;}functiondecode(string$pkt):?array{if(strlen($pkt)HDR2)returnnull;$bnewStringBuffer($pkt);if($b-readUnsignedShort()!MAGIC)returnnull;$type$b-readUnsignedByte();$mid$b-readUnsignedInt();$seq$b-readUnsignedShort();$bodyLen$b-readUnsignedShort();$dataLenmax(0,$bodyLen-2);$data$dataLen0?$b-read($dataLen):;$crcRecv$b-readUnsignedShort();// CRC不对2G噪声导致数据损坏丢包而不断连if(crc16(substr($pkt,2,HDR-2$dataLen))!$crcRecv)returnnull;return[type$type,mid$mid,seq$seq,body$data?json_decode($data,true)??[]:[]];}// ─── 表具在线表 ───────────────────────────────────────────────$metersnewSwoole\Table(116);$meters-column(mid,Swoole\Table::TYPE_INT);$meters-column(type,Swoole\Table::TYPE_STRING,16);// electric/water/gas$meters-column(ts,Swoole\Table::TYPE_INT);$meters-create();functionredis():Swoole\Coroutine\Redis{static$pool[];$cidSwoole\Coroutine::getCid();if(!isset($pool[$cid])){$rnewSwoole\Coroutine\Redis();$r-connect(127.0.0.1,6379);$pool[$cid]$r;}return$pool[$cid];}$servernewSwoole\Server(0.0.0.0,9600);$server-set([worker_numswoole_cpu_num(),// ── 三行解决粘包/断包 ──────────────────────────────────────open_length_checktrue,package_length_typen,// uint16大端读BodyLen字段package_length_offset9,// BodyLen 在第9字节package_body_offsetHDR,// 头部11字节// ─────────────────────────────────────────────────────────package_max_length4096,// 电表包很小超了一定是异常heartbeat_check_interval60,heartbeat_idle_time180,// 2G网络慢给3分钟]);$server-on(receive,function($server,$fd,$rid,$pkt)use($meters){$pdecode($pkt);if($pnull){// CRC错/格式错2G噪声不断连等下一包echo[BAD_PKT] fd$fd丢弃\n;return;}[type$type,mid$mid,seq$seq,body$body]$p;switch($type){caseT_REG:$meters-set($fd,[mid$mid,type$body[type]??,tstime()]);redis()-hMSet(meter:$mid,[type$body[type]??,addr$body[addr]??,tstime()]);$server-send($fd,encode(T_ACK,$mid,$seq,json_encode([code0])));echo[REG] meter$mid{$body[type]}\n;break;caseT_DATA:$ts(int)($body[ts]??time());$val$body[reading]??0;$rredis();// 时序存储ZSetscore时间戳保留30天$r-zAdd(meter:$mid:log,$ts,json_encode([ts$ts,v$val,u$body[unit]??]));$r-zRemRangeByScore(meter:$mid:log,-inf,time()-86400*30);$r-hMSet(meter:$mid,[last_v$val,last_ts$ts]);$server-send($fd,encode(T_ACK,$mid,$seq,json_encode([code0])));echo[DATA] meter$midreading$val\n;break;caseT_HB:$row$meters-get($fd);if($row)$meters-set($fd,[...$row,tstime()]);$server-send($fd,encode(T_ACK,$mid,$seq));break;}});$server-on(close,function($server,$fd)use($meters){echo[OFFLINE] meter.($meters-get($fd)[mid]???).\n;$meters-del($fd);});$server-start();---模拟设备故意制造粘包测试?php// sim.phprequire__DIR__./vendor/autoload.php;useNelexa\Buffer\StringBuffer;Swoole\Coroutine\run(function(){$cnewSwoole\Coroutine\Client(SWOOLE_SOCK_TCP);// 模拟客户端不设 open_length_check手动发包测服务端$c-connect(127.0.0.1,9600,5);// 注册$c-send(encode(T_REG,10001,1,json_encode([typeelectric,addr北京市朝阳区XX号])));// 故意粘包一次 send 发两个完整数据包测服务端拆包$pkt1encode(T_DATA,10001,2,json_encode([tstime(),reading1234.56,unitkWh]));$pkt2encode(T_DATA,10001,3,json_encode([tstime()1,reading1234.67,unitkWh]));$c-send($pkt1.$pkt2);// 两包粘在一起发// 故意断流一个包分两次发测服务端拼包$pkt3encode(T_HB,10001,4);$halfstrlen($pkt3)/2;$c-send(substr($pkt3,0,$half));// 先发前半段Swoole\Coroutine::sleep(0.5);// 等一会模拟2G延迟$c-send(substr($pkt3,$half));// 再发后半段Swoole\Coroutine::sleep(1);echo服务端应收到3条独立消息不多不少\n;$c-close();});---解释 粘包/断包是什么 发两个包[REG包:30字节][DATA包:40字节]2G网络可能送达 情况A粘包[REGDATA共70字节]← 一次recv收到两个 情况B断流[REG前15字节]然后[REG后15字节DATA前10字节]然后[DATA剩30字节]不处理的话解析乱掉数据全错 三行配置为什么能解决package_length_offset9← BodyLen 在第9字节package_length_typen← 读2字节无符号整数package_body_offset11← 头部11字节 Swoole 收到数据后1.读第9-10字节拿到 BodyLen2.计算完整包11BodyLen 字节3.没攒够就等攒够了才调 onReceive 你只看到完整的包粘包/断包全是Swoole处理的CRC不对不断连2G信号差会产生噪声导致1-2字节出错下一包可能是好的。直接断连设备会重连2G重连很慢不如丢掉坏包等下一包 ZSet 存时序数据score时间戳ZRANGEBYSCOREmeter:10001:log start end 按时间段查历史读数O(logN)。30天自动裁剪不会无限增长 heartbeat_idle_time1802G网络延迟高、信号不稳给3分钟超时而不是默认60秒防止信号差时频繁断连重连---测试 php server.php# 运行模拟器php sim.php# 服务端输出三包独立解析粘包拆开了断流拼好了# [REG] meter10001 electric# [DATA] meter10001 reading1234.56# [DATA] meter10001 reading1234.67# 心跳的ACK回给设备不打印# 查Redis时序数据redis-cliZRANGEmeter:10001:log0-1WITHSCORESredis-cliHGETALLmeter:10001