在政务热线、城市治理、物业管理等场景中经常会出现多个居民集中投诉同一个对象的问题例如投诉同一商家多项不合规运营行为投诉同一工地夜间施工、扬尘污染等但传统工单系统通常是一单一处理缺乏自动聚类、自动识别能力导致重复处理同类问题无法发现“群体投诉热点”无法定位真正的「被投诉对象」因此我设计并实现了一套自动化 pipeline目标是✅ 自动发现「多人、多次、集中」投诉同一对象的工单✅ 自动聚类 语义理解 AI 提取被投诉对象✅ 支持规模化、自动化、可视化分析我充分利用地理信息和语义信息进行二次聚类再利用大模型的语意理解进行细则判定。用地理位置聚类 → 用语义向量再聚类 → 用大模型提取投诉对象一、基于 PostGIS 的地理聚类多人同一地点sql text( SELECT id, content, purpose, community, public.ST_ClusterDBSCAN( ST_Transform(shape, 3857), 50, 2 ) OVER () AS cluster_id FROM system.work_order WHERE type 投诉举报 AND street IS NOT NULL AND community IS NOT NULL; )把「空间上非常接近的投诉」先聚成一个 cluster。例如同一小区、同一垃圾点、同一商家门前二、Embedding 向量化支持 GPU 批量cluster 中的工单做语义向量Embedding生成模型选择MODEL_NAME sentence-transformers/all-MiniLM-L6-v2每条投诉文字 community content purpose向量生成函数def encode(texts): inputs tokenizer(texts, paddingTrue, truncationTrue, return_tensorspt) inputs {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs model(**inputs) embeddings outputs.last_hidden_state.mean(dim1) return embeddings.cpu().numpy()然后写回数据库字段embedding double precision[]三、同一地点的语义聚类子类sub_cluster在同一「地理cluster」里面再按语义做 DBSCAN 聚类DBSCAN(eps0.2, min_samples2, metriccosine)for cid, group in df.groupby(cluster_id): embeddings np.stack(group[embedding].to_list()) clustering DBSCAN(...).fit(embeddings) sub_ids clustering.labels_ df.loc[group.index, sub_cluster] sub_ids输出表system.work_order_multi_person_clusters✅ 至此完成同一地点 同一对象 的投诉聚合SELECT cluster_id, sub_cluster, content FROM system.work_order_multi_person_clusters ORDER BY cluster_id, sub_cluster;四、用 LLM 自动提取「被投诉对象」(cluster_id sub_cluster)内容拼接让LLM总结需求经理告诉我更需要知道重点关注对象所以提示词设置为prompt f 请分析以下投诉提取被投诉对象 并用一句话概括投诉对象是什么 投诉内容如下 {combined_text} MODEL deepseek-v3-250324五、完整代码 1 extract_complaints_and_clusters.py 功能 1. 从 system.work_order 提取“投诉举报”类工单 2. 基于地理位置 shape 进行空间聚类 (ST_ClusterDBSCAN) 3. 生成 cluster_id 并保存到 system.work_order_multi_person_complaints 字段id, cluster_id, community, content, purpose import pandas as pd from sqlalchemy import create_engine, text DB_CONFIG { host: localhost, port: 5432, database: wujiang_system, user: postgres, password: 12345 } CONN_STRING fpostgresql://{DB_CONFIG[user]}:{DB_CONFIG[password]} \ f{DB_CONFIG[host]}:{DB_CONFIG[port]}/{DB_CONFIG[database]} def extract_clusters(engine): sql text( SELECT id, content, purpose, community, public.ST_ClusterDBSCAN(shape, 10, 1) OVER () AS cluster_id FROM system.work_order WHERE type 投诉举报 AND level IN (小巷,门牌号,门址,公交地铁站点,道路,兴趣点,村庄,道路交叉路口,住宅区) AND street IS NOT NULL AND community IS NOT NULL; ) df pd.read_sql(sql, engine) df df[[id, cluster_id, community, content, purpose]] df.to_sql( work_order_multi_person_complaints, engine, schemasystem, if_existsreplace, indexFalse ) print(f✅ 已保存 {len(df)} 条聚类工单到 system.work_order_multi_person_complaints) return df def main(): engine create_engine(CONN_STRING) extract_clusters(engine) if __name__ __main__: main() 2 generate_embeddings_auto.py 功能 1. 对 system.work_order_multi_person_complaints 表中 embedding 为空的记录生成向量 2. 支持 GPU 加速与批量处理 3. 自动创建 embedding 列如果不存在 import os import psycopg2 import torch from transformers import AutoTokenizer, AutoModel DB_CONFIG { host: localhost, port: 5432, database: wujiang_system, user: postgres, password: 12345 } TABLE_NAME system.work_order_multi_person_complaints device torch.device(cuda if torch.cuda.is_available() else cpu) print(f⚡ 使用设备: {device}) MODEL_NAME sentence-transformers/all-MiniLM-L6-v2 MODEL_CACHE_DIR ./models/all-MiniLM-L6-v2 os.makedirs(MODEL_CACHE_DIR, exist_okTrue) try: tokenizer AutoTokenizer.from_pretrained(MODEL_CACHE_DIR, local_files_onlyTrue) model AutoModel.from_pretrained(MODEL_CACHE_DIR, local_files_onlyTrue).to(device) print(✅ 本地模型加载成功离线模式) except: tokenizer AutoTokenizer.from_pretrained(MODEL_NAME, cache_dirMODEL_CACHE_DIR) model AutoModel.from_pretrained(MODEL_NAME, cache_dirMODEL_CACHE_DIR).to(device) print(✅ 模型下载并缓存完成) model.eval() def encode(texts): inputs tokenizer(texts, paddingTrue, truncationTrue, return_tensorspt) inputs {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs model(**inputs) embeddings outputs.last_hidden_state.mean(dim1) return embeddings.cpu().numpy() def main(batch_size64): conn psycopg2.connect(**DB_CONFIG) cur conn.cursor() cur.execute(f SELECT column_name FROM information_schema.columns WHERE table_schemasystem AND table_name{TABLE_NAME.split(.)[-1]}; ) columns [row[0] for row in cur.fetchall()] if embedding not in columns: cur.execute(fALTER TABLE {TABLE_NAME} ADD COLUMN embedding double precision[];) conn.commit() print(✅ embedding 列创建完成) processed 0 while True: cur.execute(f SELECT id, community, content, purpose FROM {TABLE_NAME} WHERE embedding IS NULL ORDER BY id LIMIT %s; , (batch_size,)) rows cur.fetchall() if not rows: break ids, texts [], [] for r in rows: _id, community, content, purpose r combined_text 。.join([str(x).strip() for x in [community, content, purpose] if x and str(x).strip()]) if not combined_text: continue ids.append(_id) texts.append(combined_text) if not ids: continue vectors encode(texts) for i, vec in zip(ids, vectors): cur.execute(fUPDATE {TABLE_NAME} SET embedding %s WHERE id %s;, (vec.tolist(), i)) conn.commit() processed len(ids) print(f✅ 已处理 {processed} 条空 embedding) cur.close() conn.close() print( 增量 embedding 生成完成) if __name__ __main__: main(batch_size64) 3 detect_multi_person_clusters.py 功能 1. 读取 system.work_order_multi_person_complaints 的 embedding 2. 在每个地理 cluster 内做 embedding 聚类生成 sub_cluster 3. 保存结果到 system.work_order_multi_person_clusters import pandas as pd import numpy as np import json from sqlalchemy import create_engine from sklearn.cluster import DBSCAN DB_CONFIG { host: localhost, port: 5432, database: wujiang_system, user: postgres, password: 12345 } CONN_STRING fpostgresql://{DB_CONFIG[user]}:{DB_CONFIG[password]} \ f{DB_CONFIG[host]}:{DB_CONFIG[port]}/{DB_CONFIG[database]} engine create_engine(CONN_STRING) EPS 0.2 MIN_SAMPLES 2 def main(): df pd.read_sql(SELECT * FROM system.work_order_multi_person_complaints WHERE embedding IS NOT NULL, engine) # 如果 embedding 原本是文本 JSON则先加载成 np.array df[embedding] df[embedding].apply(lambda x: np.array(json.loads(x)) if isinstance(x, str) else np.array(x)) df[sub_cluster] -1 for cid, group in df.groupby(cluster_id): embeddings np.stack(group[embedding].to_list()) clustering DBSCAN(epsEPS, min_samplesMIN_SAMPLES, metriccosine).fit(embeddings) sub_ids clustering.labels_ # 给噪声点分配新的唯一 sub_cluster max_sub df[sub_cluster].max() 1 sub_ids[sub_ids -1] range(max_sub, max_sub (sub_ids -1).sum()) df.loc[group.index, sub_cluster] sub_ids # 存入数据库前embedding 转成 JSON 字符串 df[embedding] df[embedding].apply(lambda x: json.dumps(x.tolist())) df.to_sql(work_order_multi_person_clusters, engine, schemasystem, if_existsreplace, indexFalse) print(f✅ 共保存 {len(df)} 条记录含 sub_cluster到 system.work_order_multi_person_clusters) if __name__ __main__: main() 4 llm_verify_complaint_objects.py 功能 1. 读取 system.work_order_multi_person_clusters 2. 对每个 cluster sub_cluster 调用 LLM 提取被投诉对象和问题 3. 保存到 system.work_order_multi_person_objects import pandas as pd from sqlalchemy import create_engine import requests import json DB_CONFIG { host: localhost, port: 5432, database: wujiang_system, user: postgres, password: 12345 } engine create_engine(fpostgresql://{DB_CONFIG[user]}:{DB_CONFIG[password]} f{DB_CONFIG[host]}:{DB_CONFIG[port]}/{DB_CONFIG[database]}) INPUT_TABLE work_order_multi_person_clusters OUTPUT_TABLE work_order_multi_person_objects API_URL https://ark.cn-beijing.volces.com/api/v3/chat/completions API_KEY 9b6b8bd0- -4e53-ae63-91f89dbbddb8 MODEL deepseek-v3-250324 df pd.read_sql(fSELECT * FROM system.{INPUT_TABLE}, engine) if sub_cluster not in df.columns: raise ValueError(sub_cluster 列不存在请先运行 detect_multi_person_clusters.py) def analyze_cluster(texts): combined_text \n.join(texts) prompt f请分析以下投诉提取被投诉对象\n{combined_text} headers {Content-Type: application/json, Authorization: fBearer {API_KEY}} payload {model: MODEL, messages: [{role: user, content: prompt}], max_tokens: 200} try: resp requests.post(API_URL, headersheaders, datajson.dumps(payload), timeout60) resp.raise_for_status() content resp.json()[choices][0][message][content].strip() return content except: return 未知对象 results [] for (cid, sub_id), group in df.groupby([cluster_id, sub_cluster]): texts group[content].tolist() obj analyze_cluster(texts) results.append({cluster_id: cid, sub_cluster: sub_id, object: obj, num_records: len(group)}) print(fcluster_id{cid} | sub_cluster{sub_id} | 对象{obj} | 数量{len(group)}) df_out pd.DataFrame(results) df_out.to_sql(OUTPUT_TABLE, engine, schemasystem, if_existsreplace, indexFalse) print(f✅ 共保存 {len(df_out)} 条结果到 {OUTPUT_TABLE})run_all.py import subprocess scripts [ extract_complaints_and_clusters.py, generate_embeddings_auto.py, detect_multi_person_clusters.py, llm_verify_complaint_objects.py ] for s in scripts: print(f\n 正在运行 {s} ) subprocess.run([python, s], checkTrue) print(f✅ {s} 完成) SELECT cluster_id, sub_cluster, content FROM system.work_order_multi_person_clusters ORDER BY cluster_id, sub_cluster;