From 2830335b121829a2808d756a87485b8992ce4cff Mon Sep 17 00:00:00 2001 From: hogweed1 Date: Mon, 12 May 2025 21:52:47 +1000 Subject: [PATCH] space genocide --- bot.py | 19 +++++--------- db_logic/collections.py | 30 ++++++++++------------ db_logic/postcleaner.py | 35 +++++++++++--------------- db_logic/tops.py | 22 ++++++++--------- db_logic/user_stuff.py | 19 ++++++-------- dicktxt/ForReadDict.py | 3 +-- dicktxt/WordsChange.py | 5 ++-- dicktxt/__init__.py | 1 + handlers/__init__.py | 2 -- handlers/metrics.py | 17 ++++++------- handlers/pipisa.py | 28 ++++++--------------- handlers/start_help.py | 11 +-------- pipisa_functions/pipisa_time.py | 44 +++++++++++++-------------------- 13 files changed, 88 insertions(+), 148 deletions(-) diff --git a/bot.py b/bot.py index 1b85b7f..4046173 100644 --- a/bot.py +++ b/bot.py @@ -7,7 +7,6 @@ from global_conf import CONFIG from prometheus_client import start_http_server - def remake_field_generator(d, field='text'): if isinstance(d, list): for k2 in d: @@ -16,7 +15,7 @@ def remake_field_generator(d, field='text'): elif isinstance(d, dict): for k, v in d.items(): if k == field: - d[k] = '' + d[k] = '' yield v elif isinstance(v, list): for k2 in v: @@ -24,20 +23,19 @@ def remake_field_generator(d, field='text'): yield id_val elif isinstance(v, dict): for id_val in remake_field_generator(v): - yield id_val + yield id_val def filter_grammar_messages(record): if record.args and (not None in record.args): - j = json.loads(record.args[2]) + j = json.loads(record.args[2]) for _ in remake_field_generator(j): - pass + pass record.args = (record.args[0], record.args[1], json.dumps(j)) return True logging.basicConfig(level=logging.DEBUG) logging.getLogger("aiogram").addFilter(filter_grammar_messages) - # pipisa.register_handlers_pipisa(dp) # time_new_year.register_handlers_time(dp) # sendalarm.register_handlers_test(dp) @@ -46,11 +44,6 @@ logging.getLogger("aiogram").addFilter(filter_grammar_messages) if __name__ == '__main__': #print(CONFIG) start_http_server(1337) - logging.info('Бот в строю.') + logging.info('Бот в строю.') executor.start_polling(dp, skip_updates=True) - logging.info('Всем пока.') - - - - - + logging.info('Всем пока.') diff --git a/db_logic/collections.py b/db_logic/collections.py index bfaa145..cf8d700 100644 --- a/db_logic/collections.py +++ b/db_logic/collections.py @@ -1,44 +1,40 @@ - from global_conf import CONFIG - #### https://docs.python-arango.com/en/main/ from arango import ArangoClient - import logging import os -def get_dicks_collection(): +def get_dicks_collection(): try: - arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] ) + arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] ) pipisa_db = arango_client.db( - CONFIG['databaso']['base'], + CONFIG['databaso']['base'], username=os.environ['ARANGO_USR'], password=os.environ['ARANGO_PWD'], #username=CONFIG['databaso']['user'], #password=CONFIG['databaso']['pass'] - ) - dicks_collection = pipisa_db.collection(CONFIG['databaso']['collection']) + ) + dicks_collection = pipisa_db.collection(CONFIG['databaso']['collection']) return dicks_collection - except Exception as e: + except Exception as e: logging.error('ошибка DB при взятии коллекции пипис') - logging.error(e) + logging.error(e) - -def get_posts_removal_collection(): +def get_posts_removal_collection(): try: - arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] ) + arango_client = ArangoClient(hosts=CONFIG['databaso']['host']) pipisa_db = arango_client.db( - CONFIG['databaso']['base'], + CONFIG['databaso']['base'], username=os.environ['ARANGO_USR'], password=os.environ['ARANGO_PWD'], #username=CONFIG['databaso']['user'], #password=CONFIG['databaso']['pass'] - ) - posts_removal_collection = pipisa_db.collection(CONFIG['databaso']['posts_removal_collection']) + ) + posts_removal_collection = pipisa_db.collection(CONFIG['databaso']['posts_removal_collection']) return posts_removal_collection - except Exception as e: + except Exception as e: logging.error('ошибка DB при взятии коллекции постов-на-удаление') logging.error(e) diff --git a/db_logic/postcleaner.py b/db_logic/postcleaner.py index e51c23c..5567b93 100644 --- a/db_logic/postcleaner.py +++ b/db_logic/postcleaner.py @@ -2,22 +2,20 @@ from db_logic import collections import datetime, logging - def append_post_to_cleaning_sequence(message, type=None): try: novenkiy = { - 'msg_id': message.message_id, - 'chat_id': message.chat.id, - 'type': type, - 'datetimes': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + 'msg_id': message.message_id, + 'chat_id': message.chat.id, + 'type': type, + 'datetimes': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } metadata = collections.get_posts_removal_collection().insert(novenkiy, overwrite_mode='update') logging.debug('Успешно добавлен пост на удаление') except Exception as e2: logging.error('ошибка DB :: добавление нового поста на удаление') - logging.error(e2) - + logging.error(e2) def get_posts_to_be_removed(chat_id, type=None, max_id=None): # собираем @@ -29,10 +27,10 @@ def get_posts_to_be_removed(chat_id, type=None, max_id=None): else: posts = [p for p in collections.get_posts_removal_collection().find({'chat_id': chat_id}, skip=0, limit=1100) ] - posts_ret = [ p for p in posts ] - # # for p in posts: - # # last_time = datetime.datetime.strptime(p['datetimes'], '%Y-%m-%d %H:%M:%S') - # # timediff = (datetime.datetime.now() - last_time).total_seconds() + posts_ret = [ p for p in posts ] + # # for p in posts: + # # last_time = datetime.datetime.strptime(p['datetimes'], '%Y-%m-%d %H:%M:%S') + # # timediff = (datetime.datetime.now() - last_time).total_seconds() # # if timediff > 60: # # posts_ret.append(p) @@ -42,34 +40,31 @@ def get_posts_to_be_removed(chat_id, type=None, max_id=None): else: posts_ret = [ p for p in posts_ret if p['msg_id'] != max([pp['msg_id'] for pp in posts_ret])] - #### TODO удалять все предыдущие без учёта времени return posts_ret - except Exception as e: + except Exception as e: logging.error('ошибка DB :: получение постов на удаление') logging.error(e) - def del_post(msg_id, chat_id): # удаляем из базы - try: - + try: candidate_cursor = collections.get_posts_removal_collection().find( { 'msg_id': msg_id, 'chat_id': chat_id, }, - skip = 0, + skip = 0, limit = 1488 ) - if candidate_cursor.count() > 0: + if candidate_cursor.count() > 0: pp = candidate_cursor.pop() else: pp = None collections.get_posts_removal_collection().delete(pp) - except Exception as e: + except Exception as e: logging.error('ошибка DB :: удаление поста на удаление') - logging.error(e) \ No newline at end of file + logging.error(e) diff --git a/db_logic/tops.py b/db_logic/tops.py index cff5d83..cca7860 100644 --- a/db_logic/tops.py +++ b/db_logic/tops.py @@ -4,36 +4,34 @@ import logging def get_tops(top_ = False, glob_ = False, chat_id = None): - - if top_: + if top_: try: dicks = [d for d in collections.get_dicks_collection().find({'chat_id': chat_id}, skip=0, limit=1100)] - except Exception as e: + except Exception as e: logging.error('ошибка DB в /topdick') - logging.error(e) - elif glob_: + logging.error(e) + elif glob_: try: dicks = [d for d in collections.get_dicks_collection().all( ) if d['user_id'] != d['chat_id']] - except Exception as e: + except Exception as e: logging.error('ошибка DB в /globaldick') logging.error(e) else: - logging.error('вызывают хз что!') + logging.error('вызывают хз что!') - top_dicks = sorted(dicks, key=lambda dick: dick['dick_size'], reverse=True) - top_dicks = top_dicks[:( min(len(top_dicks), 10) )] + top_dicks = sorted(dicks, key=lambda dick: dick['dick_size'], reverse=True) + top_dicks = top_dicks[:(min(len(top_dicks), 10))] dickos = '' emo = ['🏆','🚀','🍆','🍌','🐍','🐎','🌭','🌶','👌','💩'] if len(top_dicks) > 0: - for i in range(len(top_dicks)): + for i in range(len(top_dicks)): dickos += f' {emo[i]} {i+1}. {top_dicks[i]["user_fullname"]}: {top_dicks[i]["dick_size"]}см\n' i += 1 return dickos - def get_antitops(top_ = False, glob_ = False, chat_id = None): dicks = [] if top_: @@ -62,4 +60,4 @@ def get_antitops(top_ = False, glob_ = False, chat_id = None): dickos += f' {emo[i]} {i+1}. {antitop_dicks[i]["user_fullname"]}: {antitop_dicks[i]["dick_size"]}см\n' i += 1 - return dickos \ No newline at end of file + return dickos diff --git a/db_logic/user_stuff.py b/db_logic/user_stuff.py index 9adc519..99096db 100644 --- a/db_logic/user_stuff.py +++ b/db_logic/user_stuff.py @@ -4,7 +4,6 @@ import datetime, logging def store_new_user(message, result ): - try: novenkiy = { 'user_id': message.from_user.id, @@ -17,12 +16,11 @@ def store_new_user(message, result ): metadata = collections.get_dicks_collection().insert(novenkiy, overwrite_mode='update') metric_wrap(message.chat.id, message.from_user.full_name, abs(result), 1) - logging.debug(f'Успешно добавлен нового пользователь @{message.from_user.username}') + logging.debug(f'Успешно добавлен нового пользователь @{message.from_user.username}') except Exception as e2: logging.error(f'ошибка DB в /dick :: добавление нового пользователя @{message.from_user.username}') logging.error(e2) - def update_attempts(message, user ): try: metadata = collections.get_dicks_collection().insert( @@ -39,14 +37,12 @@ def update_attempts(message, user ): ) metric_wrap(message.chat.id, message.from_user.full_name, user['dick_size'], user['attempts'] + 1) - logging.debug(f'Успешно апдейтнули попытку крутить пипису @{message.from_user.username}') + logging.debug(f'Успешно апдейтнули попытку крутить пипису @{message.from_user.username}') except Exception as e2: logging.error(f'ошибка DB в /dick :: обновление попытки крутить пипису @{message.from_user.username}') logging.error(e2) - def update_dick_size(message, user, updatedDick ): - try: metadata = collections.get_dicks_collection().insert( { @@ -62,12 +58,11 @@ def update_dick_size(message, user, updatedDick ): ) metric_wrap(message.chat.id, message.from_user.full_name, updatedDick, user['attempts'] + 1) - logging.info(f'Успешно апдейтнули пипису @{message.from_user.username}') + logging.info(f'Успешно апдейтнули пипису @{message.from_user.username}') except Exception as e2: logging.error(f'ошибка DB в /dick :: обновление пиписы @{message.from_user.username}') logging.error(e2) - def get_user(message): try: #### Чекнуть есть ли юзер в базе @@ -76,15 +71,15 @@ def get_user(message): 'user_id': message.from_user.id, 'chat_id': message.chat.id }, - skip = 0, + skip = 0, limit = 1488 ) - if candidate_cursor.count() > 0: + if candidate_cursor.count() > 0: user = candidate_cursor.pop() else: user = None return user - except Exception as e: + except Exception as e: logging.error('ошибка DB в /dick :: поиск юзера') - logging.error(e) \ No newline at end of file + logging.error(e) diff --git a/dicktxt/ForReadDict.py b/dicktxt/ForReadDict.py index 752a890..0dd010d 100644 --- a/dicktxt/ForReadDict.py +++ b/dicktxt/ForReadDict.py @@ -1,6 +1,6 @@ - import random + def RandomDick(): # Забирает рандомный синоним к слову хуй из dick_sinonims.txt # у линуха и винды не забывай про разные \/ @@ -9,4 +9,3 @@ def RandomDick(): words = file.readlines() return random.choice(words).strip() - diff --git a/dicktxt/WordsChange.py b/dicktxt/WordsChange.py index e511906..1eeb5f3 100644 --- a/dicktxt/WordsChange.py +++ b/dicktxt/WordsChange.py @@ -3,7 +3,6 @@ import random def ChangeWord(result): #выбирает рандомное слово для изменения размера пиписы - with open("dicktxt/dick_changes.yaml", 'r', encoding='utf-8') as f: words = yaml.load(f, Loader=yaml.Loader) @@ -14,8 +13,8 @@ def ChangeWord(result): #выбирает рандомное слово для if result > 0: size_change = random.choice(dick_inc) + '🚀' elif result == 0: - size_change = random.choice(dick_static) + '🤨' + size_change = random.choice(dick_static) + '🤨' else: size_change = random.choice(dick_decr) + '✂' - return size_change \ No newline at end of file + return size_change diff --git a/dicktxt/__init__.py b/dicktxt/__init__.py index 2508ff4..7d14353 100644 --- a/dicktxt/__init__.py +++ b/dicktxt/__init__.py @@ -1 +1,2 @@ from . import ForReadDict + diff --git a/handlers/__init__.py b/handlers/__init__.py index 34a19bf..b811d15 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -1,6 +1,4 @@ - from handlers import pipisa from handlers import start_help - from handlers import time_new_year diff --git a/handlers/metrics.py b/handlers/metrics.py index 78feb53..b3e666f 100644 --- a/handlers/metrics.py +++ b/handlers/metrics.py @@ -3,15 +3,14 @@ import time from prometheus_client import Gauge, Info from global_conf import pipisa_length_metric, user_attempt_metric, user_last_attempt_metric -class user_info_struct: - user_id: str - user_fullname: str - dick_size: int - datetimes: str - attemptsCount: int - chat_id: int - +class user_info_struct: + user_id: str + user_fullname: str + dick_size: int + datetimes: str + attemptsCount: int + chat_id: int def metric_wrap(chatID:int, userName: str, length: int, attemptsCount: int): """Pass here chat ID, username, pipisa length and attempts count to wrap them into metric.""" @@ -26,6 +25,6 @@ def metric_wrap(chatID:int, userName: str, length: int, attemptsCount: int): if not user_last_attempt_metric: user_last_attempt_metric = Gauge('user_last_attempt_time', 'Last attempt time', labelnames=['ChatID', 'Username']) - pipisa_length_metric.labels(chatID, userName).set(length) + pipisa_length_metric.labels(chatID, userName).set(length) user_attempt_metric.labels(chatID, userName).set(attemptsCount) user_last_attempt_metric.labels(chatID, userName).set(time.time_ns()) diff --git a/handlers/pipisa.py b/handlers/pipisa.py index 4bb1c29..8271f83 100644 --- a/handlers/pipisa.py +++ b/handlers/pipisa.py @@ -5,22 +5,16 @@ from random import randint import datetime, logging from dicktxt import ForReadDict, WordsChange from pipisa_functions import pipisa_time - from global_conf import CONFIG - from db_logic import tops, user_stuff, postcleaner admins = CONFIG['telegram_admins_ids'] - - @dp.message_handler(commands=["dick"]) async def up_dick(message: types.Message): if message.from_user.id in admins or message.chat.type != 'private': - - postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__DICK') await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__DICK', max_id=message.message_id) @@ -55,25 +49,23 @@ async def up_dick(message: types.Message): postcleaner.append_post_to_cleaning_sequence(message=mmm, type='ALREADY_ROLLED') await clean_posts(chat_id=mmm.chat.id, type='ALREADY_ROLLED', max_id=mmm.message_id) - else: + else: ## если нету, то создать user_stuff.store_new_user(message, result) await bot.send_message( message.chat.id, f'@{message.from_user.username}, Добро пожаловать в игру! Ваш писюн показал головку 🚀\nна {abs(result)} см!\nТеперь он равен {abs(result)} см!' - ) + ) else: await message.reply('Растить елду можно только в общих чатах!') - - @dp.message_handler(commands = ['topdick', 'globaldick']) async def send_topchat(message: types.Message): if message.from_user.id in admins or message.chat.type != 'private': top_ = message['text'].startswith('/topdick') - glob_ = message['text'].startswith('/globaldick') + glob_ = message['text'].startswith('/globaldick') if top_: dickos = tops.get_tops( top_ = True, chat_id=message.chat.id ) @@ -84,21 +76,19 @@ async def send_topchat(message: types.Message): postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__GLOBALDICK') await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__GLOBALDICK', max_id=message.message_id) else: - logging.error('вызывают хз что!') - + logging.error('вызывают хз что!') if not dickos: await bot.send_message(message.chat.id, '🍆 Никто ничего не нарастил! 🍌') else: if top_: await bot.send_message(message.chat.id, '🏆Топ 10 бубылд чата🏆\n\n' + dickos) - elif glob_: + elif glob_: await bot.send_message(message.chat.id, '🏆Топ 10 пипис в мире🏆\n\n' + dickos) else: await message.reply('Работает только в общих чатах!\n'\ 'Вы можете посмотреть топ по миру /globaldick') - @dp.message_handler(commands = ['antitopdick', 'antiglobaldick']) async def send_antitopchat(message: types.Message): if message.from_user.id in admins or message.chat.type != 'private': @@ -127,10 +117,8 @@ async def send_antitopchat(message: types.Message): await message.reply('Работает только в общих чатах!\n'\ 'Вы можете посмотреть антитоп по миру /antiglobaldick') - - async def clean_posts(chat_id, type=None, max_id=None): - psts = postcleaner.get_posts_to_be_removed(chat_id, type, max_id) + psts = postcleaner.get_posts_to_be_removed(chat_id, type, max_id) for p in psts: postcleaner.del_post(chat_id=p['chat_id'], msg_id=p['msg_id']) @@ -138,9 +126,7 @@ async def clean_posts(chat_id, type=None, max_id=None): #### TODO проверить админит ли бот try: - await bot.delete_message( chat_id=p['chat_id'], message_id=p['msg_id'], ) + await bot.delete_message(chat_id=p['chat_id'], message_id=p['msg_id']) except Exception as e: logging.error('ошибка удаления поста-на-удаление') logging.error(e) - - diff --git a/handlers/start_help.py b/handlers/start_help.py index c61e494..47f1897 100644 --- a/handlers/start_help.py +++ b/handlers/start_help.py @@ -5,23 +5,14 @@ from create_bot import dp, bot @dp.message_handler(commands=['start']) async def start_func(message: types.Message): if message.chat.type == 'private': - lkb = InlineKeyboardMarkup(row_width=1).add(InlineKeyboardButton(text = 'Добавить в группу', callback_data='add_2_group')) await message.answer(f'Добро пожаловать, {message.from_user.full_name}!\n' 'Это бот, который растит члены. Чтобы начать, добавь бота в чат'\ , reply_markup=lkb - ) # TODO добавить кнопку, по которой смогут добавлять юзера в чат - + @dp.callback_query_handler(text = 'add_2_group') async def addgroup(callback: types.CallbackQuery): await callback.answer('жмав') - - - - - - - diff --git a/pipisa_functions/pipisa_time.py b/pipisa_functions/pipisa_time.py index 5266444..a23db0e 100644 --- a/pipisa_functions/pipisa_time.py +++ b/pipisa_functions/pipisa_time.py @@ -2,9 +2,7 @@ import unittest import datetime - def rolltime(check_datetime , curr_time=datetime.datetime.now()): - last_time = datetime.datetime.strptime(check_datetime, '%Y-%m-%d %H:%M:%S') time_to_grow = curr_time.replace(hour=10, minute=0, second=0, microsecond=0) #+ datetime.timedelta(days=0) @@ -16,34 +14,30 @@ def rolltime(check_datetime , curr_time=datetime.datetime.now()): # print(f'\tcurr_time : {curr_time.strftime("%Y-%m-%d %H:%M:%S")}') # print(f'\t\tsecs diff : {timediff}') - if timediff > 86400 or (curr_time > time_to_grow and last_time < time_to_grow): return True else: return False - - class TestPipisa(unittest.TestCase): - def test_very_long_ago(self): + def test_very_long_ago(self): self.assertEqual(rolltime('2021-10-11 23:44:00'), True, "Пиписа прокручена очень давно. Очевидный прокрут") - def test_just_after(self): + def test_just_after(self): self.assertEqual(rolltime( datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') ), False, "Попытка крутить когда старый прокрут был через 1 минуту после сегодняшнего обновления роллов. Фейл.") - def test_yesterdayroll(self): + def test_yesterdayroll(self): self.assertEqual( rolltime( ( - datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(days=1) + datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(days=1) ).strftime('%Y-%m-%d %H:%M:%S'), datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) ), True, "Крутилась вчера через минуту после обновления роллов. Должно пропускать ") - - def test_hour_before_roll(self): + def test_hour_before_roll(self): self.assertEqual( rolltime( ( @@ -55,33 +49,29 @@ class TestPipisa(unittest.TestCase): def test_after_midnight(self): self.assertEqual( rolltime( - ( - datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=14) + ( + datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=14) ).strftime('%Y-%m-%d %H:%M:%S'), datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) ), False, "Где-то ночью прокручивается повторно. Фейл.") - - def test_before_midnight(self): + + def test_before_midnight(self): self.assertEqual( rolltime( - ( - datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=15) + ( + datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=15) ).strftime('%Y-%m-%d %H:%M:%S'), - datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) - datetime.timedelta(hours=15) + datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) - datetime.timedelta(hours=15) ), False, "Где-то ночью прокручивается повторно. Фейл.") - - def test_somewhere_after(self): + + def test_somewhere_after(self): self.assertEqual( rolltime( - ( - datetime.datetime.now().replace(hour=10, minute=18, second=0, microsecond=0) + ( + datetime.datetime.now().replace(hour=10, minute=18, second=0, microsecond=0) ).strftime('%Y-%m-%d %H:%M:%S'), datetime.datetime.now().replace(hour=10, minute=19, second=0, microsecond=0) ), False, "Сегодня уже открутили а хотят ещё. Фейл.") - - - - if __name__ == "__main__": - unittest.main(verbosity=2) \ No newline at end of file + unittest.main(verbosity=2)