去落格博客阅读完整排版的我给落格输入法的用户群添加了个自动回复机器人
如题图,我给落格输入法的用户群弄了个机器人,随着落格输入法的用户越来越多,一些慕名而来的新手也多了。很多常见问题重复提问,搞得人焦头烂额,如果能有个机器人,就像 Siri 那样,让它自动捕捉那些关键字然后回复这些用户,岂不美哉?这样用户能够得到精心编辑的答案,而我也能空余出更多的时间去写wan代you码xi。
当然,这样的机器人我是见过的,所以我想一定有现成的东西,显然,搜索之下我先找到了这个:https://github.com/zeruniverse/QQRobot 是作者实验学习做的一个 QQ 小黄鸡,基于 Python,简单方便,我看中它的地方在于能在 linux 上跑,也就是可以跑在我的 vps 上。
不过这个用起来不太方便,它是通过二维码扫描来登录的,而我的vps又没有图形端……所以我还得把那个二维码下载下来,太费事了。后来我通过它的说明链接,找到了这个 https://github.com/zeruniverse/QBotWebWrap
他集成了上文所说的 QQRobot,可以在网页端控制啦!不过可惜的是,配置很复杂,php插件要求的也多,重点是并不稳定,几乎一上线就掉了。
琢磨了这么一通之后,我也明白了这些挂机工具的原理,其实就是大家破解了 web QQ 的协议(就是那个 smart qq),虽然高级功能做不到,但聊天的的功能还是妥妥的了,比如能够监控消息,然后发送给第三方的人工智能接口获得反馈再给对方发回去这样。这就是智能聊天机器人的原理啦!
后来,我又找到了一个比较活跃的项目:https://github.com/Yinzo/SmartQQBot
这个的原理与上文的 QQRobot 一样,但一直有人维护,也很活跃,项目稳定。没什么说的,根据需求安装包后就能启动,记得防火墙开 8888 端口,然后可以用
python run.py --no-gui --http启动,如此就可以了,然后打开vps对应的页面,同样扫码登录。
SmartQQBot 自带了几个插件比如天气以及一个群内可调戏的自动回复插件,你可以用固定的语法教它,然后就会回复你这样,这虽然能够稍微达到我的目的,但还不够。作为一个专门的问答机器人,怎么能让用户随便地增删呢对吧? SmartQQBot 的另外一个好处是以插件形式来增加功能的,它自带了几个插件,比如 Satoru ,我们不用这个,把它关掉。项目介绍里没有说明,其实还自带了一个叫做 tucao 的插件,这个插件与 Satoru 不同的是,它支持在用户发言的语句中匹配关键词而不是全文匹配!这一点很不错,如果你愿意,可以做到很自然地回复不是吗?
所以,我们来改这个,SmartQQBot 支持三种消息标签:
from smart_qq_bot.signals import ( on_all_message, //包括了群消息和私聊消息 on_group_message, //群消息 on_private_message, //私聊消息 )
通过给你的函数赋予不同的标签,可以让你的插件对不同的消息作出响应。
首先是改改消息收发,我把群消息的权限减少,不再检测控制语法,只做回复,同时改为也接受私聊,这有我就可以自己做测试了。所以代码大概是这样:
@on_all_message(name='loginput[学习遗忘]') def loginput(msg, bot): global core reply = bot.reply_msg(msg, return_function=True) group_id = "16788008" core.load(group_id) for key in list(core.loginput_dict[group_id].keys()): if str(key) in msg.content and core.loginput_dict[group_id][key]: logger.info("RUNTIMELOG loginput pattern detected, replying...") reply(random.choice(core.loginput_dict[group_id][key])) return True return False
由于 tucao 这个插件是针对不同群的群号做处理的,这里我直接写成了固定的落格输入法用户群的群号,不隐藏了,大家随便看 :)
这里要注意,虽然本身程序传进来的群号是 int ,但 tucao 都是按照 str 处理的,所以这里要写成字符串。
然后是私聊的控制功能,这里我除了修改了 tucao 自带的显示列表功能外(列表大了根本发不了,太长,我改为了显示数量),还加入了一个重新加载的功能,这样就方便我更改后台数据统一添加内容了:
@on_private_message(name='loginput[管理]') def current_loginput_list(msg, bot): # webqq接受的消息会以空格结尾 global core reply = bot.reply_msg(msg, return_function=True) group_id = "16788008" match = re.match(r'^(?:!|!)(learn|delete|list|reload)(?:\s?){(.+)}(?:\s?){(.+)}', msg.content) if match: core.load(group_id) logger.info("RUNTIMELOG loginput command detected.") command = str(match.group(1)).decode('utf8') key = str(match.group(2)).decode('utf8') value = str(match.group(3)).decode('utf8') if command == 'learn': if group_id not in core.loginput_dict: core.load(group_id) if key in core.loginput_dict[group_id] and value not in core.loginput_dict[group_id][key]: core.loginput_dict[group_id][key].append(value) else: core.loginput_dict[group_id][key] = [value] reply("学习成功!快对我说" + str(key) + "试试吧!") core.save(group_id) return True elif command == 'reload' : loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" try: core.loginput_dict[str(group_id)] = core.readdatafromfile(loginput_file_path) logger.info("RUNTIMELOG loginput loaded. Now loginput list: {0}".format(str(core.loginput_dict))) except: core.loginput_dict[str(group_id)] = dict() logger.info("RUNTIMELOG loginput file is empty.") reply("重新加载完成!") return True elif command == 'delete': if key in core.loginput_dict[group_id] and core.loginput_dict[group_id][key].count(value): core.loginput_dict[group_id][key].remove(value) reply("呜呜呜我再也不说" + str(value) + "了") core.save(group_id) return True elif command == 'list': result = 0 for key in list(core.loginput_dict[group_id].keys()): result += 1 logger.info("RUNTIMELOG Replying the list of loginput for group {}".format(group_id)) reply("总共有关键字" + result + "条") return True return
其实关键字的检测就是来自于正则表达式,顺着表达式的格式就能猜得出来,添加一条回复就是这样:
!learn {test}{test reply!}。
那么删除呢?格式一样,由于它可以根据同一个关键字进行多条随机回复,所以如果删除,一定要对应,不能留空:
!delete {test}{test reply!}
但由于格式要固定,所以查看数量以及重载都是需要用这样的格式,只不过括号内的内容是无效的,随便写:
!reload {test}{test reply!}
最后,由于 tucao 本身用的是 Python 的序列化,保存文件不利于编辑,我们把它改为 JSON 格式,这就需要一个额外的 json 包。
由于我平时使用的都是 Python 3,所以这里我顺手把其他项目里的代码拿过来用了,再添加:
import json import io def writejson2file(self,obj, filename): with io.open(filename, 'w', encoding='utf8') as outfile: data = json.dumps(obj, indent=4, sort_keys=True, ensure_ascii=False) outfile.write(data) def readdatafromfile(self,filename): with io.open(filename, encoding='utf8') as outfile: return json.load(outfile)
这样就可以改保存和加载的代码了:
def save(self, group_id): """ :type group_id: int, 用于保存指定群的吐槽存档 """ global LOGINPUT_PATH try: loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" self.writejson2file(self.loginput_dict[str(group_id)],loginput_file_path) logger.info("RUNTIMELOG loginput saved. Now loginput list: {0}".format(str(self.loginput_dict))) except Exception: logger.error("RUNTIMELOG Fail to save loginput.") raise IOError("Fail to save loginput.") def load(self, group_id): """ :type group_id: int, 用于读取指定群的吐槽存档 """ global LOGINPUT_PATH if str(group_id) in set(self.loginput_dict.keys()): return loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" if not os.path.isdir(LOGINPUT_PATH): os.makedirs(LOGINPUT_PATH) if not os.path.exists(loginput_file_path): with open(loginput_file_path, "w") as tmp: tmp.close() try: self.loginput_dict[str(group_id)] = self.readdatafromfile(loginput_file_path) logger.info("RUNTIMELOG loginput loaded. Now loginput list: {0}".format(str(self.loginput_dict))) except : self.loginput_dict[str(group_id)] = dict() logger.info("RUNTIMELOG loginput file is empty.")
最后,去配置里选择加载这个插件:
vi config/plugin.json { "plugin_packages": [], "plugin_on": [ "basic", "manager", "weather", "loginput" ] }
然后把修改好的插件放到
smart_qq_plugins里就可以了。
如果你需要,这个文件的所有代码你可以直接在这里复制,新建一个
loginput.py并写入即可:
# -*- coding: utf-8 -*- import re import os import random import json import io from smart_qq_bot.logger import logger from smart_qq_bot.signals import ( on_all_message, on_private_message, ) LOGINPUT_PATH = 'smart_qq_plugins/loginput/' class LogInputCore(object): def __init__(self): self.loginput_dict = dict() def writejson2file(self,obj, filename): with io.open(filename, 'w', encoding='utf8') as outfile: data = json.dumps(obj, indent=4, sort_keys=True, ensure_ascii=False) outfile.write(data) def readdatafromfile(self,filename): with io.open(filename, encoding='utf8') as outfile: return json.load(outfile) def save(self, group_id): """ :type group_id: int, 用于保存指定群的吐槽存档 """ global LOGINPUT_PATH try: loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" self.writejson2file(self.loginput_dict[str(group_id)],loginput_file_path) logger.info("RUNTIMELOG loginput saved. Now loginput list: {0}".format(str(self.loginput_dict))) except Exception: logger.error("RUNTIMELOG Fail to save loginput.") raise IOError("Fail to save loginput.") def load(self, group_id): """ :type group_id: int, 用于读取指定群的吐槽存档 """ global LOGINPUT_PATH if str(group_id) in set(self.loginput_dict.keys()): return loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" if not os.path.isdir(LOGINPUT_PATH): os.makedirs(LOGINPUT_PATH) if not os.path.exists(loginput_file_path): with open(loginput_file_path, "w") as tmp: tmp.close() try: self.loginput_dict[str(group_id)] = self.readdatafromfile(loginput_file_path) logger.info("RUNTIMELOG loginput loaded. Now loginput list: {0}".format(str(self.loginput_dict))) except : self.loginput_dict[str(group_id)] = dict() logger.info("RUNTIMELOG loginput file is empty.") core = LogInputCore() @on_all_message(name='loginput[学习遗忘]') def loginput(msg, bot): global core reply = bot.reply_msg(msg, return_function=True) group_id = "16788008" core.load(group_id) for key in list(core.loginput_dict[group_id].keys()): if str(key) in msg.content and core.loginput_dict[group_id][key]: logger.info("RUNTIMELOG loginput pattern detected, replying...") reply(random.choice(core.loginput_dict[group_id][key])) return True return False @on_private_message(name='loginput[管理]') def current_loginput_list(msg, bot): # webqq接受的消息会以空格结尾 global core reply = bot.reply_msg(msg, return_function=True) group_id = "16788008" match = re.match(r'^(?:!|!)(learn|delete|list|reload)(?:\s?){(.+)}(?:\s?){(.+)}', msg.content) if match: core.load(group_id) logger.info("RUNTIMELOG loginput command detected.") command = str(match.group(1)).decode('utf8') key = str(match.group(2)).decode('utf8') value = str(match.group(3)).decode('utf8') if command == 'learn': if group_id not in core.loginput_dict: core.load(group_id) if key in core.loginput_dict[group_id] and value not in core.loginput_dict[group_id][key]: core.loginput_dict[group_id][key].append(value) else: core.loginput_dict[group_id][key] = [value] reply("学习成功!快对我说" + str(key) + "试试吧!") core.save(group_id) return True elif command == 'reload' : loginput_file_path = LOGINPUT_PATH + str(group_id) + ".loginput" try: core.loginput_dict[str(group_id)] = core.readdatafromfile(loginput_file_path) logger.info("RUNTIMELOG loginput loaded. Now loginput list: {0}".format(str(core.loginput_dict))) except: core.loginput_dict[str(group_id)] = dict() logger.info("RUNTIMELOG loginput file is empty.") reply("重新加载完成!") return True elif command == 'delete': if key in core.loginput_dict[group_id] and core.loginput_dict[group_id][key].count(value): core.loginput_dict[group_id][key].remove(value) reply("呜呜呜我再也不说" + str(value) + "了") core.save(group_id) return True elif command == 'list': result = 0 for key in list(core.loginput_dict[group_id].keys()): result += 1 logger.info("RUNTIMELOG Replying the list of loginput for group {}".format(group_id)) reply("总共有关键字" + result + "条") return True return