"""Telegram bot that posts event announcements with an attendance poll.

Each announced event is recorded in a small SQLite table that maps the
caller-supplied event id (``calendar_id``) to the Telegram message id of the
poll that was created for it, so later updates can re-forward the same poll.
"""
from contextlib import closing
from datetime import datetime
import logging
import os
import random
import sqlite3
from typing import Iterable, List, Optional, Tuple

from dotenv import load_dotenv
from telegram import ParseMode, Update
from telegram.ext import CallbackContext, CommandHandler, Updater

load_dotenv()
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class Bot:
    """Wraps a telegram ``Updater`` plus the SQLite event/poll mapping.

    Every method whose name starts with ``cmd_`` is registered automatically
    as a Telegram command handler; the command name is the method name with
    the prefix removed (``cmd_start`` -> ``/start``).
    """

    _COMMAND_PREFIX = "cmd_"

    def __init__(self, token: str, db: str, chat,
                 forward_to: Optional[Iterable] = None):
        """Create the bot, make sure the database exists, register handlers.

        Args:
            token: Telegram bot token handed to ``Updater``.
            db: Path of the SQLite database file.
            chat: Id of the chat that events are posted to.
            forward_to: Optional chat ids every event is forwarded to.
        """
        self.chat = chat
        self.db_path = db
        self._init_db()
        # Copy into a list (it is iterated once per event) and drop empty
        # entries: an empty chat id can never be a valid forward target.
        self.forward_to = [
            target for target in (forward_to or [])
            if target not in ('', None)
        ]
        self.updater = Updater(token=token, use_context=True)
        self.dispatcher = self.updater.dispatcher
        # Id of the last event created by /test; consumed by /update.
        self._lastid = None

        prefix = self._COMMAND_PREFIX
        for attr in dir(self):
            if attr.startswith(prefix):
                self.dispatcher.add_handler(
                    CommandHandler(attr[len(prefix):], getattr(self, attr)))

    def db(self):
        """Open and return a new SQLite connection; the caller must close it."""
        return sqlite3.connect(self.db_path)

    def _init_db(self):
        """Create the ``events`` table if it does not exist yet."""
        # ``with conn`` only commits/rolls back; ``closing`` is what actually
        # releases the connection afterwards.
        with closing(self.db()) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    calendar_id TEXT,
                    telegram_poll_id TEXT);
            """)

    def cmd_start(self, update: Update, context: CallbackContext):
        """Handle /start: greet and echo the id of the chat it came from."""
        context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=f'Hi! effective_chat.id={update.effective_chat.id}')

    def send_event(self, id: str, date: datetime, header: str,
                   description: str, location: str,
                   latlong: Optional[Tuple[float, float]] = None,
                   is_new: bool = True):
        """Post an event to the main chat and forward it to ``forward_to``.

        Args:
            id: Event identifier, stored as ``calendar_id``.
            date: Event time, rendered as ``YYYY-MM-DD HH:MM``.
            header: Title, shown in bold and repeated in the poll question.
            description: Body text of the announcement.
            location: Human-readable place, shown next to the date.
            latlong: Optional ``(latitude, longitude)``; when given, a
                location message is sent after the announcement.
            is_new: When true a new poll is created and recorded. When false
                the poll previously recorded for ``id`` is looked up and
                re-forwarded instead.

        Raises:
            LookupError: ``is_new`` is false and the database holds zero or
                more than one row for ``id``.
        """
        bot = self.updater.bot
        when = date.strftime("%Y-%m-%d %H:%M")
        # NOTE(review): values are interpolated into legacy Markdown
        # unescaped, so '*' or '_' inside them can affect parsing.
        text = f"*{header}*\n_{when}, {location}_\n{description}\n"

        sent = [bot.send_message(
            self.chat, text=text, parse_mode=ParseMode.MARKDOWN)]
        if latlong is not None:
            sent.append(bot.send_location(
                self.chat, latitude=latlong[0], longitude=latlong[1]))

        if is_new:
            poll_id = self._create_poll(id, header)
        else:
            poll_id = self._stored_poll_id(id)

        for target in self.forward_to:
            for message in sent:
                message.forward(target)
            bot.forward_message(
                chat_id=target, from_chat_id=self.chat, message_id=poll_id)

    def _create_poll(self, event_id, header: str) -> int:
        """Send the attendance poll, record it, return its message id."""
        # NOTE(review): the question text contains a line break after
        # "Wpadasz? " in the original source; it is kept here.
        poll_message = self.updater.bot.send_poll(
            self.chat,
            f'Wpadasz? \n({header})',
            ['Będę', 'Nie będę', '🍆'],
            is_anonymous=False,
            allows_multiple_answers=False,
        )
        poll_id = poll_message.message_id
        with closing(self.db()) as conn, conn:
            conn.execute(
                "INSERT INTO events(calendar_id, telegram_poll_id) "
                "VALUES (?, ?)",
                (str(event_id), str(poll_id)))
        return poll_id

    def _stored_poll_id(self, event_id) -> int:
        """Return the poll message id recorded for ``event_id``."""
        with closing(self.db()) as conn, conn:
            rows = conn.execute(
                "SELECT telegram_poll_id FROM events WHERE calendar_id = ?;",
                (str(event_id),)).fetchall()
        if len(rows) != 1:
            raise LookupError(
                f"None/multiple events with id {event_id} found in the database")
        # Stored as TEXT; convert back so both code paths hand Telegram an int.
        return int(rows[0][0])

    def cmd_test(self, update: Update, context: CallbackContext):
        """Handle /test: post a sample event under a random id."""
        context.bot.send_message(chat_id=self.chat, text="Test")
        event_id = str(random.randrange(2137, 19823198))
        self._lastid = event_id
        self.send_event(event_id, datetime.now(), "Header", "Description",
                        "Szynowa 18", (51.060938, 17.057193))

    def cmd_update(self, update: Update, context: CallbackContext):
        """Handle /update: re-post the last /test event as an update."""
        if self._lastid is None:
            # Without a previous /test there is no recorded poll to look up.
            context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="Nothing to update yet - run /test first")
            return
        context.bot.send_message(chat_id=self.chat, text="Test")
        self.send_event(self._lastid, datetime.now(), "Header2",
                        "Description2", "new location", (51.060938, 10),
                        is_new=False)

    def cmd_location(self, update: Update, context: CallbackContext):
        """Handle /location: reply with a fixed location (51, 19)."""
        logger.debug("update type: %s", type(update))
        context.bot.send_location(update.effective_chat.id, 51, 19)

    def run(self):
        """Start polling for updates.

        Only ``start_polling`` is called (no ``idle()``), so control returns
        to the caller after polling has been started.
        """
        self.updater.start_polling()


def _parse_forward_to(raw: str) -> List[str]:
    """Split a comma-separated chat list, dropping blank entries.

    ``''.split(',')`` yields ``['']``, so an unset variable must be filtered
    rather than passed through as a single empty chat id.
    """
    return [chat for chat in (part.strip() for part in raw.split(','))
            if chat]


if __name__ == '__main__':
    b = Bot(
        token=os.environ['SZYNOWO_TOKEN'],
        chat=os.environ['SZYNOWO_CHAT'],
        db=os.environ['SZYNOWO_DB'],
        forward_to=_parse_forward_to(
            os.environ.get('SZYNOWO_FORWARD_TO', '')),
    )
    b.run()