nxy/bot/timer.py
jkhsjdhjs e03f5d0a43 add auto reconnect for postgres
Reconnecting only takes effect on the second database interaction, because psycopg2
only notices that the connection is gone when a query is executed.

So in the common case reconnect works as follows:
- some bot method calls a cursor function like .execute(), .fetchone(), etc.
  - this raises an error if the connection is broken
  - if the following code then requests a new cursor, this also fails, since psycopg2
    now knows that the connection is gone
  - the error is caught in storage.DBConn.cursor(), a new connection is set up,
    and a cursor on that new connection is yielded
If the error happens in connection.commit() or .rollback() instead, we can reconnect
immediately, since these methods are wrapped.
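
The flow described above can be sketched roughly as follows. This is not the code in storage.DBConn; `ReconnectingConn`, `FakeConnection`, `FakeCursor`, the `connect` factory and the `error_type` parameter are all hypothetical names used only for illustration, and the fake connection merely imitates "the driver notices the loss on the first query".

```python
from contextlib import contextmanager


class ReconnectingConn:
    """Hypothetical stand-in for a wrapper like storage.DBConn."""

    def __init__(self, connect, error_type=Exception):
        self._connect = connect        # zero-argument factory returning a connection
        self._error_type = error_type  # assumption: psycopg2.Error in a real setup
        self._con = connect()
        self.reconnects = 0

    @contextmanager
    def cursor(self):
        try:
            cur = self._con.cursor()
        except self._error_type:
            # The driver already knows the connection is gone:
            # set up a new connection and hand out a cursor on it.
            self._con = self._connect()
            self.reconnects += 1
            cur = self._con.cursor()
        try:
            yield cur
        finally:
            cur.close()


class FakeCursor:
    """Fake cursor: the first query on a broken connection reveals the loss."""

    def __init__(self, con):
        self.con = con

    def execute(self, query):
        if self.con.broken:
            self.con.known_broken = True
            raise ConnectionError('connection lost')
        return query

    def close(self):
        pass


class FakeConnection:
    """Fake connection: cursor() only fails once the loss has been noticed."""

    def __init__(self):
        self.broken = False
        self.known_broken = False

    def cursor(self):
        if self.known_broken:
            raise ConnectionError('connection already closed')
        return FakeCursor(self)
```

With these fakes, the first `execute()` after the connection breaks still raises, and only the next `cursor()` call triggers the reconnect, which matches the "second database interaction" behaviour described above.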

So why not wrap the cursor methods as well?
Consider the following example:
A query is the last thing that was executed on a cursor.
The database connection is lost.
Now .fetchone() is called on the cursor.
We could wrap .fetchone() and reconnect, but we'd have to use a new cursor since
cursors are linked to connections. And on this new cursor .fetchone() wouldn't
make any sense, since we haven't executed a query on this cursor.
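
The point that a result set belongs to the cursor that executed the query can be illustrated with a small sketch. It uses sqlite3 purely as a stand-in because it ships with Python; psycopg2 reports the "nothing executed on this cursor" case differently, but the ownership of results is the same idea. The function name is hypothetical.

```python
import sqlite3


def result_belongs_to_cursor():
    """Run a query on one cursor, then try to fetch from a second, fresh cursor."""
    con = sqlite3.connect(':memory:')
    try:
        executed = con.cursor()
        executed.execute('SELECT 42')

        fresh = con.cursor()  # same connection, but no query was run on it
        from_fresh = fresh.fetchone()
        from_executed = executed.fetchone()
        return from_fresh, from_executed
    finally:
        con.close()
```

Here the fresh cursor has nothing to return, while the cursor that executed the query yields the row. A cursor created after a reconnect is in the same position as `fresh`.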
2020-03-16 21:51:32 +00:00

# -*- coding: utf-8 -*-
import re
import asyncio
from datetime import datetime
from aiocron import crontab
from docopt import Dict
from irc3.plugins.command import command
from irc3.utils import IrcString
from psycopg2 import Error
from psycopg2.extras import DictRow
from . import DatabasePlugin, Bot


class Timer(DatabasePlugin):
    def __init__(self, bot: Bot):
        super().__init__(bot)
        # IDs of timers that are already scheduled, so none is scheduled twice
        self.timers = set()
        self.set_timers()
        # Re-scan the database at the start of every hour
        crontab('0 * * * *', func=self.set_timers)

    @command
    def timer(self, mask: IrcString, target: IrcString, args: Dict):
        """Sets a timer, delay can be: s, m, h, d, w, mon, y

        %%timer <delay> <message>...
        """
        delay = args['<delay>']
        message = ' '.join(args['<message>'])

        # TODO: allow precise delays
        # The whole argument must be a number followed by exactly one unit
        if not re.fullmatch(r'\d+(?:[smhdwy]|mon)', delay):
            return 'Invalid timer delay: {}'.format(delay)

        try:
            with self.con.cursor() as cur:
                cur.execute('''
                    INSERT INTO
                        timers (mask, target, message, delay, ends_at)
                    VALUES
                        (%s, %s, %s, %s, now() + INTERVAL %s)
                    RETURNING
                        *
                ''', [mask, target, message, delay, delay])
                self.con.commit()
                asyncio.ensure_future(self.exec_timer(cur.fetchone()))
                self.bot.notice(mask.nick, 'Timer in {delay} set: {message}'.format(delay=delay, message=message))
        except Error as ex:
            self.log.error(ex)
            self.con.rollback()

    def set_timers(self):
        """Function which queries all timers in the next hour and schedules them."""
        self.log.debug('Fetching timers')
        with self.con.cursor() as cur:
            cur.execute('''
                SELECT
                    *
                FROM
                    timers
                WHERE
                    ends_at >= now()
                    AND ends_at < now() + INTERVAL '1h'
            ''')
            for timer in cur.fetchall():
                asyncio.ensure_future(self.exec_timer(timer))

    async def exec_timer(self, timer: DictRow):
        """Sets the actual timer (sleeps until it fires), sends the reminder and deletes the timer from database."""
        # Skip timers that were already scheduled by an earlier call
        if timer['id'] in self.timers:
            return
        self.timers.add(timer['id'])

        seconds = (timer['ends_at'] - datetime.now()).total_seconds()
        if seconds > 0.0:
            await asyncio.sleep(seconds)

        self.bot.privmsg(timer['target'], '\x02[Timer]\x02 {nick}: {message} ({delay})'.format(
            message=timer['message'],
            nick=IrcString(timer['mask']).nick,
            delay=timer['delay'],
        ))
        self.timers.remove(timer['id'])

        with self.con.cursor() as cur:
            cur.execute('''
                DELETE FROM
                    timers
                WHERE
                    id = %s
            ''', [timer['id']])
            self.con.commit()
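
A minimal standalone sketch of validating the `<delay>` argument against the units listed in the docstring of timer() (s, m, h, d, w, mon, y). `DELAY_RE` and `is_valid_delay` are hypothetical names that do not exist in the plugin. The sketch assumes a delay is a positive integer followed by exactly one unit, and it matches the whole string so that trailing characters are rejected.

```python
import re

# Hypothetical helper, not part of the plugin.
# A delay is one or more digits followed by exactly one unit.
DELAY_RE = re.compile(r'\d+(?:[smhdwy]|mon)')


def is_valid_delay(delay: str) -> bool:
    """Return True if the whole string is <number><unit>."""
    return DELAY_RE.fullmatch(delay) is not None
```

Using `fullmatch` rather than `match` means inputs such as `10mx` or a bare `mon` are rejected before they reach the `INTERVAL` expression in the query.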