Files
wechat_crawler/wxauto/msgs/base.py

185 lines
5.6 KiB
Python
Raw Normal View History

from wxauto import uiautomation as uia
from wxauto.logger import wxlog
from wxauto.param import (
WxResponse,
WxParam,
PROJECT_NAME
)
from wxauto.ui.component import (
CMenuWnd,
SelectContactWnd
)
from wxauto.utils.tools import roll_into_view
from wxauto.languages import *
from typing import (
Dict,
List,
Union,
TYPE_CHECKING
)
from hashlib import md5
import time
if TYPE_CHECKING:
from wxauto.ui.chatbox import ChatBox
def truncate_string(s: str, n: int=8) -> str:
s = s.replace('\n', '').strip()
return s if len(s) <= n else s[:n] + '...'
class Message:...
class BaseMessage(Message):
type: str = 'base'
attr: str = 'base'
control: uia.Control
def __init__(
self,
control: uia.Control,
parent: "ChatBox",
):
self.control = control
self.parent = parent
self.root = parent.root
self.content = self.control.Name
self.id = self.control.runtimeid
self.sender = self.attr
self.sender_remark = self.attr
def __repr__(self):
cls_name = self.__class__.__name__
content = truncate_string(self.content)
return f"<{PROJECT_NAME} - {cls_name}({content}) at {hex(id(self))}>"
@property
def message_type_name(self) -> str:
return self.__class__.__name__
def chat_info(self) -> Dict:
if self.control.Exists(0):
return self.parent.get_info()
def _lang(self, text: str) -> str:
return MESSAGES.get(text, {WxParam.LANGUAGE: text}).get(WxParam.LANGUAGE)
def roll_into_view(self) -> WxResponse:
if roll_into_view(self.control.GetParentControl(), self.control, equal=True) == 'not exist':
wxlog.warning('消息目标控件不存在,无法滚动至显示窗口')
return WxResponse.failure('消息目标控件不存在,无法滚动至显示窗口')
return WxResponse.success('成功')
@property
def info(self) -> Dict:
_info = self.parent.get_info().copy()
_info['class'] = self.message_type_name
_info['id'] = self.id
_info['type'] = self.type
_info['attr'] = self.attr
_info['content'] = self.content
return _info
class HumanMessage(BaseMessage):
attr = 'human'
def __init__(
self,
control: uia.Control,
parent: "ChatBox",
):
super().__init__(control, parent)
self.head_control = self.control.ButtonControl(searchDepth=2)
def roll_into_view(self) -> WxResponse:
if roll_into_view(self.control.GetParentControl(), self.head_control, equal=True) == 'not exist':
return WxResponse.failure('消息目标控件不存在,无法滚动至显示窗口')
return WxResponse.success('成功')
def click(self):
self.roll_into_view()
self.head_control.Click(x=self._xbias)
def right_click(self):
self.roll_into_view()
self.head_control.Click(x=-self._xbias)
self.head_control.RightClick(x=self._xbias)
def select_option(self, option: str, timeout=None) -> WxResponse:
self.root._show()
def _select_option(self, option):
if not (roll_result := self.roll_into_view()):
return roll_result
self.right_click()
menu = CMenuWnd(self.root)
return menu.select(item=option)
if timeout:
t0 = time.time()
while True:
if (time.time() - t0) > timeout:
return WxResponse(False, '引用消息超时')
if quote_result := _select_option(self, option):
return quote_result
else:
return _select_option(self, option)
def quote(
self, text: str,
at: Union[List[str], str] = None,
timeout: int = 3
) -> WxResponse:
"""引用消息
Args:
text (str): 引用内容
at (List[str], optional): @用户列表
timeout (int, optional): 超时时间单位为秒若为None则不启用超时设置
Returns:
WxResponse: 调用结果
"""
if not self.select_option('引用', timeout=timeout):
wxlog.debug(f"当前消息无法引用:{self.content}")
return WxResponse(False, '当前消息无法引用')
if at:
self.parent.input_at(at)
return self.parent.send_text(text)
def reply(
self, text: str,
at: Union[List[str], str] = None
) -> WxResponse:
"""引用消息
Args:
text (str): 回复内容
at (List[str], optional): @用户列表
timeout (int, optional): 超时时间单位为秒若为None则不启用超时设置
Returns:
WxResponse: 调用结果
"""
if at:
self.parent.input_at(at)
return self.parent.send_text(text)
def forward(self, targets: Union[List[str], str], timeout: int = 3) -> WxResponse:
"""转发消息
Args:
targets (Union[List[str], str]): 目标用户列表
timeout (int, optional): 超时时间单位为秒若为None则不启用超时设置
Returns:
WxResponse: 调用结果
"""
if not self.select_option('转发', timeout=timeout):
return WxResponse(False, '当前消息无法转发')
select_wnd = SelectContactWnd(self)
return select_wnd.send(targets)