133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
from datetime import datetime
|
|
import sqlite3
|
|
from typing import Optional, Tuple
|
|
from telegram.ext import Updater, CommandHandler, CallbackContext
|
|
from telegram import Update, ParseMode
|
|
import os
|
|
import logging
|
|
import random
|
|
from dotenv import load_dotenv
|
|
# Read variables from a .env file (if one is found) before anything below
# consults os.environ.
load_dotenv()

# Root logger: DEBUG and above, with timestamp, logger name and level in
# front of every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
|
|
|
|
|
|
class Bot:
    """Telegram bot that announces events in a chat and tracks their polls.

    Every announced event gets a row in the ``events`` table of a SQLite
    database, mapping the caller's calendar id to the Telegram message id of
    the attendance poll. When an event is announced again as "changed", that
    row is used to find the original poll so it can be forwarded.

    Attributes:
        chat: Id of the chat all announcements are sent to.
        db_path: Path of the SQLite database file.
        forward_to: Chat ids every announcement is additionally forwarded to.
        updater: The ``telegram.ext.Updater`` built from the bot token.
        dispatcher: ``updater.dispatcher``; holds the command handlers.
    """

    def __init__(self, token, db, chat, forward_to=None):
        """Create the events table, build the updater and register commands.

        Args:
            token: Telegram bot token, passed to ``Updater``.
            db: Path of the SQLite database file.
            chat: Id of the chat announcements are sent to.
            forward_to: Optional iterable of chat ids to forward every
                announcement to; a falsy value means "no forwarding".
        """
        self.chat = chat
        self.db_path = db
        self._init_db()
        self.forward_to = forward_to or []
        self.updater = Updater(token=token, use_context=True)
        self.dispatcher = self.updater.dispatcher
        # Id of the event most recently created by /test, reused by /update.
        self._lastid = None

        # Every attribute named ``cmd_<name>`` becomes the handler of the
        # ``/<name>`` command.
        prefix = "cmd_"
        for attr in dir(self):
            if attr.startswith(prefix):
                self.dispatcher.add_handler(
                    CommandHandler(attr[len(prefix):], getattr(self, attr)))

    def db(self):
        """Open and return a new SQLite connection to ``db_path``.

        The caller owns the connection and is responsible for closing it.
        """
        return sqlite3.connect(self.db_path)

    def _execute(self, sql, params=()):
        """Run one SQL statement in its own transaction and return its rows.

        ``with connection:`` only commits (or rolls back on error); it does
        not close the connection, so the connection is closed explicitly
        here to avoid leaking one per call.

        Args:
            sql: The statement to execute, using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            The list of rows produced by the statement (empty for
            statements that return no rows).
        """
        conn = self.db()
        try:
            with conn:
                return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``events`` table unless it already exists."""
        self._execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                calendar_id TEXT,
                telegram_poll_id TEXT);
        """)

    def _lookup_poll_id(self, calendar_id):
        """Return the poll message id stored for ``calendar_id``.

        Raises:
            LookupError: If the table holds no row, or more than one row,
                for this calendar id.
        """
        rows = self._execute(
            "SELECT telegram_poll_id FROM events WHERE calendar_id = ?;",
            (str(calendar_id),))
        if len(rows) != 1:
            raise LookupError(
                f"None/multiple events with id {calendar_id} found in the database")
        # The column is TEXT (written with str()); convert back so both
        # paths of send_event hand the same type to the Telegram API.
        return int(rows[0][0])

    @staticmethod
    def _format_event_text(date, header, description, location, is_new):
        """Build the Markdown announcement text for an event.

        Changed events (``is_new`` false) get a notice prepended that asks
        members to re-check their poll answer.
        """
        when = date.strftime("%Y-%m-%d %H:%M")
        text = f"*{header}*\n_{when}, {location}_\n{description}\n"
        if not is_new:
            text = "*ZMIANA WYDARZENIA*\n_Sprawdź, czy Twoja odpowiedź jest wciąż aktualna!_\n\n" + text
        return text

    def cmd_start(self, update: Update, context: CallbackContext):
        """Handle /start: reply with the id of the chat the command came from."""
        chat_id = update.effective_chat.id
        context.bot.send_message(chat_id=chat_id,
                                 text=f'Hi! effective_chat.id={chat_id}')

    def send_event(self, id: str, date: datetime, header: str, description: str, location: str, latlong: Optional[Tuple[float, float]] = None, is_new: bool = True) -> None:
        """Announce a new or changed event in the chat and forward it.

        The announcement text is sent first, followed by a location message
        when ``latlong`` is given. A new event additionally gets a
        non-anonymous, single-answer poll whose message id is stored under
        ``id``; a changed event reuses the poll stored earlier. Finally the
        sent messages and the poll are forwarded to every chat in
        ``forward_to``.

        Args:
            id: Calendar id of the event; stored and looked up as ``str(id)``.
            date: Event time, rendered as ``%Y-%m-%d %H:%M``.
            header: Event title; also used in the poll question.
            description: Event body text.
            location: Human-readable place, shown next to the date.
            latlong: Optional ``(latitude, longitude)`` pair.
            is_new: ``True`` to create a poll and record the event,
                ``False`` to announce a change to a recorded event.

        Raises:
            LookupError: If ``is_new`` is false and the database holds no
                row, or several rows, for ``id``. The announcement has
                already been sent to the chat at that point.
        """
        bot = self.updater.bot
        text = self._format_event_text(date, header, description, location, is_new)

        messages = [
            bot.send_message(self.chat, text=text, parse_mode=ParseMode.MARKDOWN)
        ]
        if latlong is not None:
            latitude, longitude = latlong[0], latlong[1]
            messages.append(
                bot.send_location(self.chat, latitude=latitude, longitude=longitude))

        if is_new:
            poll = bot.send_poll(
                self.chat,
                f'Wpadasz? ({header})',
                ['Będę', 'Nie będę', '🍆'],
                is_anonymous=False,
                allows_multiple_answers=False,
            )
            poll_id = poll.message_id
            self._execute(
                "INSERT INTO events(calendar_id, telegram_poll_id) VALUES (?, ?)",
                (str(id), str(poll_id)))
        else:
            poll_id = self._lookup_poll_id(id)

        for chat in self.forward_to:
            for msg in messages:
                msg.forward(chat)
            # The poll is not in ``messages`` (for a changed event it was
            # sent by an earlier call), so it is forwarded by message id.
            bot.forward_message(
                chat_id=chat, from_chat_id=self.chat, message_id=poll_id)

    def cmd_test(self, update: Update, context: CallbackContext):
        """Handle /test: announce a sample event under a random id.

        The id is remembered so that /update can announce a change to it.
        """
        context.bot.send_message(chat_id=self.chat, text="Test")
        event_id = str(random.randrange(2137, 19823198))
        self._lastid = event_id

        self.send_event(event_id, datetime.now(), "Header",
                        "Description", "Szynowa 18", (51.060938, 17.057193))

    def cmd_update(self, update: Update, context: CallbackContext):
        """Handle /update: announce a change to the event created by /test.

        Does nothing beyond the "Test" message when /test has not been run
        yet, because there is no recorded event to update.
        """
        context.bot.send_message(chat_id=self.chat, text="Test")
        if self._lastid is None:
            # Without this guard send_event would post a "changed event"
            # notice and then fail the database lookup for id "None".
            logging.warning("/update received before /test; no event to update")
            return

        self.send_event(self._lastid, datetime.now(), "Header2",
                        "Description2", "new location", (51.060938, 10), is_new=False)

    def cmd_location(self, update: Update, context: CallbackContext):
        """Handle /location: reply with a fixed location (lat 51, long 19)."""
        logging.debug("cmd_location update type: %s", type(update))
        context.bot.send_location(
            update.effective_chat.id,
            51,
            19
        )

    def run(self):
        """Start fetching updates via ``updater.start_polling()``.

        NOTE(review): ``updater.idle()`` is not called here, presumably so
        callers can keep using ``send_event`` after this returns; confirm.
        """
        self.updater.start_polling()
|
|
|
|
|
|
def parse_forward_to(raw: str) -> list:
    """Split a comma-separated list of chat ids into a list of strings.

    Surrounding whitespace is stripped from every entry and blank entries
    are dropped, so an empty or unset value yields ``[]`` rather than
    ``['']`` (which would make the bot try to forward to a chat with an
    empty id).

    Args:
        raw: The raw comma-separated value, e.g. ``"123, -100456"``.

    Returns:
        The non-empty, stripped entries in their original order.
    """
    stripped = (part.strip() for part in raw.split(','))
    return [part for part in stripped if part]


if __name__ == '__main__':
    # All settings come from the environment; only SZYNOWO_FORWARD_TO is
    # optional.
    b = Bot(
        token=os.environ['SZYNOWO_TOKEN'],
        chat=os.environ['SZYNOWO_CHAT'],
        db=os.environ['SZYNOWO_DB'],
        forward_to=parse_forward_to(os.environ.get('SZYNOWO_FORWARD_TO', '')),
    )

    b.run()
|