ReadLater/main.py

175 lines
5.4 KiB
Python

import json
import logging
import os
import re
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from pprint import pprint

import telegram
from dotenv import load_dotenv
from telegram import (InlineKeyboardButton, InlineKeyboardMarkup, ParseMode,
                      Update)
from telegram.ext import (CallbackContext, CallbackQueryHandler,
                          CommandHandler, MessageHandler, Updater)

from models import Link
# Load variables from a .env file (python-dotenv) before the environment is
# read in the ``__main__`` section at the bottom of this file.
load_dotenv()

# Root logger: DEBUG and above, each record prefixed with time, logger name
# and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
# Matches http/https links.  The outermost group wraps the whole pattern, so
# element 0 of every ``re.findall`` tuple (and ``match.group(0)``) is the link.
# Written as a raw string so that ``\w``, ``\d``, ``\+`` reach the regex engine
# without relying on Python passing unknown string escapes through.
# NOTE(review): inside the character class, ``\+-=`` is a range from "+" to
# "=", which also admits ",", ";" and the less-than sign.  Kept unchanged so
# the set of matched links stays exactly what it was.
link_regex = re.compile(
    r'((https?):((//)|(\\))+([\w\d:#@%/;$()~_?\+-=\\.&](#!)?)*)', re.DOTALL)
def private_only(f):
    """Decorate a handler method so it only runs for private chats.

    The wrapped method is called as ``f(self, update, *args, **kwargs)`` when
    ``update.effective_chat.type`` is ``'private'``.  For any other chat type,
    and for updates that carry no chat at all, the handler is skipped and the
    wrapper returns ``None``.
    """
    @wraps(f)
    def cb(self, update: "Update", *args, **kwargs):
        chat = update.effective_chat
        # effective_chat is None for chat-less updates (for example callback
        # queries that originate from inline messages); treat those as
        # "not private" instead of failing on ``None.type``.
        if chat is None or chat.type != 'private':
            return None
        return f(self, update, *args, **kwargs)
    return cb
@dataclass
class BotSettings:
    """Configuration values a ``Bot`` is constructed from."""

    # Telegram bot token; ``Bot`` hands it to ``Updater(token=...)``.
    token: str
    # Database location; ``Bot`` hands it to ``sqlite3.connect``.
    db: str
