# -*- coding: utf-8 -*-
import time

from docopt import Dict
from irc3.plugins.command import command
from irc3.utils import IrcString

from . import Plugin


class CTCP(Plugin):
    # Bot commands that send a CTCP query to a nick and format the answer
    # as a one-line, bold-tagged reply.

    requires = Plugin.requires + ['irc3.plugins.async']

    # Timeout handed to ``bot.ctcp_async`` for every query issued here.
    TIMEOUT = 5

    # NOTE: the docstrings of the @command methods are kept terse on purpose;
    # the ``%%`` line is the docopt usage pattern for the command, so extra
    # documentation lives in ``#`` comments instead.

    @command
    async def ping(self, mask: IrcString, target: IrcString, args: Dict):
        """Sends ping via CTCP to user and sends the time needed

            %%ping [<nick>]
        """
        nick = self._nick(mask, args)
        # The current timestamp is sent as the PING payload; the round-trip
        # time is computed from the timestamp found in the reply.
        data = await self.bot.ctcp_async(
            nick, 'PING {}'.format(time.time()), self.TIMEOUT)

        reply = self._failure(data)
        if reply is None:
            try:
                delta = time.time() - float(data['reply'])
            except (TypeError, ValueError):
                # The reply text comes from the remote client and may not be
                # a number; report it instead of raising.
                reply = 'Error: {}'.format(data['reply'])
            else:
                if delta < 1.0:
                    delta *= 1000
                    unit = 'ms'
                else:
                    unit = 's'
                reply = '{0:.3f} {1}'.format(delta, unit)
        return self._ctcp('ping', nick, reply)

    @command
    async def time(self, mask: IrcString, target: IrcString, args: Dict):
        """Gets the client time from nick via CTCP

            %%time [<nick>]
        """
        return await self.ctcp('time', mask, args)

    @command
    async def ver(self, mask: IrcString, target: IrcString, args: Dict):
        """Gets the client version from nick via CTCP

            %%ver [<nick>]
        """
        return await self.ctcp('version', mask, args)

    async def ctcp(self, name: str, mask: IrcString, args: Dict):
        """Send the CTCP query ``name`` and return the formatted answer.

        :param name: CTCP command name; it is upper-cased before sending.
        :param mask: mask of the user who issued the bot command.
        :param args: parsed command arguments (optional ``<nick>``).
        :return: the line built by :meth:`_ctcp` containing the reply text,
            ``timeout`` or ``Error: ...``.
        """
        nick = self._nick(mask, args)
        data = await self.bot.ctcp_async(nick, name.upper(), self.TIMEOUT)

        reply = self._failure(data)
        if reply is None:
            reply = data['reply']
        return self._ctcp(name, nick, reply)

    @staticmethod
    def _nick(mask: IrcString, args: Dict):
        """Return the nick to query: ``<nick>`` if given, else the caller."""
        # docopt stores None for an omitted optional argument, so a plain
        # ``dict.get`` default would never be used; fall back explicitly.
        return args.get('<nick>') or mask.nick

    @staticmethod
    def _failure(data):
        """Return the failure text for a CTCP result, or None on success."""
        if not data or data['timeout']:
            return 'timeout'
        if not data['success']:
            return 'Error: {}'.format(data['reply'])
        return None

    # noinspection PyMethodMayBeStatic
    def _ctcp(self, name: str, nick: str, reply: str):
        """Format ``[NAME] nick: reply`` with the tag wrapped in \\x02."""
        return '\x02[{}]\x02 {}: {}'.format(name.upper(), nick, reply)