bot/main.py

133 lines
4.6 KiB
Python

from datetime import datetime
import sqlite3
from typing import Optional, Tuple
from telegram.ext import Updater, CommandHandler, CallbackContext
from telegram import Update, ParseMode
import os
import logging
import random
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class Bot:
def __init__(self, token, db, chat, forward_to=None):
self.chat = chat
self.db_path = db
self._init_db()
self.forward_to = forward_to or []
self.updater = Updater(token=token, use_context=True)
self.dispatcher = self.updater.dispatcher
self._lastid = None
for f in dir(self):
if f.startswith("cmd_"):
self.dispatcher.add_handler(
CommandHandler(f[4:], getattr(self, f)))
def db(self):
return sqlite3.connect(self.db_path)
def _init_db(self):
with self.db() as db:
cur = db.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
calendar_id TEXT,
telegram_poll_id TEXT);
""")
def cmd_start(self, update: Update, context: CallbackContext):
context.bot.send_message(chat_id=update.effective_chat.id,
text=f'Hi! effective_chat.id={update.effective_chat.id}')
def send_event(self, id: str, date: datetime, header: str, description: str, location: str, latlong: Optional[Tuple[float, float]] = None, is_new: bool = True):
messages = []
d = date.strftime("%Y-%m-%d %H:%M")
text = f"""\
*{header}*
_{d}, {location}_
{description}
"""
if not is_new:
text = "*ZMIANA WYDARZENIA*\n_Sprawdź, czy Twoja odpowiedź jest wciąż aktualna!_\n\n" + text
m = self.updater.bot.send_message(
self.chat, text=text, parse_mode=ParseMode.MARKDOWN)
messages.append(m)
if latlong is not None:
m = self.updater.bot.send_location(
self.chat, latitude=latlong[0], longitude=latlong[1])
messages.append(m)
poll_id = None
if is_new:
m = self.updater.bot.send_poll(
self.chat,
f'Wpadasz? ({header})',
['Będę', 'Nie będę', '🍆'],
is_anonymous=False,
allows_multiple_answers=False,
)
poll_id = m.message_id
with self.db() as db:
cur = db.cursor()
cur.execute(
"INSERT INTO events(calendar_id, telegram_poll_id) VALUES (?, ?)", (str(id), str(poll_id)))
else:
with self.db() as db:
cur = db.cursor()
rows = list(cur.execute(
"SELECT * FROM events WHERE calendar_id = ?;", (str(id),)))
if len(rows) != 1:
raise Exception(
f"None/multiple events with id {id} found in the database")
id, calendar_id, telegram_poll_id = rows[0]
poll_id = telegram_poll_id
for chat in self.forward_to:
for msg in messages:
msg.forward(chat)
self.updater.bot.forward_message(
chat_id=chat, from_chat_id=self.chat, message_id=poll_id)
def cmd_test(self, update: Update, context: CallbackContext):
context.bot.send_message(chat_id=self.chat, text="Test")
id = str(random.randrange(2137, 19823198))
self._lastid = id
self.send_event(id, datetime.now(), "Header",
"Description", "Szynowa 18", (51.060938, 17.057193))
def cmd_update(self, update: Update, context: CallbackContext):
context.bot.send_message(chat_id=self.chat, text="Test")
id = self._lastid
self.send_event(id, datetime.now(), "Header2",
"Description2", "new location", (51.060938, 10), is_new=False)
def cmd_location(self, update: Update, context: CallbackContext):
print(type(update))
context.bot.send_location(
update.effective_chat.id,
51,
19
)
def run(self):
self.updater.start_polling()
if __name__ == '__main__':
b = Bot(
token=os.environ['SZYNOWO_TOKEN'],
chat=os.environ['SZYNOWO_CHAT'],
db=os.environ['SZYNOWO_DB'],
forward_to=[s.strip() for s in os.environ.get(
'SZYNOWO_FORWARD_TO', '').split(',')],
)
b.run()