ReadLater/main.py

140 lines
4.1 KiB
Python

from functools import wraps
import logging
import os
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pprint import pprint
from dotenv import load_dotenv
from telegram import Update, user
import telegram
from telegram.ext import (CallbackContext, CommandHandler, MessageHandler,
Updater)
from models import Link
load_dotenv()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
link_regex = re.compile(
'((https?):((//)|(\\\\))+([\w\d:#@%/;$()~_?\+-=\\\.&](#!)?)*)', re.DOTALL)
def private_only(f):
@wraps(f)
def cb(update: Update, *args, **kwargs):
if update.effective_chat.type != 'private':
return
return f(update, *args, **kwargs)
return cb
@dataclass
class BotSettings:
token: str
db: str
class Bot:
def __init__(self, settings: BotSettings):
self.settings = settings
self._init_db()
self.updater = Updater(token=self.settings.token, use_context=True)
self.dispatcher = self.updater.dispatcher
self._register_commands()
self._register_handlers()
def _register_commands(self):
for f in dir(self):
if f.startswith("cmd_"):
self.dispatcher.add_handler(
CommandHandler(f[4:], private_only(getattr(self, f))))
def _register_handlers(self):
self.dispatcher.add_handler(MessageHandler(
filters=None,
callback=self.message))
@property
def db(self):
return sqlite3.connect(self.settings.db, detect_types=sqlite3.PARSE_DECLTYPES)
def _init_db(self):
with self.db as db:
cur = db.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS links (
id INTEGER PRIMARY KEY AUTOINCREMENT,
link TEXT,
user_id TEXT,
read_at TIMESTAMP,
added_at TIMESTAMP
);
""")
def cmd_start(self, update: Update, context: CallbackContext):
context.bot.send_message(chat_id=update.effective_chat.id, text='Hi!')
# TODO: timezones
# TODO: ask to set time or something
def cmd_test(self, update: Update, context: CallbackContext):
l = Link(
link="2137",
user_id=update.effective_user.id
)
with self.db as db:
l.create(db)
def cmd_unread(self, update: Update, context: CallbackContext):
# TODO: ignore messages from group
user_id = update.effective_user.id
self.send_unread(context.bot, user_id)
def send_unread(self, bot: telegram.Bot, user_id: str):
with self.db as db:
unread = Link.get_unread(db, user_id)
bot.send_message(
user_id, f"**Your unread links as of {datetime.now().isoformat()}:", parse_mode="MARKDOWN")
for link in unread:
bot.send_message(user_id, link.link)
# TODO: button to read or postpone
# TODO: button to mark all as read or postpone
def _natural_count(self, n, singular, plural):
if abs(n) == 1:
return f"{n} {singular}"
return f"{n} {plural}"
@private_only
def message(self, update: Update, context: CallbackContext):
if update.effective_chat.type != 'private':
return
user_id = update.effective_user.id
links = re.findall(link_regex, update.message.text)
with self.db as db:
for link in links:
l = Link(
user_id=user_id,
link=link[0],
)
l.create(db)
context.bot.send_message(
update.effective_chat.id, f"Added {self._natural_count(len(links), 'link', 'links')} to your list.")
def run(self):
self.updater.start_polling()
if __name__ == '__main__':
s = BotSettings(
token=os.environ['READLATER_TOKEN'],
db=os.environ.get('READLATER_DB', './db.sqlite3'),
)
b = Bot(s)
b.run()