go: Push Pull Pattern
项目结构/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:25 # User : geovindu # Product : GoLand # Project : godesginpattern # File : config.go */ package config import time // ZMQ 端口 go get github.com/zeromq/goczmq const ( PortRawMaterial 5555 PortProcess 5556 PortQuality 5557 PortSale 5558 ) // 超时毫秒 const ZmqTimeoutMs 3000 // 消息分隔符 const ( MsgSep | KvSep : ) // 连接地址 const ( LocalTcpAddr tcp://localhost BindAddr tcp://* ) // 珠宝品类常量 var JewelryCategory []string{ 钻石戒指, 黄金项链, 翡翠手镯, 铂金耳钉, 彩宝吊坠, } // 质检等级 var QualityGrade []string{ S级(收藏), A级(精品), B级(常规), C级(特价), } // 休眠间隔 const ( SleepRaw 2 * time.Second SleepProcess 1 * time.Second SleepQuality 500 * time.Millisecond SleepSale 1 * time.Second IdleSleep 100 * time.Millisecond // 无消息空转休眠 ) /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : logger.go */ package utils import ( fmt log os time ) func GetLogger(module string) *log.Logger { prefix : fmt.Sprintf([%s] , module) logger : log.New(os.Stdout, prefix, log.LstdFlags) return logger } // 格式化时间输出辅助 func NowStr() string { return time.Now().Format(2006-01-02 15:04:05) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : message.go */ package model import ( godesginpattern/pushpull/config strings ) // BaseMessage 统一消息打包解包 type BaseMessage struct{} // Pack map 转消息字符串 // 首字母大写跨包可访问 var MsgHelper BaseMessage{} func (b *BaseMessage) Pack(data map[string]string) string { var parts []string for k, v : range data { parts append(parts, kconfig.KvSepv) } return strings.Join(parts, config.MsgSep) } func (b *BaseMessage) Unpack(raw string) map[string]string { res : make(map[string]string) items : strings.Split(raw, config.MsgSep) for _, item : range items { kv : strings.SplitN(item, config.KvSep, 2) if len(kv) 2 { res[kv[0]] kv[1] } } return res } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:25 # User : geovindu # Product : GoLand # Project : godesginpattern # File : socket_factory.go */ package core import ( bufio fmt godesginpattern/pushpull/config godesginpattern/pushpull/utils io net sync time ) var logger utils.GetLogger(SocketFactory) type TcpPushSocket struct { listener net.Listener clients []net.Conn mu sync.Mutex } func NewTcpPushSocket(port int) *TcpPushSocket { listener, err : net.Listen(tcp, fmt.Sprintf(:%d, port)) if err ! nil { logger.Fatalf(创建Push监听失败: %v, err) } sock : TcpPushSocket{listener: listener} go sock.acceptLoop() logger.Printf(Push 绑定成功: tcp://localhost:%d, port) return sock } func (s *TcpPushSocket) acceptLoop() { for { conn, err : s.listener.Accept() if err ! nil { logger.Printf(Accept错误: %v, err) return } s.mu.Lock() s.clients append(s.clients, conn) s.mu.Unlock() logger.Printf(新客户端连接: %s, conn.RemoteAddr()) } } func (s *TcpPushSocket) Send(msg string) (int, error) { s.mu.Lock() defer s.mu.Unlock() for _, conn : range s.clients { conn.SetWriteDeadline(time.Now().Add(time.Duration(config.ZmqTimeoutMs) * time.Millisecond)) if _, err : conn.Write([]byte(msg \n)); err ! nil { logger.Printf(发送失败: %v, err) } } return len(msg), nil } func (s *TcpPushSocket) Close() { s.mu.Lock() defer s.mu.Unlock() for _, conn : range s.clients { conn.Close() } s.listener.Close() } type TcpPullSocket struct { conn net.Conn reader *bufio.Reader } func NewTcpPullSocket(port int) *TcpPullSocket { conn, err : net.Dial(tcp, fmt.Sprintf(localhost:%d, port)) if err ! nil { logger.Fatalf(连接Push失败: %v, err) } logger.Printf(Pull 连接成功: tcp://localhost:%d, port) return TcpPullSocket{ conn: conn, reader: bufio.NewReader(conn), } } var ErrConnectionClosed fmt.Errorf(connection closed) func (s *TcpPullSocket) Recv(nonBlocking bool) (string, error) { if nonBlocking { s.conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) } else { s.conn.SetReadDeadline(time.Now().Add(time.Duration(config.ZmqTimeoutMs) * time.Millisecond)) } msg, err : s.reader.ReadString(\n) if err ! nil { if nonBlocking (err bufio.ErrBufferFull || isTimeoutError(err)) { return , fmt.Errorf(resource temporarily unavailable) } if err io.EOF { return , ErrConnectionClosed } return , err } return msg[:len(msg)-1], nil } func (s *TcpPullSocket) Close() { s.conn.Close() } func isTimeoutError(err error) bool { if netErr, ok : err.(net.Error); ok netErr.Timeout() { return true } return false }/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : rawmaterial.go */ package service import ( godesginpattern/pushpull/config godesginpattern/pushpull/core godesginpattern/pushpull/model godesginpattern/pushpull/utils math/rand strconv time ) var rawLog utils.GetLogger(RawMaterialService) type RawMaterialService struct { pushSock *core.TcpPushSocket } func NewRawMaterialService() *RawMaterialService { return RawMaterialService{ pushSock: core.NewTcpPushSocket(config.PortRawMaterial), } } func (r *RawMaterialService) RunProduce(total int) { rawLog.Println(原料采购服务启动) rand.Seed(time.Now().UnixNano()) for i : 1; i total; i { cat : config.JewelryCategory[rand.Intn(len(config.JewelryCategory))] msg : model.MsgHelper.Pack(map[string]string{ type: 原料订单, order_id: RAW_ strconv.Itoa(i), category: cat, status: 已采购待加工, }) _, _ r.pushSock.Send(msg) rawLog.Printf(推送: %s, msg) time.Sleep(config.SleepRaw) } rawLog.Println(原料订单全部推送完成) time.Sleep(time.Second) r.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : process.go */ package service import ( godesginpattern/pushpull/config godesginpattern/pushpull/core godesginpattern/pushpull/model godesginpattern/pushpull/utils math/rand strings time ) var procLog utils.GetLogger(Process) type ProcessService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewProcessService() *ProcessService { return ProcessService{ pullSock: core.NewTcpPullSocket(config.PortRawMaterial), pushSock: core.NewTcpPushSocket(config.PortProcess), done: make(chan struct{}), } } func (p *ProcessService) RunPipeline() { procLog.Println(加工车间启动) for { select { case -p.done: procLog.Println(加工车间停止) return default: msg, err : p.pullSock.Recv(true) if err ! nil { if err core.ErrConnectionClosed { procLog.Println(加工车间连接关闭) return } if err.Error() resource temporarily unavailable { time.Sleep(config.IdleSleep) continue } procLog.Printf(接收错误: %v, err) time.Sleep(config.IdleSleep) continue } data : model.MsgHelper.Unpack(msg) procLog.Printf(收到: %s, msg) time.Sleep(config.SleepProcess time.Duration(rand.Intn(2000))*time.Millisecond) sendMsg : model.MsgHelper.Pack(map[string]string{ type: 成品珠宝, order_id: strings.Replace(data[order_id], RAW, FIN, 1), category: data[category], status: 已加工待质检, }) _, _ p.pushSock.Send(sendMsg) procLog.Printf(加工完成: %s, sendMsg) } } } func (p *ProcessService) Stop() { close(p.done) p.pullSock.Close() p.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : quality.go */ package service import ( godesginpattern/pushpull/config godesginpattern/pushpull/core godesginpattern/pushpull/model godesginpattern/pushpull/utils math/rand time ) var qualityLog utils.GetLogger(Quality) type QualityService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewQualityService() *QualityService { return QualityService{ pullSock: core.NewTcpPullSocket(config.PortProcess), pushSock: core.NewTcpPushSocket(config.PortQuality), done: make(chan struct{}), } } func (q *QualityService) RunPipeline() { qualityLog.Println(质检中心启动) for { select { case -q.done: qualityLog.Println(质检中心停止) return default: msg, err : q.pullSock.Recv(true) if err ! nil { if err core.ErrConnectionClosed { qualityLog.Println(质检中心连接关闭) return } if err.Error() resource temporarily unavailable { time.Sleep(config.IdleSleep) continue } qualityLog.Printf(接收错误: %v, err) time.Sleep(config.IdleSleep) continue } data : model.MsgHelper.Unpack(msg) qualityLog.Printf(收到: %s, msg) time.Sleep(config.SleepQuality time.Duration(rand.Intn(1500))*time.Millisecond) sendMsg : model.MsgHelper.Pack(map[string]string{ type: 质检成品, order_id: data[order_id], category: data[category], grade: config.QualityGrade[rand.Intn(len(config.QualityGrade))], status: 可销售, }) _, _ q.pushSock.Send(sendMsg) qualityLog.Printf(质检完成: %s, sendMsg) } } } func (q *QualityService) Stop() { close(q.done) q.pullSock.Close() q.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : sale.go */ package service import ( godesginpattern/pushpull/config godesginpattern/pushpull/core godesginpattern/pushpull/model godesginpattern/pushpull/utils math/rand strconv strings time ) var saleLog utils.GetLogger(Sale) type SaleService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewSaleService() *SaleService { return SaleService{ pullSock: core.NewTcpPullSocket(config.PortQuality), pushSock: core.NewTcpPushSocket(config.PortSale), done: make(chan struct{}), } } func (s *SaleService) RunPipeline() { saleLog.Println(销售部启动) for { select { case -s.done: saleLog.Println(销售部停止) return default: msg, err : s.pullSock.Recv(true) if err ! nil { if err core.ErrConnectionClosed { saleLog.Println(销售部连接关闭) return } if err.Error() resource temporarily unavailable { time.Sleep(config.IdleSleep) continue } saleLog.Printf(接收错误: %v, err) time.Sleep(config.IdleSleep) continue } data : model.MsgHelper.Unpack(msg) saleLog.Printf(收到: %s, msg) time.Sleep(config.SleepSale time.Duration(rand.Intn(1500))*time.Millisecond) sendMsg : model.MsgHelper.Pack(map[string]string{ type: 销售完成, order_id: strings.Replace(data[order_id], FIN, SALE, 1), category: data[category], price: strconv.Itoa(rand.Intn(49000)1000) 元, status: 已售出, }) _, _ s.pushSock.Send(sendMsg) saleLog.Printf(销售完成: %s, sendMsg) } } } func (s *SaleService) Stop() { close(s.done) s.pullSock.Close() s.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : aftersale.go */ package service import ( godesginpattern/pushpull/config godesginpattern/pushpull/core godesginpattern/pushpull/model godesginpattern/pushpull/utils time ) var afterLog utils.GetLogger(AfterSale) type AfterSaleService struct { pullSock *core.TcpPullSocket done chan struct{} } func NewAfterSaleService() *AfterSaleService { return AfterSaleService{ pullSock: core.NewTcpPullSocket(config.PortSale), done: make(chan struct{}), } } func (a *AfterSaleService) RunConsumer() { afterLog.Println(售后维保启动) for { select { case -a.done: afterLog.Println(售后维保停止) return default: msg, err : a.pullSock.Recv(true) if err ! nil { if err core.ErrConnectionClosed { afterLog.Println(售后维保连接关闭) return } if err.Error() resource temporarily unavailable { time.Sleep(config.IdleSleep) continue } afterLog.Printf(接收错误: %v, err) time.Sleep(config.IdleSleep) continue } data : model.MsgHelper.Unpack(msg) afterLog.Printf(收到单据: %s, msg) afterLog.Printf(✅ 维保生效 | 品类:%s | 订单:%s, data[category], data[order_id]) } } } func (a *AfterSaleService) Stop() { close(a.done) a.pullSock.Close() }调用/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : pushpullbll.go */ package bll import ( godesginpattern/pushpull/service godesginpattern/pushpull/utils os os/signal syscall time ) var ppLog utils.GetLogger(pushpullbll) func PushPullMain() { sig : make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) raw : service.NewRawMaterialService() proc : service.NewProcessService() quality : service.NewQualityService() sale : service.NewSaleService() after : service.NewAfterSaleService() go func() { -sig ppLog.Println(退出中...) proc.Stop() quality.Stop() sale.Stop() after.Stop() os.Exit(0) }() go proc.RunPipeline() time.Sleep(200 * time.Millisecond) go quality.RunPipeline() time.Sleep(200 * time.Millisecond) go sale.RunPipeline() time.Sleep(200 * time.Millisecond) go after.RunConsumer() time.Sleep(200 * time.Millisecond) ppLog.Println(✅ 珠宝 Push-Pull 流水线启动完成) raw.RunProduce(5) time.Sleep(5 * time.Second) proc.Stop() quality.Stop() sale.Stop() after.Stop() ppLog.Println(✅ 珠宝 Push-Pull 流水线运行结束) }输出