import json import logging import os import re import sqlite3 from dataclasses import dataclass from datetime import datetime from functools import wraps from pprint import pprint import telegram from dotenv import load_dotenv from telegram import (InlineKeyboardButton, InlineKeyboardMarkup, ParseMode, Update) from telegram.ext import (CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Updater) from models import Link load_dotenv() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') link_regex = re.compile( '((https?):((//)|(\\\\))+([\w\d:#@%/;$()~_?\+-=\\\.&](#!)?)*)', re.DOTALL) def private_only(f): @wraps(f) def cb(self, update: Update, *args, **kwargs): if update.effective_chat.type != 'private': return return f(self, update, *args, **kwargs) return cb @dataclass class BotSettings: token: str db: str class Bot: def __init__(self, settings: BotSettings): self.settings = settings self._init_db() self.updater = Updater(token=self.settings.token, use_context=True) self.dispatcher = self.updater.dispatcher self._register_commands() self._register_handlers() def _register_commands(self): for f in dir(self): if f.startswith("cmd_"): self.dispatcher.add_handler( CommandHandler(f[4:], getattr(self, f))) def _register_handlers(self): self.dispatcher.add_handler(MessageHandler( filters=None, callback=self.message)) self.dispatcher.add_handler(CallbackQueryHandler(self.update)) @property def db(self): return sqlite3.connect(self.settings.db, detect_types=sqlite3.PARSE_DECLTYPES) def _init_db(self): with self.db as db: cur = db.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS links ( id INTEGER PRIMARY KEY AUTOINCREMENT, link TEXT, user_id TEXT, read_at TIMESTAMP, added_at TIMESTAMP ); """) @private_only def cmd_start(self, update: Update, context: CallbackContext): context.bot.send_message(chat_id=update.effective_chat.id, text='Hi!') # TODO: timezones # TODO: ask to set time or something @private_only def cmd_test(self, update: Update, context: CallbackContext): l = Link( link="2137", user_id=update.effective_user.id ) with self.db as db: l.create(db) @private_only def cmd_unread(self, update: Update, context: CallbackContext): # TODO: ignore messages from group user_id = update.effective_user.id self.send_unread(context.bot, user_id) def send_unread(self, bot: telegram.Bot, user_id: str): with self.db as db: unread = Link.get_unread(db, user_id) bot.send_message( user_id, f"*Your unread links as of {datetime.now().isoformat()}:*", parse_mode=ParseMode.MARKDOWN) for link in unread: keyboard = [ [InlineKeyboardButton(text='Mark as read', callback_data=f"mark_as_read:{link.id}"), InlineKeyboardButton(text='Delete', callback_data=f"delete:{link.id}")] ] bot.send_message(user_id, link.link, reply_markup=InlineKeyboardMarkup(keyboard)) # TODO: button to postpone # TODO: button to mark all as read or postpone def _natural_count(self, n, singular, plural): if abs(n) == 1: return f"{n} {singular}" return f"{n} {plural}" @private_only def message(self, update: Update, context: CallbackContext): user_id = update.effective_user.id links = re.findall(link_regex, update.message.text) with self.db as db: for link in links: l = Link( user_id=user_id, link=link[0], ) l.create(db) context.bot.send_message( update.effective_chat.id, f"Added {self._natural_count(len(links), 'link', 'links')} to your list.") @private_only def update(self, update: Update, context: CallbackContext): user_id = update.effective_user.id action, link_id = update.callback_query.data.split(":", 1) with self.db as db: l = Link.get(db, user_id, link_id) if l is None: context.bot.send_message( user_id, "Couldn't find the link you were looking for.") return if action == "mark_as_read": l.mark_as_read(db) update.callback_query.message.delete() elif action == "delete": l.delete(db) update.callback_query.message.delete() def run(self): self.updater.start_polling() if __name__ == '__main__': s = BotSettings( token=os.environ['READLATER_TOKEN'], db=os.environ.get('READLATER_DB', './db.sqlite3'), ) b = Bot(s) b.run()