class Bot:
    """Telegram bot that keeps a per-user list of links to read later.

    Constructing a ``Bot`` creates the ``links`` table if needed, builds the
    ``Updater`` and registers all handlers; ``run`` starts polling.
    """

    def __init__(self, settings: BotSettings):
        """Prepare the database, the updater and the handlers.

        Args:
            settings: Token and database location to use.
        """
        self.settings = settings
        self._init_db()
        self.updater = Updater(token=self.settings.token, use_context=True)
        self.dispatcher = self.updater.dispatcher
        self._register_commands()
        self._register_handlers()

    def _register_commands(self):
        """Register every ``cmd_<name>`` method as the ``<name>`` command."""
        prefix = "cmd_"
        for name in dir(self):
            if name.startswith(prefix):
                self.dispatcher.add_handler(
                    CommandHandler(name[len(prefix):], getattr(self, name)))

    def _register_handlers(self):
        """Register the generic message handler and the button handler."""
        # No filter is applied here, so ``message`` itself has to cope with
        # updates that carry no text.
        self.dispatcher.add_handler(MessageHandler(
            filters=None,
            callback=self.message))
        self.dispatcher.add_handler(CallbackQueryHandler(self.update))

    @property
    def db(self):
        """Open and return a new SQLite connection to ``settings.db``.

        Every access creates a fresh connection.  Using the connection as a
        context manager only commits or rolls back; it does not close it, so
        callers wrap it in ``closing(...)``.
        """
        return sqlite3.connect(self.settings.db, detect_types=sqlite3.PARSE_DECLTYPES)

    def _init_db(self):
        """Create the ``links`` table when it does not exist yet."""
        with closing(self.db) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS links (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    link TEXT,
                    user_id TEXT,
                    read_at TIMESTAMP,
                    added_at TIMESTAMP
                );
            """)

    @private_only
    def cmd_start(self, update: Update, context: CallbackContext):
        """Handle ``/start``: greet the user."""
        context.bot.send_message(chat_id=update.effective_chat.id, text='Hi!')
        # TODO: timezones
        # TODO: ask to set time or something

    @private_only
    def cmd_test(self, update: Update, context: CallbackContext):
        """Handle ``/test``: store the fixed placeholder link ``"2137"``."""
        link = Link(
            link="2137",
            user_id=update.effective_user.id
        )
        with closing(self.db) as conn, conn:
            link.create(conn)

    @private_only
    def cmd_unread(self, update: Update, context: CallbackContext):
        """Handle ``/unread``: send the caller their unread links."""
        user_id = update.effective_user.id
        self.send_unread(context.bot, user_id)

    def send_unread(self, bot: telegram.Bot, user_id: str):
        """Send a header message followed by one message per unread link.

        Each link message carries two inline buttons whose callback data is
        ``mark_as_read:<id>`` and ``delete:<id>``; ``update`` handles them.

        Args:
            bot: Bot used to send the messages.
            user_id: Telegram user whose links are listed; also used as the
                chat to send to.
        """
        with closing(self.db) as conn, conn:
            # Materialise the result while the connection is still open, in
            # case ``Link.get_unread`` yields rows lazily.
            unread = list(Link.get_unread(conn, user_id))
        bot.send_message(
            user_id, f"*Your unread links as of {datetime.now().isoformat()}:*", parse_mode=ParseMode.MARKDOWN)
        for link in unread:
            keyboard = [
                [InlineKeyboardButton(text='Mark as read',
                                      callback_data=f"mark_as_read:{link.id}"),
                 InlineKeyboardButton(text='Delete', callback_data=f"delete:{link.id}")]
            ]
            bot.send_message(user_id, link.link,
                             reply_markup=InlineKeyboardMarkup(keyboard))
        # TODO: button to postpone
        # TODO: button to mark all as read or postpone

    def _natural_count(self, n, singular, plural):
        """Return ``"<n> <singular>"`` when ``abs(n) == 1``, else ``"<n> <plural>"``."""
        if abs(n) == 1:
            return f"{n} {singular}"
        return f"{n} {plural}"

    @private_only
    def message(self, update: Update, context: CallbackContext):
        """Store every link found in an incoming text message and report the count.

        Updates without a message or without text are ignored.
        """
        incoming = update.message
        # The handler is registered without a filter, so non-text updates
        # (and updates whose ``message`` is unset) can arrive here.
        if incoming is None or incoming.text is None:
            return
        user_id = update.effective_user.id
        # group(0) equals the pattern's outermost group, i.e. the whole link.
        links = [match.group(0) for match in link_regex.finditer(incoming.text)]
        with closing(self.db) as conn, conn:
            for url in links:
                Link(
                    user_id=user_id,
                    link=url,
                ).create(conn)
        # Reply only after the inserts have been committed.
        context.bot.send_message(
            update.effective_chat.id, f"Added {self._natural_count(len(links), 'link', 'links')} to your list.")

    @private_only
    def update(self, update: Update, context: CallbackContext):
        """Handle a press on one of the inline buttons sent by ``send_unread``.

        Callback data has the form ``<action>:<link_id>`` where the action is
        ``mark_as_read`` or ``delete``.  On success the button's message is
        removed and a confirmation is sent; when the link cannot be found the
        user is told so.  Malformed data and unknown actions are ignored.
        """
        log = logging.getLogger(__name__)
        query = update.callback_query
        # Telegram clients keep showing a progress indicator on the button
        # until the query is answered.  The acknowledgement is best effort:
        # a failure here must not prevent the action itself.
        try:
            query.answer()
        except telegram.error.TelegramError:
            log.warning("Could not answer callback query", exc_info=True)

        action, separator, link_id = (query.data or "").partition(":")
        if not separator:
            log.warning("Ignoring malformed callback data: %r", query.data)
            return

        user_id = update.effective_user.id
        with closing(self.db) as conn, conn:
            link = Link.get(conn, user_id, link_id)
            if link is None:
                reply = "Couldn't find the link you were looking for."
            elif action == "mark_as_read":
                link.mark_as_read(conn)
                reply = "Marked 1 link as read."
            elif action == "delete":
                link.delete(conn)
                reply = "Deleted 1 link."
            else:
                log.warning("Ignoring unknown callback action: %r", action)
                return

        # Telegram is only contacted after the database change is committed.
        if link is not None and query.message is not None:
            query.message.delete()
        context.bot.send_message(user_id, reply)

    def run(self):
        """Start polling for updates."""
        # NOTE(review): this returns right after polling has been started;
        # consider ``self.updater.idle()`` if a blocking run is wanted.
        self.updater.start_polling()
if __name__ == '__main__':
    # Script entry point: READLATER_TOKEN is mandatory (a missing value raises
    # KeyError); READLATER_DB falls back to a file in the working directory.
    settings = BotSettings(
        token=os.environ['READLATER_TOKEN'],
        db=os.environ.get('READLATER_DB', './db.sqlite3'),
    )
    bot = Bot(settings)
    bot.run()