A simple story about how I got embarrassed about constantly asking my groupmates for information I had missed, and decided to make our lives a little easier.
I suppose many of my peers know the situation: important information regularly flashes by in the general chat, where about 30 active participants keep loading the Vkontakte databases with their messages. Under these conditions it is unlikely that everyone will see that information. It happens to me too. A year ago I decided to fix this.
If you don't mind yet another article about a bot, read on.
Since I am a first-year student, the examples will come from student life.
So, the task: make passing information from the headman (the group's student representative) to the students convenient for both sides. Thanks to a relatively new Vkontakte feature, community messages, the solution was obvious right away. A bot sitting in a community receives messages from the headman (or headmen, if the stream has several groups) and sends them to the interested parties, the students.
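The idea above can be sketched without any Vkontakte code at all. The snippet below is only an illustration under stated assumptions: `relay`, `headmen`, and the `send` callback are hypothetical names, not part of the bot described later.

```python
from typing import Callable, Iterable, List, Set, Tuple


def relay(sender_id: int, text: str, headmen: Set[int],
          students: Iterable[int], send: Callable[[int, str], None]) -> int:
    """Forward `text` to every student if the sender is a headman.

    Returns the number of messages handed to `send`.
    """
    if sender_id not in headmen:
        return 0  # ordinary members cannot trigger a broadcast
    delivered = 0
    for student_id in students:
        if student_id == sender_id:
            continue  # the headman already has the message
        send(student_id, text)
        delivered += 1
    return delivered


# Usage with an in-memory outbox instead of the real API.
outbox: List[Tuple[int, str]] = []
count = relay(1, "Lecture moved to 10:00", {1}, [1, 2, 3],
              lambda uid, msg: outbox.append((uid, msg)))
```

Everything that follows in the article is, in essence, this function with real message delivery, persistence, and command parsing around it.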
The task is set, so let's proceed.
We will need:
Also, before reading on, I suggest brushing up on the Observer and Facade patterns (both are covered on Habr and Wikipedia).
Part 1. "Nice to meet you, comrade bot."
First we need to teach the bot to see itself as a community. Create a class called Group. Its constructor takes the session object and a database proxy object as arguments.
```python
class Group(BaseCommunicateVK):
    def __init__(self, vksession, storage):
        super().__init__(vksession)
        self.storage = storage
```
This functionality lives in a separate class because one of you may later decide to extend the bot with other Vkontakte features.
And, of course, to keep the community abstraction itself uncluttered.
```python
import logging

import vk_api
from vk_api.longpoll import VkLongPoll


class BaseCommunicateVK:
    # One long-poll connection is shared by all instances.
    longpoll = None

    def __init__(self, vksession):
        self.session = vksession
        self.api = vksession.get_api()
        if BaseCommunicateVK.longpoll is None:
            BaseCommunicateVK.longpoll = VkLongPoll(self.session)

    def get_api(self):
        return self.api

    def get_longpoll(self):
        return self.longpoll

    def method(self, func, args):
        # Raw calls go through the session (VkApi.method), not the api wrapper.
        return self.session.method(func, args)

    @staticmethod
    def create_session(token=None, login=None, password=None, api_v='5.85'):
        try:
            if token:
                session = vk_api.VkApi(token=token, api_version=api_v)
            elif login and password:
                session = vk_api.VkApi(login, password, api_version=api_v)
            else:
                raise vk_api.AuthError("Define login and password or token.")
            return session
        except vk_api.ApiError as error:
            logging.info(error)

    def get_last_message(self, user_id):
        return self.api.messages.getHistory(
            peer_id=user_id, count=1)["items"][0]

    @staticmethod
    def get_attachments(last_message):
        # Build the comma-separated "<type><owner_id>_<id>[_<access_key>]" list.
        if not last_message or "attachments" not in last_message:
            return ""
        attach_strings = []
        for attach in last_message["attachments"]:
            attach_type = attach["type"]
            attach_info = attach[attach_type]
            attach_id = attach_info["id"]
            attach_owner_id = attach_info["owner_id"]
            if "access_key" in attach_info:
                attach_string = "{}{}_{}_{}".format(
                    attach_type, attach_owner_id, attach_id,
                    attach_info["access_key"])
            else:
                attach_string = "{}{}_{}".format(
                    attach_type, attach_owner_id, attach_id)
            attach_strings.append(attach_string)
        return ",".join(attach_strings)

    @staticmethod
    def get_forwards(attachments, last_message):
        if not attachments or "fwd_count" not in attachments:
            return ""
        if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]):
            return last_message["id"]

    def send(self, user_id, message, attachments=None, **kwargs):
        send_to = int(user_id)
        last_message = kwargs.get("last_message")
        # Assumption: the destroy flags arrive through kwargs; the original
        # snippet used them without defining them.
        destroy = kwargs.get("destroy", False)
        destroy_type = kwargs.get("destroy_type", 1)
        p_attachments = self.get_attachments(last_message)
        p_forward = self.get_forwards(attachments, last_message)
        if message or p_attachments or p_forward:
            self.api.messages.send(
                user_id=send_to, message=message,
                attachment=p_attachments,
                forward_messages=p_forward)
        if destroy:
            accept_msg_id = self.api.messages \
                .getHistory(peer_id=user_id, count=1) \
                .get('items')[0].get('id')
            self.delete(accept_msg_id, destroy_type=destroy_type)

    def delete(self, msg_id, destroy_type=1):
        # messages.delete takes the ids in the message_ids parameter.
        self.api.messages.delete(message_ids=msg_id,
                                 delete_for_all=destroy_type)
```
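The attachment handling is the least obvious part of this class, so here is the same string-building logic on its own. This is a minimal sketch: `attachment_string` is a hypothetical helper name and the sample dictionaries are made-up data shaped like the fields that `get_attachments` reads.

```python
def attachment_string(attach: dict) -> str:
    """Return "<type><owner_id>_<id>" plus "_<access_key>" when present."""
    kind = attach["type"]
    info = attach[kind]
    parts = [str(info["owner_id"]), str(info["id"])]
    if "access_key" in info:
        parts.append(info["access_key"])
    return kind + "_".join(parts)


# Made-up attachments: a community photo with a key and a public document.
photo = {"type": "photo",
         "photo": {"id": 456, "owner_id": -123, "access_key": "abc"}}
doc = {"type": "doc", "doc": {"id": 7, "owner_id": 42}}

joined = ",".join(attachment_string(a) for a in (photo, doc))
```

A negative `owner_id` denotes a community, which is why the minus sign ends up inside the string.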
Next, add a method that updates the community members. It splits them into administrators and ordinary members right away and saves them to the database.
```python
class Group(BaseCommunicateVK):
    # ... __init__ as shown above ...

    def update_members(self):
        fields = 'domain, sex'
        admins = self.api.groups.getMembers(
            group_id=self.group_id, fields=fields, filter='managers')
        self.save_members(self._configure_users(admins))
        members = self.api.groups.getMembers(
            group_id=self.group_id, fields=fields)
        self.save_members(self._configure_users(members))
        return self

    def save_members(self, members):
        self.storage.update(members)

    @staticmethod
    def _configure_users(items, exclude=None):
        if exclude is None:
            exclude = []
        users = []
        for user in items.get('items'):
            if user.get('id') not in exclude:
                member = User()
                member.configure(**user)
                users.append(member)
        return users
```
This class should also be able to send messages to recipients, so here is the next method. Its parameters are the list of recipients, the message text, and the attachments. The whole thing runs in a separate thread so that the bot can keep receiving messages from other participants.
Messages are received synchronously, so the response time will obviously grow as the number of active clients increases.
```python
from threading import Thread


class Group(BaseCommunicateVK):
    # ... methods shown above ...

    def broadcast(self, uids, message, attachments=None, **kwargs):
        report = BroadcastReport()

        def send_all():
            users_ids = uids
            if not isinstance(users_ids, list):
                users_ids = list(users_ids)
            report.should_be_sent = len(users_ids)
            for user_id in users_ids:
                try:
                    self.send(user_id, message, attachments, **kwargs)
                    if message or attachments:
                        report.sent += 1
                except vk_api.VkApiError as error:
                    report.errors.append(
                        'vk.com/id{}: {}'.format(user_id, error))
                except ValueError:
                    continue
            # Send the delivery report to the administrators.
            for uid in self.get_member_ids(admins=True, moders=True):
                self.send(uid, str(report))

        broadcast_thread = Thread(target=send_all)
        broadcast_thread.start()
        # Note: join() blocks the caller until the broadcast has finished.
        broadcast_thread.join()
```
```python
class BroadcastReport:
    def __init__(self):
        self.should_be_sent = 0
        self.sent = 0
        self.errors = []

    def __str__(self):
        res = "# #"
        res += "\n: {} ".format(self.should_be_sent)
        res += "\n: {} ".format(self.sent)
        if self.errors:
            res += "\n:"
            for i in self.errors:
                res += "\n- {}".format(i)
        return res
```
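For the bot to keep answering other participants while a broadcast is in progress, the caller must not wait for the worker thread. Below is a minimal sketch of that variant with a stubbed `send`; `broadcast_in_background` and `fake_send` are hypothetical names, and the real method would call the Vkontakte API instead.

```python
import threading
from typing import Callable, Iterable, List


def broadcast_in_background(user_ids: Iterable[int], message: str,
                            send: Callable[[int, str], None],
                            errors: List[str]) -> threading.Thread:
    """Start sending in a worker thread and return immediately."""
    recipients = list(user_ids)

    def worker() -> None:
        for user_id in recipients:
            try:
                send(user_id, message)
            except Exception as error:
                # One failed recipient must not stop the whole broadcast.
                errors.append("id{}: {}".format(user_id, error))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


delivered: List[int] = []
errors: List[str] = []


def fake_send(user_id: int, message: str) -> None:
    # Stub: pretend that user 2 has blocked the community.
    if user_id == 2:
        raise RuntimeError("blocked")
    delivered.append(user_id)


thread = broadcast_in_background([1, 2, 3], "hello", fake_send, errors)
thread.join()  # only so that this demo can inspect the results
```

In the bot itself the returned thread would simply be left running, and the report would be sent from inside the worker once the loop ends.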
That seems to complete the group abstraction. We have met all the community members; now we need to learn to understand them.
Part 2. "Psh... come in.."
Make the bot listen to all messages from members of our community.
To do this, create a ChatHandler class. Its constructor takes two parameters: the group instance (group_manager) and the command observer (command_observer).
```python
class ChatHandler(Handler):
    def __init__(self, group_manager, command_observer):
        super().__init__()
        self.longpoll = group_manager.get_longpoll()
        self.group = group_manager
        self.api = group_manager.get_api()
        self.command_observer = command_observer
```
Next comes the actual listening: we receive messages from users and recognize commands.
```python
import datetime
import logging

from vk_api.longpoll import VkEventType


class ChatHandler(Handler):
    # ... __init__ as shown above ...

    def listen(self):
        try:
            for event in self.longpoll.listen():
                if (event.user_id
                        and event.type == VkEventType.MESSAGE_NEW
                        and event.to_me):
                    self.group.api.messages.markAsRead(peer_id=event.user_id)
                    self.handle(event.user_id, event.text, event.attachments,
                                message_id=event.message_id)
        except ConnectionError:
            logging.error("I HAVE BEEN DOWNED AT {}".format(
                datetime.datetime.today()))
            self.longpoll.update_longpoll_server()

    def handle(self, user_id, message, attachments, **kwargs):
        member = self.group.get_member(user_id)
        self.group.update_members()
        self.command_observer.execute(member, message, attachments,
                                      self.group, **kwargs)

    def run(self):
        self.listen()
```
Part 3. "What did you write about my ..?"
A separate subsystem, implemented with the Observer pattern, handles command recognition.
Meet CommandObserver:
```python
class CommandObserver(AbstractObserver):
    def execute(self, member, message, attachments, group, **kwargs):
        for command in self.commands:
            for trigger in command.triggers:
                body = command.get_body(trigger, message)
                if body is not None:
                    group.api.messages.setActivity(user_id=member.id,
                                                   type="typing")
                    if command.system:
                        # System commands also get the list of all commands.
                        kwargs.update({"trigger": trigger,
                                       "commands": self.commands})
                    else:
                        kwargs.update({"trigger": trigger})
                    return command.proceed(member, body, attachments,
                                           group, **kwargs)
```
Again, the base class is factored out to allow for future extension.
```python
from abc import ABCMeta, abstractmethod


class AbstractObserver(metaclass=ABCMeta):
    def __init__(self):
        self.commands = []

    def add(self, *args):
        for arg in args:
            self.commands.append(arg)

    @abstractmethod
    def execute(self, *args, **kwargs):
        pass
```
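Stripped of the Vkontakte calls, the dispatch performed by the observer comes down to "the first command whose trigger opens the message wins". The sketch below shows only that idea; `MiniObserver`, `EchoCommand`, and `ShoutCommand` are hypothetical stand-ins, not classes from the project.

```python
class EchoCommand:
    triggers = [".echo"]

    def proceed(self, body):
        return "echo: " + body


class ShoutCommand:
    triggers = [".shout", ".s"]

    def proceed(self, body):
        return body.upper()


class MiniObserver:
    def __init__(self):
        self.commands = []

    def add(self, *commands):
        self.commands.extend(commands)

    def execute(self, message):
        text = message.lstrip()
        for command in self.commands:
            for trigger in command.triggers:
                if text.startswith(trigger):
                    # Pass the rest of the message, without the trigger.
                    return command.proceed(text[len(trigger):].strip())
        return None  # no command recognized


observer = MiniObserver()
observer.add(EchoCommand(), ShoutCommand())
```

Because commands are checked in the order they were added, registration order decides which one handles a message when triggers overlap.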
But what will this observer recognize?
So we have come to the most interesting part: the commands.
Each command is an independent class derived from the base Command class.
All that is required of a command is to run its proceed() method when one of its keywords is found at the beginning of the user's message. The keywords are defined in the command's triggers attribute (a string or a list of strings).
```python
import re
from abc import ABCMeta


class Command(metaclass=ABCMeta):
    def __init__(self):
        self.triggers = []
        self.description = "Empty description."
        self.system = False
        self.privilege = False
        self.activate_times = []
        self.activate_days = set()
        self.autostart_func = self.proceed

    def proceed(self, member, message, attachments, group, **kwargs):
        raise NotImplementedError()

    @staticmethod
    def get_body(kw, message):
        if not isinstance(kw, list):
            kw = [kw, ]
        for i in kw:
            # re.escape keeps characters such as "." literal in the pattern.
            reg = '^ *({}) *'.format(re.escape(i))
            if re.search(reg, message):
                return re.sub(reg, '', message).strip(' ')
```
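The trigger matching described above can be tried in isolation. This is a standalone sketch of the same idea for a single trigger string, using `re.escape` for the keyword; the free function `get_body` here is a simplified stand-in for the static method.

```python
import re
from typing import Optional


def get_body(trigger: str, message: str) -> Optional[str]:
    """Return the message without its leading trigger, or None if absent."""
    pattern = r"^ *{} *".format(re.escape(trigger))
    if re.search(pattern, message):
        return re.sub(pattern, "", message).strip(" ")
    return None


body = get_body(".mb", "  .mb  Exam moved to Friday ")
missing = get_body(".mb", "please .mb this")
overlap = get_body(".h", ".help")
```

Note the last call: a short trigger such as `.h` also matches a message that starts with `.help`, leaving `elp` as the body, so the order in which triggers are tried matters when one is a prefix of another.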
As the signature of proceed() shows, each command receives a reference to the group member instance, the member's message (with the keyword already stripped), the attachments, and a reference to the group instance. In other words, all interaction with the group member rests on the command. I think this is the right decision, since it makes it possible to build a shell on top for greater interactivity.
(In truth, that would require either making the bot asynchronous, because processing is currently synchronous, or handling each received message in a new thread, which is expensive.)
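A middle ground between fully asynchronous code and one thread per message is a bounded thread pool. The sketch below is an assumption about how that could look, not something the bot does: `handle` is a stub standing in for command processing, and `process_events` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def handle(user_id: int, text: str) -> str:
    # Stub for command processing; a real handler would call the observer.
    return "{}:{}".format(user_id, text.upper())


def process_events(events: List[Tuple[int, str]], workers: int = 4) -> List[str]:
    """Handle messages concurrently with at most `workers` threads."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(handle, uid, text) for uid, text in events]
        # Collect results in submission order.
        return [future.result() for future in futures]


results = process_events([(1, "hi"), (2, "schedule"), (3, "help")])
```

The pool caps the number of threads, so a burst of messages queues up instead of spawning a thread each.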
Examples of command implementation:
```python
class BroadcastCommand(Command):
    def __init__(self):
        super().__init__()
        self.triggers = ['.mb']
        self.privilege = True
        self.description = " ."

    def proceed(self, member, message, attachments, group, **kwargs):
        if member.id not in group.get_member_ids(admins=True, editors=True):
            group.send(member.id, "You cannot do this ^_^")
            return True
        last_message = group.get_last_message(member.id)
        group.broadcast(group.get_member_ids(), message, attachments,
                        last_message=last_message, **kwargs)
        return True
```
```python
class HelpCommand(Command):
    def __init__(self):
        super().__init__()
        self.commands = []
        self.triggers = ['.h', '.help']
        self.system = True
        self.description = " ."

    def proceed(self, member, message, attachments, group, **kwargs):
        commands = kwargs["commands"]
        help_text = " :\n\n"
        admins = group.get_member_ids(admins=True, moders=True)
        i = 0
        for command in commands:
            # Hide privileged commands from ordinary members.
            if command.privilege and member.id not in admins:
                continue
            help_text += "{}) {}\n\n".format(i + 1, command.name())
            i += 1
        group.send(member.id, help_text)
        return True
```
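Adding a command of your own follows the same shape. Here is a minimal sketch with stand-ins so that it runs without Vkontakte: `PingCommand`, `FakeGroup`, and `FakeMember` are hypothetical, and the trimmed `Command` base only mirrors the fields used here.

```python
class Command:
    """Trimmed stand-in for the base class shown above."""

    def __init__(self):
        self.triggers = []
        self.privilege = False

    def proceed(self, member, message, attachments, group, **kwargs):
        raise NotImplementedError()


class PingCommand(Command):
    def __init__(self):
        super().__init__()
        self.triggers = ['.ping']

    def proceed(self, member, message, attachments, group, **kwargs):
        # Reply to the sender, echoing any text that followed the trigger.
        reply = "pong " + message if message else "pong"
        group.send(member.id, reply)
        return True


class FakeMember:
    def __init__(self, member_id):
        self.id = member_id


class FakeGroup:
    """Records outgoing messages instead of calling the API."""

    def __init__(self):
        self.sent = []

    def send(self, user_id, message):
        self.sent.append((user_id, message))


group = FakeGroup()
handled = PingCommand().proceed(FakeMember(7), "are you there", None, group)
```

Testing commands against a recording stub like this keeps them independent of the network and of the long-poll loop.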
Part 4. "We are one big team."
Now all these modules and handlers need to be combined and configured.
One more class please!
Create a facade that will customize our bot.
```python
class VKManage:
    def __init__(self, token=None, login=None, password=None):
        # api_version is not defined in this snippet; it is assumed to be
        # imported from the project's settings module.
        self.session = BaseCommunicateVK.create_session(
            token, login, password, api_version)
        self.storage = DBProxy(DatabaseORM)
        self.group = Group(self.session, self.storage).setup().update_members()
        self.chat = ChatHandler(self.group, CommandObserver.get_observer())

    def start(self):
        self.chat.run()

    def get_command(self, command_name):
        return {
            " ": BroadcastCommand(),
            " ": AdminBroadcastCommand(),
            "": HelpCommand(),
            " ": SkippedLectionsCommand(),
            # Assumption: the original referred to self.bot.api, which this
            # class never defines; the group's API object is used instead.
            "": TopicTimetableCommand().setup_account(self.group.api),
        }.get(command_name)

    def connect_command(self, command_name):
        command = self.get_command(str(command_name).lower())
        if command:
            self.chat.command_observer.add(command)
        return self

    def connect_commands(self, command_names):
        for i in command_names.split(','):
            self.connect_command(i.strip())
        return self
```
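The part of the facade that turns a comma-separated string into connected commands can be sketched on its own. This is an illustration under assumptions: `MiniManage` and the registry of factories are hypothetical, and unlike the class above it creates a command only when its name is requested.

```python
class HelpStub:
    pass


class BroadcastStub:
    pass


class MiniManage:
    def __init__(self, registry):
        # Maps a lowercase command name to a zero-argument factory.
        self.registry = registry
        self.commands = []

    def connect_command(self, command_name):
        factory = self.registry.get(str(command_name).strip().lower())
        if factory:
            self.commands.append(factory())
        return self  # returning self allows calls to be chained

    def connect_commands(self, command_names):
        for name in command_names.split(','):
            self.connect_command(name)
        return self


manager = MiniManage({"help": HelpStub, "broadcast": BroadcastStub})
manager.connect_commands("Help, unknown, broadcast")
connected = [type(command).__name__ for command in manager.commands]
```

Unknown names are skipped silently, which matches the behaviour of `connect_command` above.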
The last stage is the launch. It is always the nastiest part, because any surprise can pop up. Not this time.
project_path is imported from the settings module in the project root.
```python
if __name__ == '__main__':
    config = ConfigParser(project_path)
    VKManage(token=config['token'],
             login=config['login'],
             password=config['password']) \
        .connect_commands(", , , ") \
        .start()
```
That seems to be all.
So far this program has benefited at least three groups, and I hope it will benefit you too.
You can deploy it for free on Heroku, but that's another story.
References:
Source: https://habr.com/ru/post/428808/