1. 项目概述从信息洪流到精准推送的自动化管道在当下的技术信息生态里像 X原 Twitter这样的平台无疑是挖掘技术趋势、工具发布和高质量教程的富矿。但问题在于从海量的推文中筛选出真正有价值的内容就像在沙子里淘金既耗时又难以规模化。手动筛选不仅效率低下而且随着关注列表的扩大信息过载会越来越严重。为了解决这个痛点我构建了一个名为XScraper的自动化管道。它的核心目标很明确自动抓取基于特定主题标签hashtag的推文利用人工智能对内容进行智能过滤和摘要最后将精选出的内容打包成一份简洁的新闻简报通过电子邮件定时发送给订阅者。这不仅仅是一个简单的爬虫脚本而是一个融合了现代网页抓取、自然语言处理和任务调度的完整工程化项目。对于开发者而言这个项目是一个绝佳的实战演练场。它迫使你走出“静态网页解析”的舒适区直面动态单页应用SPA、反爬虫机制、异步编程、API集成以及系统可靠性等现实挑战。通过拆解这个项目你不仅能学会如何使用 Playwright 这样的现代浏览器自动化工具更能理解如何将不同的技术模块抓取、处理、分发优雅地组合成一个健壮、可维护的自动化系统。接下来我将从架构设计开始深入每个模块的实现细节、避坑经验并分享如何将这样一个项目推向生产级别的质量。2. 整体架构与设计思路拆解一个可靠的系统始于清晰的架构。XScraper 的设计遵循了“单一职责”和“高内聚、低耦合”的原则将复杂流程分解为几个独立的模块每个模块只做一件事并把它做好。2.1 核心模块职责划分整个系统基于 Python 3.11 构建主要分为四个核心模块Scraper (src/scraper.py)这是系统的“采集器”。它的唯一任务就是模拟真实用户在 X.com 上导航、滚动、抓取推文数据。它不关心数据如何处理只负责高效、稳定地获取原始数据。AI Processor (src/ai_processor.py)这是系统的“大脑”。它接收 Scraper 抓取的原始推文调用大语言模型LLMAPI这里使用 OpenRouter 作为接口配合 LangChain 框架对推文进行摘要、相关性打分和过滤。它的职责是将杂乱无章的文本转化为结构化的、有价值的信息。Email Sender (src/email_sender.py)这是系统的“分发器”。它将 AI Processor 处理后的精选内容按照预设的模板渲染成美观的 HTML 格式然后通过电子邮件服务如 Resend API发送给指定的订阅者列表。Scheduler (src/scheduler.py)这是系统的“指挥官”。它使用 APScheduler 这类任务调度库将上述三个模块的串联执行过程封装成一个任务并设定触发规则例如每天上午 9 点运行一次实现全流程的自动化。这种架构的优势非常明显。首先可测试性极强。你可以单独测试 Scraper 的抓取逻辑而无需启动 AI 服务或发送真实邮件。其次可维护性高。如果未来 X.com 的页面结构发生巨大变化你只需要修改Scraper模块如果想更换 AI 服务商或邮件服务商也只需修改对应的单个模块其他部分完全不受影响。最后可扩展性好。例如如果你想增加将内容同步到 Slack 或 Discord 的功能只需要新增一个Notifier模块并在调度器中调用即可不会破坏现有流程。2.2 技术选型背后的逻辑为什么选择这些技术栈每一个选择背后都有其权衡和理由。Playwright vs. BeautifulSoup/Selenium这是最关键的选择。BeautifulSoup 只是一个 HTML 解析库无法执行 JavaScript对于 X.com 这种高度动态的 SPA 完全无能为力。Selenium 虽然可以驱动浏览器但其默认的同步 API 在性能上存在瓶颈且生态中的异步支持不如 Playwright 成熟和原生。Playwright 由微软开发提供了一流的异步 API对现代浏览器特性如网络拦截、移动端模拟支持更好并且通过playwright-stealth等插件能更有效地规避反爬检测。在需要与复杂动态网页交互的场景下Playwright 是目前 Python 生态中的首选。OpenRouter LangChain直接调用 OpenAI 或 Anthropic 的 API 当然可以但 OpenRouter 作为一个聚合层提供了访问多个主流 LLM 模型如 GPT-4, Claude, Gemini的统一接口这在模型价格、速率限制或服务稳定性出现波动时给了我们快速切换的灵活性。LangChain 则提供了构建 LLM 应用的高层抽象比如方便的提示词模板、链式调用Chain和输出解析器Output Parser能让我们更专注于业务逻辑“过滤并摘要推文”而不是繁琐的 API 调用和结果处理。Resend for Email发送邮件看似简单实则陷阱很多包括邮件进入垃圾箱、DKIM/SPF 配置复杂等。Resend 是一个开发者优先的邮件 API 服务它极大地简化了事务性邮件的发送流程提供了清晰的日志、开箱即用的域名认证和漂亮的 HTML 模板支持。相比于自建 SMTP 服务器或使用传统企业邮箱的 APIResend 让我们能更快地实现一个可靠的分发环节。APScheduler for Scheduling对于这种周期性的后台任务我们当然可以用操作系统的 Crontab。但 APScheduler 作为一个纯 Python 库能让任务调度逻辑与我们的应用代码无缝集成便于进行更复杂的调度控制如基于特定事件触发、任务持久化将任务状态存入数据库以及在分布式环境下的协调。它让我们的应用更加“自包含”。注意技术选型没有银弹。这里的组合是基于“快速构建一个稳定、可维护的自动化管道”这一目标。如果你的场景是超大规模、分布式抓取可能需要考虑 Scrapy 框架配合 Playwright如果对成本极其敏感或许可以探索本地部署的小模型如通过 Ollama。架构设计就是一系列权衡后的结果。3. 核心模块深度解析与实操要点3.1 Scraper 模块在动态网页的雷区中穿行Scraper 模块是整个项目的基础也是最容易出问题的部分。它的核心挑战在于如何让一个自动化脚本看起来像一个真实的人类用户从而持续、稳定地获取数据。3.1.1 浏览器初始化与反检测配置直接使用 Playwright 的默认无头headless模式访问 X.com几乎立刻会被识别并拦截。因此初始化配置至关重要。# 示例带有反检测配置的浏览器上下文初始化 from playwright.async_api import async_playwright from playwright_stealth import stealth_async async def create_stealth_context(): playwright await async_playwright().start() # 使用 Chromium因其与 Chrome 相似度高规避检测相对成熟 browser await playwright.chromium.launch( headlessTrue, # 生产环境可以用 True调试时可设为 False args[ --disable-blink-featuresAutomationControlled, # 关键禁用自动化控制标志 --no-sandbox, --disable-dev-shm-usage ] ) # 创建上下文模拟特定设备和区域 context await browser.new_context( viewport{width: 1920, height: 1080}, localees-ES, # 模拟西班牙语用户 user_agentMozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ... # 真实 UA ) page await context.new_page() # 应用 stealth 插件隐藏更多自动化痕迹 await stealth_async(page) return browser, context, page关键点解析--disable-blink-featuresAutomationControlled这是最关键的参数之一。Chrome/Chromium 会设置navigator.webdriver属性为true许多网站通过检测这个属性来识别自动化工具。这个参数可以将其禁用或伪装。playwright-stealth这个第三方插件做了大量额外工作比如覆盖某些 JavaScript 属性、修改 WebGL 指纹等进一步降低被检测的概率。User-Agent 和 Viewport使用一个常见的、真实的桌面版 User-Agent并设置一个合理的视窗大小避免使用默认的移动端或怪异尺寸。3.1.2 会话管理Cookie 注入的艺术对于需要登录才能访问大量内容的网站如 X模拟登录流程输入用户名、密码非常脆弱极易触发验证码。更稳健的方法是使用“会话持久化”。操作流程手动获取 Cookie在你的常规浏览器Chrome/Firefox中登录 X.com。导出 Cookie使用浏览器扩展如 “EditThisCookie” 或 “Cookie-Editor”将当前站点的 Cookie 导出为 JSON 格式。保存文件将导出的 JSON 内容保存为项目根目录下的cookies.json。代码加载在 Scraper 初始化后加载这个文件并将 Cookie 注入浏览器上下文。import json async def load_cookies_and_inject(context, cookie_pathcookies.json): with open(cookie_path, r) as f: cookies json.load(f) # 确保 cookie 的 domain 字段正确Playwright 需要它 for cookie in cookies: # 有些导出工具会包含 expirationDate秒Playwright 需要 expires秒 if expirationDate in cookie: cookie[expires] cookie[expirationDate] del cookie[expirationDate] await context.add_cookies(cookies)实操心得Cookie 会过期。你需要建立一个机制来定期更新cookies.json文件。一个半自动化的方法是写一个简单的脚本提示你手动登录后自动抓取并保存新的 Cookie。绝对不要将包含真实登录凭证的 Cookie 文件提交到公共代码仓库3.1.3 无限滚动与数据提取策略X 采用无限滚动加载内容。我们的脚本需要模拟人类滚动行为。async def infinite_scroll_and_collect(page, max_tweets100): seen_urls set() tweets_data [] scroll_attempts_without_new 0 max_attempts_without_new 3 while len(tweets_data) max_tweets and scroll_attempts_without_new max_attempts_without_new: # 1. 滚动到底部 await page.evaluate(window.scrollTo(0, document.body.scrollHeight)) # 2. 等待新内容加载 await asyncio.sleep(random.uniform(2.0, 4.0)) # 随机等待更拟人 # 3. 定位推文元素 tweet_elements await page.query_selector_all(article[data-testidtweet]) current_batch_urls [] for element in tweet_elements: # 提取推文唯一标识如包含推文ID的链接 link_element await element.query_selector(a[href*/status/]) if link_element: tweet_url await link_element.get_attribute(href) full_url fhttps://x.com{tweet_url} if tweet_url.startswith(/) else tweet_url if full_url not in seen_urls: seen_urls.add(full_url) current_batch_urls.append(full_url) # 提取文本、时间、互动数据等 tweet_text await extract_text(element) tweet_metrics await parse_interactions(element) if tweet_metrics[total] MIN_INTERACTION_THRESHOLD: # 业务过滤 tweets_data.append({ url: full_url, text: tweet_text, metrics: tweet_metrics, # ... 其他字段 }) # 4. 判断是否有新内容 if not current_batch_urls: scroll_attempts_without_new 1 else: scroll_attempts_without_new 0 # 重置计数器 # 5. 避免过快滚动随机延迟 await asyncio.sleep(random.uniform(1.0, 2.0)) return tweets_data关键点解析随机延迟固定的sleep时间容易被识别。在滚动和请求间加入随机延迟是基本操作。去重基于推文 URL 或 ID 进行去重是必须的因为滚动可能加载出重复内容。终止条件不能无限滚动。我们设置两个终止条件1) 收集到足够数量的推文2) 连续滚动 N 次都没有发现新内容说明已到页面底部或加载失败。业务逻辑内嵌在提取循环中直接进行初步过滤如互动数阈值可以减少后续处理的数据量。3.1.4 互动数据解析的魔鬼细节推文的互动数喜欢、转发、浏览的文本表示可能是 “1.2K”、“3.5M”。直接进行数值比较会出错必须解析。def parse_interaction_count(interaction_text: str) - int: 将 1.2K, 3.5M 这样的字符串转换为整数 if not interaction_text or interaction_text.strip() : return 0 text interaction_text.strip().upper() multiplier 1 if text.endswith(K): multiplier 1000 text text[:-1] elif text.endswith(M): multiplier 1_000_000 text text[:-1] elif text.endswith(B): multiplier 1_000_000_000 text text[:-1] try: # 处理可能的小数点如 1.2K - 1.2 * 1000 1200 number float(text.replace(,, )) return int(number * multiplier) except ValueError: # 如果解析失败记录日志并返回0 logging.warning(f无法解析互动数文本: {interaction_text}) return 0这个函数需要被稳健地集成到_parse_interactions方法中处理每个互动指标喜欢、转发、回复、浏览。3.2 AI Processor 模块从噪声中提取信号抓取到原始推文后下一步是让 AI 来充当“编辑”筛选出最相关、最有价值的内容。3.2.1 提示词工程与链式调用我们使用 LangChain 的LLMChain来组织我们的请求。核心在于设计一个好的提示词Prompt。from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain_community.chat_models import ChatOpenAI # 通过 OpenRouter 兼容 # 定义提示词模板 SUMMARIZE_AND_FILTER_PROMPT_TEMPLATE 你是一个资深技术编辑负责从一系列推文中筛选出最具技术深度和分享价值的条目。 请根据以下推文信息执行以下任务 1. **判断相关性**此推文是否主要讨论编程、软件开发、人工智能、DevOps、新工具发布或深度技术教程如果是纯新闻、八卦、情绪化发言或与科技无关则直接标记为不相关。 2. **生成摘要**如果相关请为这条推文生成一个简洁、准确的中文摘要最多100字抓住其核心技术点或分享的价值。 3. **评估价值等级**根据其信息量、独特性和实用性给出一个1-5分的评分5分最高。 请以以下 JSON 格式输出且仅输出 JSON {{ is_relevant: true/false, summary: 生成的摘要文本如果不相关则为空字符串, score: 1-5的整数 }} 推文文本 {tweet_text} 推文互动数据喜欢/转发/浏览 {interaction_metrics} PROMPT PromptTemplate.from_template(SUMMARIZE_AND_FILTER_PROMPT_TEMPLATE) # 初始化 LLM (通过 OpenRouter) llm ChatOpenAI( model_nameopenai/gpt-3.5-turbo, # OpenRouter 的模型路径 openai_api_basehttps://openrouter.ai/api/v1, openai_api_keyos.getenv(OPENROUTER_API_KEY), temperature0.1, # 低温度保证输出稳定性 ) chain LLMChain(llmllm, promptPROMPT)关键点解析结构化输出要求 LLM 返回严格的 JSON 格式便于我们用json.loads()解析这是实现可靠自动化的关键。提供上下文除了推文文本我们还提供了互动数据。这相当于给了 AI 一个“群众投票”的参考有助于它判断内容的热度或争议性。温度Temperature设置为较低值如 0.1是为了让输出更加确定和一致减少随机性这对于自动化流程非常重要。错误处理LLM API 调用可能失败网络、限速。必须用try...except包裹并实现重试机制如tenacity库。3.2.2 批量处理与速率限制如果抓取了上百条推文逐条调用 API 会非常慢且可能触发速率限制。我们需要批量处理并妥善管理 API 调用。import asyncio from tenacity import retry, stop_after_attempt, wait_exponential class AIProcessor: def __init__(self, llm_chain, max_batch_size10, requests_per_minute30): self.chain llm_chain self.max_batch_size max_batch_size # 每批处理条数 self.delay_between_batches 60.0 / requests_per_minute # 计算延迟 retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10)) async def _process_single_tweet(self, tweet_data): 处理单条推文包含重试逻辑 try: response await self.chain.arun( tweet_texttweet_data[text], interaction_metricsstr(tweet_data[metrics]) ) result json.loads(response.strip()) return {**tweet_data, **result} # 合并原数据和AI处理结果 except json.JSONDecodeError: logging.error(fAI 返回了非 JSON 格式: {response}) return {**tweet_data, is_relevant: False, summary: , score: 0} except Exception as e: logging.error(f处理推文失败: {e}) raise # 触发重试 async def process_batch(self, all_tweets): 批量处理所有推文控制速率 relevant_tweets [] for i in range(0, len(all_tweets), self.max_batch_size): batch all_tweets[i:i self.max_batch_size] tasks [self._process_single_tweet(tweet) for tweet in batch] batch_results await asyncio.gather(*tasks, return_exceptionsTrue) for result in batch_results: if isinstance(result, Exception): logging.error(f批处理中出现异常: {result}) continue if result.get(is_relevant) and result.get(score, 0) 3: # 阈值过滤 relevant_tweets.append(result) # 批次间延迟遵守速率限制 if i self.max_batch_size len(all_tweets): await asyncio.sleep(self.delay_between_batches) # 按评分排序返回最优质的内容 relevant_tweets.sort(keylambda x: x[score], reverseTrue) return relevant_tweets[:10] # 返回前10条最相关的这个AIProcessor类封装了与 LLM 交互的所有复杂性批处理提升效率、速率限制避免被封、重试机制增强鲁棒性、结果解析与合并。3.3 Email Sender 与 Scheduler 模块完成闭环3.3.1 生成专业的 HTML 简报EmailSender模块的任务是将 AI 筛选后的结构化数据渲染成视觉上过得去的 HTML 邮件。这里不建议从零写 HTML可以使用 Jinja2 模板引擎。# newsletter_template.html html body h1 你的技术精选简报 - {{ date }}/h1 p本期从 {{ total_scanned }} 条推文中为你筛选出 {{ total_selected }} 条优质内容。/p {% for tweet in tweets %} div classtweet-item h3a href{{ tweet.url }} 推文链接/a | 评分: {{ tweet.score }}/5/h3 blockquote{{ tweet.text[:200] }}.../blockquote pstrongAI摘要/strong{{ tweet.summary }}/p psmall互动: {{ tweet.metrics.likes }} | {{ tweet.metrics.retweets }} | ️ {{ tweet.metrics.views }}/small/p hr /div {% endfor %} /body /htmlfrom jinja2 import Environment, FileSystemLoader import resend class EmailSender: def __init__(self, api_key, from_email): resend.api_key api_key self.from_email from_email self.env Environment(loaderFileSystemLoader(templates/)) def generate_html(self, tweets, total_scanned): template self.env.get_template(newsletter_template.html) html_content template.render( datedatetime.now().strftime(%Y-%m-%d), tweetstweets, total_scannedtotal_scanned, total_selectedlen(tweets) ) return html_content async def send_newsletter(self, to_emails, html_content, subject你的技术精选简报): try: params { from: self.from_email, to: to_emails, subject: subject, html: html_content, } email resend.Emails.send(params) logging.info(f简报发送成功ID: {email[id]}) except Exception as e: logging.error(f发送邮件失败: {e})使用模板引擎的好处是内容与样式分离后续想要更换邮件风格非常方便。3.3.2 可靠的任务调度Scheduler模块使用 APScheduler 将以上所有步骤串联起来并设定执行计划。from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger import asyncio class TaskScheduler: def __init__(self, scraper, ai_processor, email_sender, config): self.scheduler AsyncIOScheduler() self.scraper scraper self.ai_processor ai_processor self.email_sender email_sender self.config config async def run_pipeline(self): 完整的管道执行任务 logging.info(开始执行抓取与分析管道...) try: # 1. 抓取 raw_tweets await self.scraper.scrape_hashtags(self.config[hashtag_groups]) # 2. AI处理 curated_tweets await self.ai_processor.process_batch(raw_tweets) # 3. 生成并发送邮件 if curated_tweets: html self.email_sender.generate_html(curated_tweets, len(raw_tweets)) await self.email_sender.send_newsletter( self.config[subscriber_list], html, subjectf技术简报 {datetime.now().strftime(%Y-%m-%d)} ) else: logging.info(今日未筛选出优质内容不发送简报。) except Exception as e: logging.critical(f管道执行失败: {e}, exc_infoTrue) finally: await self.scraper.close() # 确保浏览器资源被释放 def start(self): # 每天上午9点执行 trigger CronTrigger(hour9, minute0) self.scheduler.add_job( self.run_pipeline, triggertrigger, iddaily_newsletter, replace_existingTrue ) self.scheduler.start() logging.info(调度器已启动计划任务已添加。) # 保持主线程运行 try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): self.scheduler.shutdown() logging.info(调度器已关闭。)这个调度器配置了一个每天运行的定时任务。在生产环境中你可能会将这个脚本作为后台服务如 systemd service 或 Docker 容器运行。4. 工程化实践测试、质量与自动化一个能长期运行的项目离不开完善的工程化实践。否则今天能跑的脚本明天可能因为网站的一个小改动就崩溃了。4.1 如何测试一个依赖外部网站的爬虫测试爬虫是公认的难点因为它严重依赖不稳定的第三方页面。我们的策略是模拟Mock和存根Stub。4.1.1 单元测试Mock 一切外部依赖我们使用pytest和pytest-asyncio来编写异步测试。核心思想是不启动真正的浏览器也不访问真实的 X.com。# tests/test_scraper.py import pytest from unittest.mock import AsyncMock, MagicMock, patch from src.scraper import TweetScraper pytest.mark.asyncio async def test_parse_interactions(): 测试互动数据解析逻辑 scraper TweetScraper() # 模拟一个 Playwright 元素对象 mock_element AsyncMock() # 模拟 query_selector 的链式调用返回包含特定文本的 Mock 对象 mock_like_element AsyncMock() mock_like_element.inner_text.return_value 1.2K mock_retweet_element AsyncMock() mock_retweet_element.inner_text.return_value 345 mock_view_element AsyncMock() mock_view_element.inner_text.return_value 5.6M # 设置 mock_element 的 query_selector 行为 def side_effect(selector): if selector [data-testidlike]: return mock_like_element elif selector [data-testidretweet]: return mock_retweet_element elif selector [data-testidview]: return mock_view_element else: return AsyncMock() # 其他选择器返回空 Mock mock_element.query_selector.side_effect side_effect # 调用被测试的方法 metrics await scraper._parse_interactions(mock_element) # 断言 assert metrics[likes] 1200 # 1.2K 应解析为 1200 assert metrics[retweets] 345 assert metrics[views] 5_600_000 # 5.6M 应解析为 5,600,000 assert metrics[total] 1200 345 5_600_000这个测试完全隔离了网络和浏览器只验证我们的业务逻辑字符串解析是否正确。我们通过 Mock 对象模拟了page.query_selector_all返回的元素列表以及每个元素上query_selector和inner_text()的行为。4.1.2 集成测试可选对于 Scraper 的集成测试可以有一个标记为“慢速”或“在线”的测试套件它使用真实的浏览器可配置为headlessTrue访问一个我们可控的、静态的测试页面。这个页面本地托管HTML 结构模仿 X.com 的推文组件。这样可以测试 Playwright 选择器是否能正确找到元素而无需担心 X.com 的反爬或网络波动。4.2 代码质量与自动化流水线4.2.1 本地开发护栏Pre-commit Hooks在代码提交到仓库前自动进行检查可以防止低级错误进入代码库。我们在.pre-commit-config.yaml中配置repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: - id: trailing-whitespace # 删除行尾空格 - id: end-of-file-fixer # 确保文件以换行符结尾 - id: check-yaml # 检查 YAML 语法 - id: check-added-large-files # 防止提交大文件 - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.3.0 hooks: - id: ruff # 超快的 Python Linter args: [--fix, --exit-non-zero-on-fix] - id: ruff-format # 代码格式化 - repo: local # 自定义本地钩子 hooks: - id: check-test-coverage name: Check Test Coverage entry: python scripts/pre_commit_coverage.py language: system pass_filenames: false always_run: trueruff替代了flake8和isort速度极快能检查代码风格和潜在错误。ruff-format则替代了black确保代码格式一致。自定义的pre_commit_coverage.py脚本会运行测试并检查覆盖率是否低于预设阈值如 80%如果低于则阻止提交。4.2.2 持续集成GitHub Actions每次推送到主分支或创建拉取请求时GitHub Actions 会自动运行完整的检查流程。# .github/workflows/ci.yml name: CI on: [push, pull_request] jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkoutv4 - name: Set up Python 3.11 uses: actions/setup-pythonv5 with: python-version: 3.11 - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt pip install -r requirements-dev.txt - name: Lint with ruff run: | ruff check . ruff format --check . - name: Type check with mypy run: mypy src/ - name: Run tests with pytest run: pytest --covsrc --cov-reportxml - name: Upload coverage to Codecov uses: codecov/codecov-actionv3 with: file: ./coverage.xml这个流水线确保了代码库的持续健康风格一致、类型安全、测试通过且覆盖率达标。4.3 配置管理与部署4.3.1 使用 YAML 进行灵活配置将可变参数从代码中分离出来是良好实践。我们使用config.yaml或hashtags.yaml。# config.yaml scraper: headless: true stealth: true scroll_delay_ms: 3000 max_tweets_per_group: 50 min_interaction_threshold: 10 wait_between_groups_ms: 7000 ai: model: openai/gpt-3.5-turbo api_key: ${OPENROUTER_API_KEY} # 从环境变量读取 max_batch_size: 5 requests_per_minute: 20 relevance_score_threshold: 3 email: provider: resend api_key: ${RESEND_API_KEY} from_address: newsletteryourdomain.com subscriber_list: - user1example.com - user2example.com hashtag_groups: - name: python_dev hashtags: [#Python, #Django, #FastAPI] - name: ai_ml hashtags: [#MachineLearning, #LLM, #AI]在代码中使用pyyaml库加载配置并使用os.path.expandvars或类似方法处理环境变量替换。这样在不同环境开发、生产中只需修改配置文件或环境变量而无需改动代码。4.3.2 部署选项本地/服务器常驻运行使用systemd或supervisord将调度器脚本作为后台服务运行。确保配置好日志轮转logrotate。云函数/Serverless将整个管道拆分为三个独立的云函数抓取、处理、发送由云服务商的定时触发器驱动。这更具可扩展性且无需管理服务器但需要适应无状态环境如 Cookie 管理可能需要使用云存储。Docker 容器将整个应用及其依赖打包成 Docker 镜像。这保证了环境一致性可以轻松部署在任何支持 Docker 的地方包括你自己的服务器或云平台的容器服务。结合docker-compose可以方便地管理。5. 常见问题与故障排查实录在实际开发和运行中你一定会遇到各种问题。以下是一些典型问题及其解决思路。5.1 抓取相关问题问题1脚本运行几分钟后就被 X.com 屏蔽返回登录墙或验证码。可能原因1指纹检测。即使使用了playwright-stealth某些指纹如 WebGL、字体、Canvas可能仍有泄漏。排查与解决尝试使用playwright.chromium.launch_persistent_context加载一个真实的用户数据目录User Data Dir这包含了完整的浏览器指纹。但注意这个目录可能很大且需要你先用这个浏览器手动登录一次。可能原因2行为模式异常。滚动速度恒定、请求间隔完全一致、鼠标移动轨迹过于机械。排查与解决在所有sleep和延迟中引入随机性如random.uniform(2, 5)。可以考虑使用更复杂的行为模拟库但通常随机延迟已能解决大部分问题。可能原因3IP 地址被标记。如果你从数据中心 IP如 AWS、GCP 的 IP发起大量请求很容易被识别。排查与解决考虑使用住宅代理 IP 池。但这会显著增加复杂性和成本。对于个人或小规模项目降低请求频率增加wait_between_groups_ms和每次抓取的数据量减少max_tweets_per_group是更经济的选择。问题2抓取到的推文数量远少于预期或者很快停止。可能原因1选择器失效。X.com 的前端代码可能更新>