send unread links with buttons to mark as read or delete

This commit is contained in:
Wojciech Kwolek 2022-01-09 05:55:10 +01:00
parent 5d7698462a
commit a8014e363c
2 changed files with 54 additions and 23 deletions

27
main.py
View File

@ -1,3 +1,4 @@
import json
from functools import wraps from functools import wraps
import logging import logging
import os import os
@ -8,7 +9,7 @@ from datetime import datetime
from pprint import pprint from pprint import pprint
from dotenv import load_dotenv from dotenv import load_dotenv
from telegram import Update, user from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ParseMode
import telegram import telegram
from telegram.ext import (CallbackContext, CommandHandler, MessageHandler, from telegram.ext import (CallbackContext, CommandHandler, MessageHandler,
Updater) Updater)
@ -25,12 +26,13 @@ link_regex = re.compile(
def private_only(f): def private_only(f):
@wraps(f) @wraps(f)
def cb(update: Update, *args, **kwargs): def cb(self, update: Update, *args, **kwargs):
if update.effective_chat.type != 'private': if update.effective_chat.type != 'private':
return return
return f(update, *args, **kwargs) return f(self, update, *args, **kwargs)
return cb return cb
@dataclass @dataclass
class BotSettings: class BotSettings:
token: str token: str
@ -51,7 +53,7 @@ class Bot:
for f in dir(self): for f in dir(self):
if f.startswith("cmd_"): if f.startswith("cmd_"):
self.dispatcher.add_handler( self.dispatcher.add_handler(
CommandHandler(f[4:], private_only(getattr(self, f)))) CommandHandler(f[4:], getattr(self, f)))
def _register_handlers(self): def _register_handlers(self):
self.dispatcher.add_handler(MessageHandler( self.dispatcher.add_handler(MessageHandler(
@ -75,11 +77,13 @@ class Bot:
); );
""") """)
@private_only
def cmd_start(self, update: Update, context: CallbackContext): def cmd_start(self, update: Update, context: CallbackContext):
context.bot.send_message(chat_id=update.effective_chat.id, text='Hi!') context.bot.send_message(chat_id=update.effective_chat.id, text='Hi!')
# TODO: timezones # TODO: timezones
# TODO: ask to set time or something # TODO: ask to set time or something
@private_only
def cmd_test(self, update: Update, context: CallbackContext): def cmd_test(self, update: Update, context: CallbackContext):
l = Link( l = Link(
link="2137", link="2137",
@ -88,6 +92,7 @@ class Bot:
with self.db as db: with self.db as db:
l.create(db) l.create(db)
@private_only
def cmd_unread(self, update: Update, context: CallbackContext): def cmd_unread(self, update: Update, context: CallbackContext):
# TODO: ignore messages from group # TODO: ignore messages from group
user_id = update.effective_user.id user_id = update.effective_user.id
@ -98,10 +103,16 @@ class Bot:
unread = Link.get_unread(db, user_id) unread = Link.get_unread(db, user_id)
bot.send_message( bot.send_message(
user_id, f"**Your unread links as of {datetime.now().isoformat()}:", parse_mode="MARKDOWN") user_id, f"*Your unread links as of {datetime.now().isoformat()}:*", parse_mode=ParseMode.MARKDOWN)
for link in unread: for link in unread:
bot.send_message(user_id, link.link) keyboard = [
# TODO: button to read or postpone [InlineKeyboardButton(text='Mark as read',
callback_data=f"mark_as_read:{link.id}"),
InlineKeyboardButton(text='Delete', callback_data=f"delete:{link.id}")]
]
bot.send_message(user_id, link.link,
reply_markup=InlineKeyboardMarkup(keyboard))
# TODO: button to postpone
# TODO: button to mark all as read or postpone # TODO: button to mark all as read or postpone
def _natural_count(self, n, singular, plural): def _natural_count(self, n, singular, plural):
@ -111,8 +122,6 @@ class Bot:
@private_only @private_only
def message(self, update: Update, context: CallbackContext): def message(self, update: Update, context: CallbackContext):
if update.effective_chat.type != 'private':
return
user_id = update.effective_user.id user_id = update.effective_user.id
links = re.findall(link_regex, update.message.text) links = re.findall(link_regex, update.message.text)
with self.db as db: with self.db as db:

View File

@ -1,7 +1,7 @@
import sqlite3 import sqlite3
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Tuple from typing import Dict, List, Tuple
@dataclass @dataclass
@ -12,6 +12,39 @@ class Link:
read_at: datetime = None read_at: datetime = None
added_at: datetime = None added_at: datetime = None
def as_dict(self):
def timestamp(t): return t.timestamp() if t is not None else None
return {
"id": self.id,
"user_id": self.user_id,
"read_at": timestamp(self.read_at),
"added_at": timestamp(self.added_at),
"link": self.link,
}
@classmethod
def from_dict(cls, dict: Dict) -> 'Link':
def convert_date(t): return datetime.fromtimestamp(
t) if t is not None else None
return Link(
id=dict.get('id', None),
user_id=dict["user_id"],
read_at=convert_date(dict.get("read_at", None)),
added_at=convert_date(dict.get("added_at", None)),
link=dict["link"]
)
@classmethod
def _from_tuple(cls, tuple) -> 'Link':
id, link, user_id, read_at, added_at = tuple
return cls(
id=id,
link=link,
user_id=user_id,
read_at=read_at,
added_at=added_at
)
def create(self, db: sqlite3.Connection): def create(self, db: sqlite3.Connection):
self.added_at = datetime.now() self.added_at = datetime.now()
cur = db.cursor() cur = db.cursor()
@ -26,22 +59,11 @@ class Link:
self.id = r[0][0] self.id = r[0][0]
@classmethod @classmethod
def _from_tuple(cls, tuple) -> 'Link': def _get(cls, db: sqlite3.Connection, where: str, values: Tuple = ()) -> List['Link']:
id, link, user_id, read_at, added_at = tuple
return cls(
id=id,
link=link,
user_id=user_id,
read_at=read_at,
added_at=added_at
)
@classmethod
def _get(cls, db: sqlite3.Connection, where: str, values: Tuple = ()) -> 'Link':
cur = db.cursor() cur = db.cursor()
rows = cur.execute(f"SELECT * FROM links WHERE {where}", values) rows = cur.execute(f"SELECT * FROM links WHERE {where}", values)
return map(cls._from_tuple, rows) return map(cls._from_tuple, rows)
@classmethod @classmethod
def get_unread(cls, db: sqlite3.Connection, user_id: str): def get_unread(cls, db: sqlite3.Connection, user_id: str) -> List['Link']:
return cls._get(db, "user_id = ? AND read_at IS NULL", (user_id,)) return cls._get(db, "user_id = ? AND read_at IS NULL", (user_id,))