Files
wechat_crawler/receive_file_transfer.py
李顺东 b66bac7ca8 feat: Initialize wxauto WeChat automation project with job extraction tools
- Add wxauto package with WeChat UI automation and message handling capabilities
- Implement job_extractor.py for automated job posting extraction from WeChat groups
- Add job_extractor_gui.py providing graphical interface for job extraction tool
- Create comprehensive documentation in Chinese covering GUI usage, multi-group support, and quick start guides
- Add build configuration files (build_exe.py, build_exe.spec) for packaging as standalone executable
- Include utility scripts for WeChat interaction (auto_send_msg.py, get_history.py, receive_file_transfer.py)
- Add project configuration files (pyproject.toml, setup.cfg, requirements.txt)
- Include test files (test_api.py, test_com_fix.py) for API and compatibility validation
- Add Apache 2.0 LICENSE and comprehensive README documentation
- Configure .gitignore to exclude build artifacts, logs, and temporary files
2026-02-11 14:49:38 +08:00

90 lines
2.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
监听「文件传输助手」消息,在控制台输出内容
运行前请确保1) 已安装依赖 pip install -e .
2) 电脑已登录微信 3.9 版本,且主窗口已打开
按 Ctrl+C 可退出
"""
import sys
import os
_script_dir = os.path.dirname(os.path.abspath(__file__))
if _script_dir not in sys.path:
sys.path.insert(0, _script_dir)
from wxauto import WeChat
# 要监听的聊天(可改成其他好友/群名)
LISTEN_CHAT = "文件传输助手"
# 启动时是否先获取并输出历史记录
PRINT_HISTORY_ON_START = True
# 启动时向上加载历史页数0=不加载更多,只取当前可见)
LOAD_HISTORY_PAGES = 1
def format_history_msg(msg, index: int):
"""历史消息单条格式化"""
msg_type = getattr(msg, "type", "text")
content = getattr(msg, "content", "")
sender = getattr(msg, "sender_remark", "") or getattr(msg, "sender", "")
content = (content or "").replace("\n", " ").strip()
if msg_type != "text":
content = f"[{msg_type}] " + (content or "(非文本)")
return f" {index}. [{sender}] {content}"
def print_history(wx):
"""获取当前聊天历史并输出到控制台"""
if not wx.ChatWith(LISTEN_CHAT):
print(f"无法切换到「{LISTEN_CHAT}」,跳过历史记录")
return
for _ in range(LOAD_HISTORY_PAGES):
if not wx.LoadMoreMessage(interval=0.3):
break
msgs = wx.GetAllMessage()
if not msgs:
print("(无历史消息)\n")
return
print(f"---------- 「{LISTEN_CHAT}」 历史记录(共 {len(msgs)} 条)----------")
for i, msg in enumerate(msgs, 1):
print(format_history_msg(msg, i))
print("---------- 以上为历史,以下为新消息 ----------\n")
def on_message(msg, chat):
"""收到新消息时在控制台输出"""
# msg: 消息对象,有 .type .content .sender 等
# chat: 当前聊天窗口对象
msg_type = getattr(msg, "type", "text")
content = getattr(msg, "content", "")
sender = getattr(msg, "sender_remark", "") or getattr(msg, "sender", "")
if msg_type == "text":
print(f"[{LISTEN_CHAT}] {sender}: {content}")
else:
# 图片、视频、语音等只输出类型,内容多为路径或占位
print(f"[{LISTEN_CHAT}] [{msg_type}] {sender}: {content or '(非文本)'}")
def main():
print("正在连接微信...")
try:
wx = WeChat()
except Exception as e:
print("连接失败,请确保:")
print(" 1. 已安装依赖: pip install -e .")
print(" 2. 微信 3.9 已登录并保持主窗口打开")
print(f" 错误: {e}")
return
if PRINT_HISTORY_ON_START:
print_history(wx)
print(f"开始监听「{LISTEN_CHAT}」新消息,按 Ctrl+C 退出\n")
wx.AddListenChat(LISTEN_CHAT, callback=on_message)
wx.StartListening()
wx.KeepRunning()
if __name__ == "__main__":
main()