- Add wxauto package with WeChat UI automation and message handling capabilities - Implement job_extractor.py for automated job posting extraction from WeChat groups - Add job_extractor_gui.py providing graphical interface for job extraction tool - Create comprehensive documentation in Chinese covering GUI usage, multi-group support, and quick start guides - Add build configuration files (build_exe.py, build_exe.spec) for packaging as standalone executable - Include utility scripts for WeChat interaction (auto_send_msg.py, get_history.py, receive_file_transfer.py) - Add project configuration files (pyproject.toml, setup.cfg, requirements.txt) - Include test files (test_api.py, test_com_fix.py) for API and compatibility validation - Add Apache 2.0 LICENSE and comprehensive README documentation - Configure .gitignore to exclude build artifacts, logs, and temporary files
455 lines
17 KiB
Python
455 lines
17 KiB
Python
from .base import BaseUISubWnd
|
||
from wxauto.ui.component import (
|
||
CMenuWnd,
|
||
)
|
||
from wxauto.param import (
|
||
WxParam,
|
||
WxResponse,
|
||
)
|
||
from wxauto.languages import *
|
||
from wxauto.utils import (
|
||
SetClipboardText,
|
||
SetClipboardFiles,
|
||
GetAllWindowExs,
|
||
)
|
||
from wxauto.msgs import parse_msg
|
||
from wxauto import uiautomation as uia
|
||
from wxauto.logger import wxlog
|
||
from wxauto.uiautomation import Control
|
||
from wxauto.utils.tools import roll_into_view
|
||
import time
|
||
import os
|
||
|
||
USED_MSG_IDS = {}
|
||
|
||
class ChatBox(BaseUISubWnd):
|
||
def __init__(self, control: uia.Control, parent):
|
||
self.control: Control = control
|
||
self.root = parent
|
||
self.parent = parent # `wx` or `chat`
|
||
self.init()
|
||
|
||
def init(self):
|
||
self.msgbox = self.control.ListControl(Name=self._lang("消息"))
|
||
# if not self.msgbox.Exists(0):
|
||
# return
|
||
self.editbox = self.control.EditControl()
|
||
self.sendbtn = self.control.ButtonControl(Name=self._lang('发送'))
|
||
self.tools = self.control.PaneControl().ToolBarControl()
|
||
# self.id = self.msgbox.runtimeid
|
||
self._empty = False # 用于记录是否为完全没有聊天记录的窗口,因为这种窗口之前有不会触发新消息判断的问题
|
||
if (cid := self.id) and cid not in USED_MSG_IDS:
|
||
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in self.msgbox.GetChildren()))
|
||
if not USED_MSG_IDS[cid]:
|
||
self._empty = True
|
||
|
||
def _lang(self, text: str) -> str:
|
||
return WECHAT_CHAT_BOX.get(text, {WxParam.LANGUAGE: text}).get(WxParam.LANGUAGE)
|
||
|
||
def _update_used_msg_ids(self):
|
||
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in self.msgbox.GetChildren()))
|
||
|
||
|
||
def _open_chat_more_info(self):
|
||
for chatinfo_control, depth in uia.WalkControl(self.control):
|
||
if chatinfo_control.Name == self._lang('聊天信息'):
|
||
chatinfo_control.Click()
|
||
break
|
||
else:
|
||
return WxResponse.failure('未找到聊天信息按钮')
|
||
return ChatRoomDetailWnd(self)
|
||
|
||
|
||
def _activate_editbox(self):
|
||
if not self.editbox.HasKeyboardFocus:
|
||
self.editbox.MiddleClick()
|
||
|
||
@property
|
||
def who(self):
|
||
if hasattr(self, '_who'):
|
||
return self._who
|
||
self._who = self.editbox.Name
|
||
return self._who
|
||
|
||
@property
|
||
def id(self):
|
||
if self.msgbox.Exists(0):
|
||
return self.msgbox.runtimeid
|
||
return None
|
||
|
||
@property
|
||
def used_msg_ids(self):
|
||
return USED_MSG_IDS[self.id]
|
||
|
||
def get_info(self):
|
||
chat_info = {}
|
||
walk = uia.WalkControl(self.control)
|
||
for chat_name_control, depth in walk:
|
||
if isinstance(chat_name_control, uia.TextControl):
|
||
break
|
||
if (
|
||
not isinstance(chat_name_control, uia.TextControl)
|
||
or depth < 8
|
||
):
|
||
return {}
|
||
|
||
# chat_name_control = self.control.GetProgenyControl(11)
|
||
chat_name_control_list = chat_name_control.GetParentControl().GetChildren()
|
||
chat_name_control_count = len(chat_name_control_list)
|
||
|
||
if chat_name_control_count == 1:
|
||
if self.control.ButtonControl(Name='公众号主页', searchDepth=9).Exists(0):
|
||
chat_info['chat_type'] = 'official'
|
||
else:
|
||
chat_info['chat_type'] = 'friend'
|
||
chat_info['chat_name'] = chat_name_control.Name
|
||
elif chat_name_control_count >= 2:
|
||
try:
|
||
second_text = chat_name_control_list[1].Name
|
||
if second_text.startswith('@'):
|
||
chat_info['company'] = second_text
|
||
chat_info['chat_type'] = 'service'
|
||
chat_info['chat_name'] = chat_name_control.Name
|
||
else:
|
||
chat_info['group_member_count'] =\
|
||
int(second_text.replace('(', '').replace(')', ''))
|
||
chat_info['chat_type'] = 'group'
|
||
chat_info['chat_name'] =\
|
||
chat_name_control.Name.replace(second_text, '')
|
||
except:
|
||
chat_info['chat_type'] = 'friend'
|
||
chat_info['chat_name'] = chat_name_control.Name
|
||
|
||
ori_chat_name_control =\
|
||
chat_name_control.GetParentControl().\
|
||
GetParentControl().TextControl(searchDepth=1)
|
||
if ori_chat_name_control.Exists(0):
|
||
chat_info['chat_remark'] = chat_info['chat_name']
|
||
chat_info['chat_name'] = ori_chat_name_control.Name
|
||
self._info = chat_info
|
||
return chat_info
|
||
|
||
|
||
def input_at(self, at_list):
|
||
self._show()
|
||
if isinstance(at_list, str):
|
||
at_list = [at_list]
|
||
self._activate_editbox()
|
||
for friend in at_list:
|
||
self.editbox.SendKeys('@'+friend.replace(' ', ''))
|
||
atmenu = AtMenu(self)
|
||
atmenu.select(friend)
|
||
|
||
|
||
def clear_edit(self):
|
||
self._show()
|
||
self.editbox.Click()
|
||
self.editbox.SendKeys('{Ctrl}a', waitTime=0)
|
||
self.editbox.SendKeys('{DELETE}')
|
||
|
||
|
||
def send_text(self, content: str):
|
||
self._show()
|
||
t0 = time.time()
|
||
while True:
|
||
if time.time() - t0 > 10:
|
||
return WxResponse.failure(f'Timeout --> {self.who} - {content}')
|
||
SetClipboardText(content)
|
||
self._activate_editbox()
|
||
self.editbox.SendKeys('{Ctrl}v')
|
||
if self.editbox.GetValuePattern().Value.replace('', '').strip():
|
||
break
|
||
self.editbox.SendKeys('{Ctrl}v')
|
||
if self.editbox.GetValuePattern().Value.replace('', '').strip():
|
||
break
|
||
self.editbox.RightClick()
|
||
menu = CMenuWnd(self)
|
||
menu.select('粘贴')
|
||
if self.editbox.GetValuePattern().Value.replace('', '').strip():
|
||
break
|
||
t0 = time.time()
|
||
while self.editbox.GetValuePattern().Value:
|
||
if time.time() - t0 > 10:
|
||
return WxResponse.failure(f'Timeout --> {self.who} - {content}')
|
||
self._activate_editbox()
|
||
|
||
self.sendbtn.Click()
|
||
if not self.editbox.GetValuePattern().Value:
|
||
return WxResponse.success(f"success")
|
||
elif not self.editbox.GetValuePattern().Value.replace('', '').strip():
|
||
return self.send_text(content)
|
||
|
||
def send_msg(self, content: str, clear: bool=True, at=None):
|
||
if not content and not at:
|
||
return WxResponse.failure(f"参数 `content` 和 `at` 不能同时为空")
|
||
|
||
if clear:
|
||
self.clear_edit()
|
||
if at:
|
||
self.input_at(at)
|
||
|
||
return self.send_text(content)
|
||
|
||
|
||
def send_file(self, file_path):
|
||
self._show()
|
||
if isinstance(file_path, str):
|
||
file_path = [file_path]
|
||
file_path = [os.path.abspath(f) for f in file_path]
|
||
SetClipboardFiles(file_path)
|
||
self._activate_editbox()
|
||
self.editbox.SendKeys('{Ctrl}v')
|
||
self.sendbtn.Click()
|
||
if self.editbox.GetValuePattern().Value:
|
||
return WxResponse.fail("发送失败,请重试")
|
||
return WxResponse.success()
|
||
|
||
def load_more(self, interval=0.3):
|
||
self._show()
|
||
msg_len = len(self.msgbox.GetChildren())
|
||
loadmore = self.msgbox.GetChildren()[0]
|
||
loadmore_top = loadmore.BoundingRectangle.top
|
||
while True:
|
||
if len(self.msgbox.GetChildren()) > msg_len:
|
||
isload = True
|
||
break
|
||
else:
|
||
msg_len = len(self.msgbox.GetChildren())
|
||
self.msgbox.WheelUp(wheelTimes=10)
|
||
time.sleep(interval)
|
||
if self.msgbox.GetChildren()[0].BoundingRectangle.top == loadmore_top\
|
||
and len(self.msgbox.GetChildren()) == msg_len:
|
||
isload = False
|
||
break
|
||
else:
|
||
loadmore_top = self.msgbox.GetChildren()[0].BoundingRectangle.top
|
||
|
||
self.msgbox.WheelUp(wheelTimes=1, waitTime=0.1)
|
||
if isload:
|
||
return WxResponse.success()
|
||
else:
|
||
return WxResponse.failure("没有更多消息了")
|
||
|
||
|
||
def get_msgs(self):
|
||
if self.msgbox.Exists(0):
|
||
return [
|
||
parse_msg(msg_control, self)
|
||
for msg_control
|
||
in self.msgbox.GetChildren()
|
||
if msg_control.ControlTypeName == 'ListItemControl'
|
||
]
|
||
return []
|
||
|
||
def get_new_msgs(self):
|
||
if not self.msgbox.Exists(0):
|
||
return []
|
||
msg_controls = self.msgbox.GetChildren()
|
||
now_msg_ids = tuple((i.runtimeid for i in msg_controls))
|
||
if not now_msg_ids: # 当前没有消息id
|
||
return []
|
||
if self._empty and self.used_msg_ids:
|
||
self._empty = False
|
||
if not self._empty and (
|
||
(not self.used_msg_ids and now_msg_ids) # 没有使用过的消息id,但当前有消息id
|
||
or now_msg_ids[-1] == self.used_msg_ids[-1] # 当前最后一条消息id和上次一样
|
||
or not set(now_msg_ids)&set(self.used_msg_ids) # 当前消息id和上次没有交集
|
||
):
|
||
# wxlog.debug('没有新消息')
|
||
return []
|
||
|
||
used_msg_ids_set = set(self.used_msg_ids)
|
||
last_one_msgid = max(
|
||
(x for x in now_msg_ids if x in used_msg_ids_set),
|
||
key=self.used_msg_ids.index, default=None
|
||
)
|
||
new1 = [x for x in now_msg_ids if x not in used_msg_ids_set]
|
||
new2 = now_msg_ids[now_msg_ids.index(last_one_msgid) + 1 :]\
|
||
if last_one_msgid is not None else []
|
||
new = [i for i in new1 if i in new2] if new2 else new1
|
||
USED_MSG_IDS[self.id] = tuple(self.used_msg_ids + tuple(new))[-100:]
|
||
new_controls = [i for i in msg_controls if i.runtimeid in new]
|
||
self.msgbox.MiddleClick()
|
||
return [
|
||
parse_msg(msg_control, self)
|
||
for msg_control
|
||
in new_controls
|
||
if msg_control.ControlTypeName == 'ListItemControl'
|
||
]
|
||
def get_msg_by_id(self, msg_id: str):
|
||
if not self.msgbox.Exists(0):
|
||
return []
|
||
msg_controls = self.msgbox.GetChildren()
|
||
if control_list := [i for i in msg_controls if i.runtimeid == msg_id]:
|
||
return parse_msg(control_list[0], self)
|
||
|
||
def _get_tail_after_nth_match(self, msgs, last_msg, n):
|
||
matches = [
|
||
i for i, msg in reversed(list(enumerate(msgs)))
|
||
if msg.content == last_msg
|
||
]
|
||
if len(matches) >= n:
|
||
wxlog.debug(f'匹配到基准消息:{last_msg}')
|
||
else:
|
||
split_last_msg = last_msg.split(':')
|
||
nickname = split_last_msg[0]
|
||
content = ''.join(split_last_msg[1:])
|
||
matches = [
|
||
i for i, msg in reversed(list(enumerate(msgs)))
|
||
if msg.content == content
|
||
and msg.sender_remark == nickname
|
||
]
|
||
if len(matches) >= n:
|
||
wxlog.debug(f'匹配到基准消息:<{nickname}> {content}')
|
||
else:
|
||
wxlog.debug(f"未匹配到基准消息,以最后一条消息为基准:{msgs[-1].content}")
|
||
matches = [
|
||
i for i, msg in reversed(list(enumerate(msgs)))
|
||
if msg.attr in ('self', 'friend')
|
||
]
|
||
try:
|
||
index = matches[n - 1]
|
||
return msgs[index:]
|
||
except IndexError:
|
||
wxlog.debug(f"未匹配到第{n}条消息,返回空列表")
|
||
return []
|
||
|
||
def get_next_new_msgs(self, count=None, last_msg=None):
|
||
# 1. 消息列表不存在,则返回空列表
|
||
if not self.msgbox.Exists(0):
|
||
wxlog.debug('消息列表不存在,返回空列表')
|
||
return []
|
||
|
||
# 2. 判断是否有新消息按钮,有的话点一下
|
||
load_new_button = self.control.ButtonControl(RegexName=self._lang('re_新消息按钮'))
|
||
if load_new_button.Exists(0):
|
||
self._show()
|
||
wxlog.debug('检测到新消息按钮,点击加载新消息')
|
||
load_new_button.Click()
|
||
time.sleep(0.5)
|
||
msg_controls = self.msgbox.GetChildren()
|
||
USED_MSG_IDS[self.id] = tuple((i.runtimeid for i in msg_controls))
|
||
msgs = [
|
||
parse_msg(msg_control, self)
|
||
for msg_control
|
||
in msg_controls
|
||
if msg_control.ControlTypeName == 'ListItemControl'
|
||
]
|
||
|
||
# 3. 如果有“以下是新消息”标志,则直接返回该标志下的所有消息即可
|
||
index = next((
|
||
i for i, msg in enumerate(msgs)
|
||
if self._lang('以下为新消息') == msg.content
|
||
), None)
|
||
if index is not None:
|
||
wxlog.debug('获取以下是新消息下的所有消息')
|
||
return msgs[index:]
|
||
|
||
# 4. 根据会话列表传入的消息数量和最后一条新消息内容来判断新消息
|
||
if count and last_msg:
|
||
# index = next((
|
||
# i for i, msg in enumerate(msgs[::-1])
|
||
# if last_msg == msg.content
|
||
# ), None)
|
||
|
||
# if index is not None:
|
||
wxlog.debug(f'获取{count}条新消息,基准消息内容为:{last_msg}')
|
||
return self._get_tail_after_nth_match(msgs, last_msg, count)
|
||
|
||
def get_group_members(self):
|
||
self._show()
|
||
roominfoWnd = self._open_chat_more_info()
|
||
return roominfoWnd.get_group_members()
|
||
|
||
|
||
class ChatRoomDetailWnd(BaseUISubWnd):
|
||
_ui_cls_name: str = 'SessionChatRoomDetailWnd'
|
||
|
||
def __init__(self, parent):
|
||
self.parent = parent
|
||
self.root = parent.root
|
||
self.control = self.root.control.Control(ClassName=self._ui_cls_name, searchDepth=1)
|
||
|
||
def _lang(self, text: str) -> str:
|
||
return CHATROOM_DETAIL_WINDOW.get(text, {WxParam.LANGUAGE: text}).get(WxParam.LANGUAGE)
|
||
|
||
def _edit(self, key, value):
|
||
wxlog.debug(f'修改{key}为`{value}`')
|
||
btn = self.control.TextControl(Name=key).GetParentControl().ButtonControl(Name=key)
|
||
if btn.Exists(0):
|
||
roll_into_view(self.control, btn)
|
||
btn.Click()
|
||
else:
|
||
wxlog.debug(f'当前非群聊,无法修改{key}')
|
||
return WxResponse.failure(f'Not a group chat, cannot modify `{key}`')
|
||
while True:
|
||
edit_hwnd_list = [
|
||
i[0]
|
||
for i in GetAllWindowExs(self.control.NativeWindowHandle)
|
||
if i[1] == 'EditWnd'
|
||
]
|
||
if edit_hwnd_list:
|
||
edit_hwnd = edit_hwnd_list[0]
|
||
break
|
||
btn.Click()
|
||
edit_win32 = uia.Win32(edit_hwnd)
|
||
edit_win32.shortcut_select_all()
|
||
edit_win32.send_keys_shortcut('{DELETE}')
|
||
edit_win32.input(value)
|
||
edit_win32.send_keys_shortcut('{ENTER}')
|
||
return WxResponse.success()
|
||
|
||
def get_group_members(self, control=False):
|
||
"""获取群成员"""
|
||
more = self.control.ButtonControl(Name=self._lang('查看更多'), searchDepth=8)
|
||
if more.Exists(0.5):
|
||
more.Click()
|
||
members = [i for i in self.control.ListControl(Name=self._lang('聊天成员')).GetChildren()]
|
||
while members[-1].Name in [self._lang('添加'), self._lang('移出')]:
|
||
members = members[:-1]
|
||
if control:
|
||
return members
|
||
member_names = [i.Name for i in members]
|
||
self.close()
|
||
return member_names
|
||
|
||
class GroupMemberElement:
|
||
def __init__(self, control, parent) -> None:
|
||
self.control = control
|
||
self.parent = parent
|
||
self.root = self.parent.root
|
||
self.nickname = self.control.Name
|
||
|
||
def __repr__(self) -> str:
|
||
return f"<wxauto Group Member Element at {hex(id(self))}>"
|
||
|
||
|
||
class AtMenu(BaseUISubWnd):
|
||
_ui_cls_name = 'ChatContactMenu'
|
||
|
||
def __init__(self, parent):
|
||
self.root = parent.root
|
||
self.control = parent.parent.control.PaneControl(ClassName='ChatContactMenu')
|
||
# self.control.Exists(1)
|
||
|
||
def clear(self, friend):
|
||
if self.exists():
|
||
self.control.SendKeys('{ESC}')
|
||
for _ in range(len(friend)+1):
|
||
self.root.chatbox.editbox.SendKeys('{BACK}')
|
||
|
||
def select(self, friend):
|
||
friend_ = friend.replace(' ', '')
|
||
if self.exists():
|
||
ateles = self.control.ListControl().GetChildren()
|
||
if len(ateles) == 1:
|
||
ateles[0].Click()
|
||
else:
|
||
atele = self.control.ListItemControl(Name=friend)
|
||
if atele.Exists(0):
|
||
roll_into_view(self.control, atele)
|
||
atele.Click()
|
||
else:
|
||
self.clear(friend_)
|
||
else:
|
||
self.clear(friend_) |