Files
wechat_crawler/wxauto/ui/chatbox.py
李顺东 b66bac7ca8 feat: Initialize wxauto WeChat automation project with job extraction tools
- Add wxauto package with WeChat UI automation and message handling capabilities
- Implement job_extractor.py for automated job posting extraction from WeChat groups
- Add job_extractor_gui.py providing graphical interface for job extraction tool
- Create comprehensive documentation in Chinese covering GUI usage, multi-group support, and quick start guides
- Add build configuration files (build_exe.py, build_exe.spec) for packaging as standalone executable
- Include utility scripts for WeChat interaction (auto_send_msg.py, get_history.py, receive_file_transfer.py)
- Add project configuration files (pyproject.toml, setup.cfg, requirements.txt)
- Include test files (test_api.py, test_com_fix.py) for API and compatibility validation
- Add Apache 2.0 LICENSE and comprehensive README documentation
- Configure .gitignore to exclude build artifacts, logs, and temporary files
2026-02-11 14:49:38 +08:00

455 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from .base import BaseUISubWnd
from wxauto.ui.component import (
CMenuWnd,
)
from wxauto.param import (
WxParam,
WxResponse,
)
from wxauto.languages import *
from wxauto.utils import (
SetClipboardText,
SetClipboardFiles,
GetAllWindowExs,
)
from wxauto.msgs import parse_msg
from wxauto import uiautomation as uia
from wxauto.logger import wxlog
from wxauto.uiautomation import Control
from wxauto.utils.tools import roll_into_view
import time
import os
USED_MSG_IDS = {}
class ChatBox(BaseUISubWnd):
def __init__(self, control: uia.Control, parent):
self.control: Control = control
self.root = parent
self.parent = parent # `wx` or `chat`
self.init()
def init(self):
self.msgbox = self.control.ListControl(Name=self._lang("消息"))
# if not self.msgbox.Exists(0):
# return
self.editbox = self.control.EditControl()
self.sendbtn = self.control.ButtonControl(Name=self._lang('发送'))
self.tools = self.control.PaneControl().ToolBarControl()
# self.id = self.msgbox.runtimeid
self._empty = False # 用于记录是否为完全没有聊天记录的窗口,因为这种窗口之前有不会触发新消息判断的问题
if (cid := self.id) and cid not in USED_MSG_IDS:
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in self.msgbox.GetChildren()))
if not USED_MSG_IDS[cid]:
self._empty = True
def _lang(self, text: str) -> str:
return WECHAT_CHAT_BOX.get(text, {WxParam.LANGUAGE: text}).get(WxParam.LANGUAGE)
def _update_used_msg_ids(self):
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in self.msgbox.GetChildren()))
def _open_chat_more_info(self):
for chatinfo_control, depth in uia.WalkControl(self.control):
if chatinfo_control.Name == self._lang('聊天信息'):
chatinfo_control.Click()
break
else:
return WxResponse.failure('未找到聊天信息按钮')
return ChatRoomDetailWnd(self)
def _activate_editbox(self):
if not self.editbox.HasKeyboardFocus:
self.editbox.MiddleClick()
@property
def who(self):
if hasattr(self, '_who'):
return self._who
self._who = self.editbox.Name
return self._who
@property
def id(self):
if self.msgbox.Exists(0):
return self.msgbox.runtimeid
return None
@property
def used_msg_ids(self):
return USED_MSG_IDS[self.id]
def get_info(self):
chat_info = {}
walk = uia.WalkControl(self.control)
for chat_name_control, depth in walk:
if isinstance(chat_name_control, uia.TextControl):
break
if (
not isinstance(chat_name_control, uia.TextControl)
or depth < 8
):
return {}
# chat_name_control = self.control.GetProgenyControl(11)
chat_name_control_list = chat_name_control.GetParentControl().GetChildren()
chat_name_control_count = len(chat_name_control_list)
if chat_name_control_count == 1:
if self.control.ButtonControl(Name='公众号主页', searchDepth=9).Exists(0):
chat_info['chat_type'] = 'official'
else:
chat_info['chat_type'] = 'friend'
chat_info['chat_name'] = chat_name_control.Name
elif chat_name_control_count >= 2:
try:
second_text = chat_name_control_list[1].Name
if second_text.startswith('@'):
chat_info['company'] = second_text
chat_info['chat_type'] = 'service'
chat_info['chat_name'] = chat_name_control.Name
else:
chat_info['group_member_count'] =\
int(second_text.replace('(', '').replace(')', ''))
chat_info['chat_type'] = 'group'
chat_info['chat_name'] =\
chat_name_control.Name.replace(second_text, '')
except:
chat_info['chat_type'] = 'friend'
chat_info['chat_name'] = chat_name_control.Name
ori_chat_name_control =\
chat_name_control.GetParentControl().\
GetParentControl().TextControl(searchDepth=1)
if ori_chat_name_control.Exists(0):
chat_info['chat_remark'] = chat_info['chat_name']
chat_info['chat_name'] = ori_chat_name_control.Name
self._info = chat_info
return chat_info
def input_at(self, at_list):
self._show()
if isinstance(at_list, str):
at_list = [at_list]
self._activate_editbox()
for friend in at_list:
self.editbox.SendKeys('@'+friend.replace(' ', ''))
atmenu = AtMenu(self)
atmenu.select(friend)
def clear_edit(self):
self._show()
self.editbox.Click()
self.editbox.SendKeys('{Ctrl}a', waitTime=0)
self.editbox.SendKeys('{DELETE}')
def send_text(self, content: str):
self._show()
t0 = time.time()
while True:
if time.time() - t0 > 10:
return WxResponse.failure(f'Timeout --> {self.who} - {content}')
SetClipboardText(content)
self._activate_editbox()
self.editbox.SendKeys('{Ctrl}v')
if self.editbox.GetValuePattern().Value.replace('', '').strip():
break
self.editbox.SendKeys('{Ctrl}v')
if self.editbox.GetValuePattern().Value.replace('', '').strip():
break
self.editbox.RightClick()
menu = CMenuWnd(self)
menu.select('粘贴')
if self.editbox.GetValuePattern().Value.replace('', '').strip():
break
t0 = time.time()
while self.editbox.GetValuePattern().Value:
if time.time() - t0 > 10:
return WxResponse.failure(f'Timeout --> {self.who} - {content}')
self._activate_editbox()
self.sendbtn.Click()
if not self.editbox.GetValuePattern().Value:
return WxResponse.success(f"success")
elif not self.editbox.GetValuePattern().Value.replace('', '').strip():
return self.send_text(content)
def send_msg(self, content: str, clear: bool=True, at=None):
if not content and not at:
return WxResponse.failure(f"参数 `content` 和 `at` 不能同时为空")
if clear:
self.clear_edit()
if at:
self.input_at(at)
return self.send_text(content)
def send_file(self, file_path):
self._show()
if isinstance(file_path, str):
file_path = [file_path]
file_path = [os.path.abspath(f) for f in file_path]
SetClipboardFiles(file_path)
self._activate_editbox()
self.editbox.SendKeys('{Ctrl}v')
self.sendbtn.Click()
if self.editbox.GetValuePattern().Value:
return WxResponse.fail("发送失败,请重试")
return WxResponse.success()
def load_more(self, interval=0.3):
self._show()
msg_len = len(self.msgbox.GetChildren())
loadmore = self.msgbox.GetChildren()[0]
loadmore_top = loadmore.BoundingRectangle.top
while True:
if len(self.msgbox.GetChildren()) > msg_len:
isload = True
break
else:
msg_len = len(self.msgbox.GetChildren())
self.msgbox.WheelUp(wheelTimes=10)
time.sleep(interval)
if self.msgbox.GetChildren()[0].BoundingRectangle.top == loadmore_top\
and len(self.msgbox.GetChildren()) == msg_len:
isload = False
break
else:
loadmore_top = self.msgbox.GetChildren()[0].BoundingRectangle.top
self.msgbox.WheelUp(wheelTimes=1, waitTime=0.1)
if isload:
return WxResponse.success()
else:
return WxResponse.failure("没有更多消息了")
def get_msgs(self):
if self.msgbox.Exists(0):
return [
parse_msg(msg_control, self)
for msg_control
in self.msgbox.GetChildren()
if msg_control.ControlTypeName == 'ListItemControl'
]
return []
def get_new_msgs(self):
if not self.msgbox.Exists(0):
return []
msg_controls = self.msgbox.GetChildren()
now_msg_ids = tuple((i.runtimeid for i in msg_controls))
if not now_msg_ids: # 当前没有消息id
return []
if self._empty and self.used_msg_ids:
self._empty = False
if not self._empty and (
(not self.used_msg_ids and now_msg_ids) # 没有使用过的消息id但当前有消息id
or now_msg_ids[-1] == self.used_msg_ids[-1] # 当前最后一条消息id和上次一样
or not set(now_msg_ids)&set(self.used_msg_ids) # 当前消息id和上次没有交集
):
# wxlog.debug('没有新消息')
return []
used_msg_ids_set = set(self.used_msg_ids)
last_one_msgid = max(
(x for x in now_msg_ids if x in used_msg_ids_set),
key=self.used_msg_ids.index, default=None
)
new1 = [x for x in now_msg_ids if x not in used_msg_ids_set]
new2 = now_msg_ids[now_msg_ids.index(last_one_msgid) + 1 :]\
if last_one_msgid is not None else []
new = [i for i in new1 if i in new2] if new2 else new1
USED_MSG_IDS[self.id] = tuple(self.used_msg_ids + tuple(new))[-100:]
new_controls = [i for i in msg_controls if i.runtimeid in new]
self.msgbox.MiddleClick()
return [
parse_msg(msg_control, self)
for msg_control
in new_controls
if msg_control.ControlTypeName == 'ListItemControl'
]
def get_msg_by_id(self, msg_id: str):
if not self.msgbox.Exists(0):
return []
msg_controls = self.msgbox.GetChildren()
if control_list := [i for i in msg_controls if i.runtimeid == msg_id]:
return parse_msg(control_list[0], self)
def _get_tail_after_nth_match(self, msgs, last_msg, n):
matches = [
i for i, msg in reversed(list(enumerate(msgs)))
if msg.content == last_msg
]
if len(matches) >= n:
wxlog.debug(f'匹配到基准消息:{last_msg}')
else:
split_last_msg = last_msg.split('')
nickname = split_last_msg[0]
content = ''.join(split_last_msg[1:])
matches = [
i for i, msg in reversed(list(enumerate(msgs)))
if msg.content == content
and msg.sender_remark == nickname
]
if len(matches) >= n:
wxlog.debug(f'匹配到基准消息:<{nickname}> {content}')
else:
wxlog.debug(f"未匹配到基准消息,以最后一条消息为基准:{msgs[-1].content}")
matches = [
i for i, msg in reversed(list(enumerate(msgs)))
if msg.attr in ('self', 'friend')
]
try:
index = matches[n - 1]
return msgs[index:]
except IndexError:
wxlog.debug(f"未匹配到第{n}条消息,返回空列表")
return []
def get_next_new_msgs(self, count=None, last_msg=None):
# 1. 消息列表不存在,则返回空列表
if not self.msgbox.Exists(0):
wxlog.debug('消息列表不存在,返回空列表')
return []
# 2. 判断是否有新消息按钮,有的话点一下
load_new_button = self.control.ButtonControl(RegexName=self._lang('re_新消息按钮'))
if load_new_button.Exists(0):
self._show()
wxlog.debug('检测到新消息按钮,点击加载新消息')
load_new_button.Click()
time.sleep(0.5)
msg_controls = self.msgbox.GetChildren()
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in msg_controls))
msgs = [
parse_msg(msg_control, self)
for msg_control
in msg_controls
if msg_control.ControlTypeName == 'ListItemControl'
]
# 3. 如果有“以下是新消息”标志,则直接返回该标志下的所有消息即可
index = next((
i for i, msg in enumerate(msgs)
if self._lang('以下为新消息') == msg.content
), None)
if index is not None:
wxlog.debug('获取以下是新消息下的所有消息')
return msgs[index:]
# 4. 根据会话列表传入的消息数量和最后一条新消息内容来判断新消息
if count and last_msg:
# index = next((
# i for i, msg in enumerate(msgs[::-1])
# if last_msg == msg.content
# ), None)
# if index is not None:
wxlog.debug(f'获取{count}条新消息,基准消息内容为:{last_msg}')
return self._get_tail_after_nth_match(msgs, last_msg, count)
def get_group_members(self):
self._show()
roominfoWnd = self._open_chat_more_info()
return roominfoWnd.get_group_members()
class ChatRoomDetailWnd(BaseUISubWnd):
_ui_cls_name: str = 'SessionChatRoomDetailWnd'
def __init__(self, parent):
self.parent = parent
self.root = parent.root
self.control = self.root.control.Control(ClassName=self._ui_cls_name, searchDepth=1)
def _lang(self, text: str) -> str:
return CHATROOM_DETAIL_WINDOW.get(text, {WxParam.LANGUAGE: text}).get(WxParam.LANGUAGE)
def _edit(self, key, value):
wxlog.debug(f'修改{key}为`{value}`')
btn = self.control.TextControl(Name=key).GetParentControl().ButtonControl(Name=key)
if btn.Exists(0):
roll_into_view(self.control, btn)
btn.Click()
else:
wxlog.debug(f'当前非群聊,无法修改{key}')
return WxResponse.failure(f'Not a group chat, cannot modify `{key}`')
while True:
edit_hwnd_list = [
i[0]
for i in GetAllWindowExs(self.control.NativeWindowHandle)
if i[1] == 'EditWnd'
]
if edit_hwnd_list:
edit_hwnd = edit_hwnd_list[0]
break
btn.Click()
edit_win32 = uia.Win32(edit_hwnd)
edit_win32.shortcut_select_all()
edit_win32.send_keys_shortcut('{DELETE}')
edit_win32.input(value)
edit_win32.send_keys_shortcut('{ENTER}')
return WxResponse.success()
def get_group_members(self, control=False):
"""获取群成员"""
more = self.control.ButtonControl(Name=self._lang('查看更多'), searchDepth=8)
if more.Exists(0.5):
more.Click()
members = [i for i in self.control.ListControl(Name=self._lang('聊天成员')).GetChildren()]
while members[-1].Name in [self._lang('添加'), self._lang('移出')]:
members = members[:-1]
if control:
return members
member_names = [i.Name for i in members]
self.close()
return member_names
class GroupMemberElement:
def __init__(self, control, parent) -> None:
self.control = control
self.parent = parent
self.root = self.parent.root
self.nickname = self.control.Name
def __repr__(self) -> str:
return f"<wxauto Group Member Element at {hex(id(self))}>"
class AtMenu(BaseUISubWnd):
_ui_cls_name = 'ChatContactMenu'
def __init__(self, parent):
self.root = parent.root
self.control = parent.parent.control.PaneControl(ClassName='ChatContactMenu')
# self.control.Exists(1)
def clear(self, friend):
if self.exists():
self.control.SendKeys('{ESC}')
for _ in range(len(friend)+1):
self.root.chatbox.editbox.SendKeys('{BACK}')
def select(self, friend):
friend_ = friend.replace(' ', '')
if self.exists():
ateles = self.control.ListControl().GetChildren()
if len(ateles) == 1:
ateles[0].Click()
else:
atele = self.control.ListItemControl(Name=friend)
if atele.Exists(0):
roll_into_view(self.control, atele)
atele.Click()
else:
self.clear(friend_)
else:
self.clear(friend_)