space genocide

This commit is contained in:
hogweed1 2025-05-12 21:52:47 +10:00
parent 7e52c86067
commit 2830335b12
13 changed files with 88 additions and 148 deletions

19
bot.py
View File

@ -7,7 +7,6 @@ from global_conf import CONFIG
from prometheus_client import start_http_server from prometheus_client import start_http_server
def remake_field_generator(d, field='text'): def remake_field_generator(d, field='text'):
if isinstance(d, list): if isinstance(d, list):
for k2 in d: for k2 in d:
@ -16,7 +15,7 @@ def remake_field_generator(d, field='text'):
elif isinstance(d, dict): elif isinstance(d, dict):
for k, v in d.items(): for k, v in d.items():
if k == field: if k == field:
d[k] = '<blank!>' d[k] = '<blank!>'
yield v yield v
elif isinstance(v, list): elif isinstance(v, list):
for k2 in v: for k2 in v:
@ -24,20 +23,19 @@ def remake_field_generator(d, field='text'):
yield id_val yield id_val
elif isinstance(v, dict): elif isinstance(v, dict):
for id_val in remake_field_generator(v): for id_val in remake_field_generator(v):
yield id_val yield id_val
def filter_grammar_messages(record): def filter_grammar_messages(record):
if record.args and (not None in record.args): if record.args and (not None in record.args):
j = json.loads(record.args[2]) j = json.loads(record.args[2])
for _ in remake_field_generator(j): for _ in remake_field_generator(j):
pass pass
record.args = (record.args[0], record.args[1], json.dumps(j)) record.args = (record.args[0], record.args[1], json.dumps(j))
return True return True
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logging.getLogger("aiogram").addFilter(filter_grammar_messages) logging.getLogger("aiogram").addFilter(filter_grammar_messages)
# pipisa.register_handlers_pipisa(dp) # pipisa.register_handlers_pipisa(dp)
# time_new_year.register_handlers_time(dp) # time_new_year.register_handlers_time(dp)
# sendalarm.register_handlers_test(dp) # sendalarm.register_handlers_test(dp)
@ -46,11 +44,6 @@ logging.getLogger("aiogram").addFilter(filter_grammar_messages)
if __name__ == '__main__': if __name__ == '__main__':
#print(CONFIG) #print(CONFIG)
start_http_server(1337) start_http_server(1337)
logging.info('Бот в строю.') logging.info('Бот в строю.')
executor.start_polling(dp, skip_updates=True) executor.start_polling(dp, skip_updates=True)
logging.info('Всем пока.') logging.info('Всем пока.')

View File

@ -1,44 +1,40 @@
from global_conf import CONFIG from global_conf import CONFIG
#### https://docs.python-arango.com/en/main/ #### https://docs.python-arango.com/en/main/
from arango import ArangoClient from arango import ArangoClient
import logging import logging
import os import os
def get_dicks_collection(): def get_dicks_collection():
try: try:
arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] ) arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] )
pipisa_db = arango_client.db( pipisa_db = arango_client.db(
CONFIG['databaso']['base'], CONFIG['databaso']['base'],
username=os.environ['ARANGO_USR'], username=os.environ['ARANGO_USR'],
password=os.environ['ARANGO_PWD'], password=os.environ['ARANGO_PWD'],
#username=CONFIG['databaso']['user'], #username=CONFIG['databaso']['user'],
#password=CONFIG['databaso']['pass'] #password=CONFIG['databaso']['pass']
) )
dicks_collection = pipisa_db.collection(CONFIG['databaso']['collection']) dicks_collection = pipisa_db.collection(CONFIG['databaso']['collection'])
return dicks_collection return dicks_collection
except Exception as e: except Exception as e:
logging.error('ошибка DB при взятии коллекции пипис') logging.error('ошибка DB при взятии коллекции пипис')
logging.error(e) logging.error(e)
def get_posts_removal_collection():
def get_posts_removal_collection():
try: try:
arango_client = ArangoClient(hosts=CONFIG['databaso']['host'] ) arango_client = ArangoClient(hosts=CONFIG['databaso']['host'])
pipisa_db = arango_client.db( pipisa_db = arango_client.db(
CONFIG['databaso']['base'], CONFIG['databaso']['base'],
username=os.environ['ARANGO_USR'], username=os.environ['ARANGO_USR'],
password=os.environ['ARANGO_PWD'], password=os.environ['ARANGO_PWD'],
#username=CONFIG['databaso']['user'], #username=CONFIG['databaso']['user'],
#password=CONFIG['databaso']['pass'] #password=CONFIG['databaso']['pass']
) )
posts_removal_collection = pipisa_db.collection(CONFIG['databaso']['posts_removal_collection']) posts_removal_collection = pipisa_db.collection(CONFIG['databaso']['posts_removal_collection'])
return posts_removal_collection return posts_removal_collection
except Exception as e: except Exception as e:
logging.error('ошибка DB при взятии коллекции постов-на-удаление') logging.error('ошибка DB при взятии коллекции постов-на-удаление')
logging.error(e) logging.error(e)

View File

@ -2,22 +2,20 @@ from db_logic import collections
import datetime, logging import datetime, logging
def append_post_to_cleaning_sequence(message, type=None): def append_post_to_cleaning_sequence(message, type=None):
try: try:
novenkiy = { novenkiy = {
'msg_id': message.message_id, 'msg_id': message.message_id,
'chat_id': message.chat.id, 'chat_id': message.chat.id,
'type': type, 'type': type,
'datetimes': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 'datetimes': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
metadata = collections.get_posts_removal_collection().insert(novenkiy, overwrite_mode='update') metadata = collections.get_posts_removal_collection().insert(novenkiy, overwrite_mode='update')
logging.debug('Успешно добавлен пост на удаление') logging.debug('Успешно добавлен пост на удаление')
except Exception as e2: except Exception as e2:
logging.error('ошибка DB :: добавление нового поста на удаление') logging.error('ошибка DB :: добавление нового поста на удаление')
logging.error(e2) logging.error(e2)
def get_posts_to_be_removed(chat_id, type=None, max_id=None): def get_posts_to_be_removed(chat_id, type=None, max_id=None):
# собираем # собираем
@ -29,10 +27,10 @@ def get_posts_to_be_removed(chat_id, type=None, max_id=None):
else: else:
posts = [p for p in collections.get_posts_removal_collection().find({'chat_id': chat_id}, skip=0, limit=1100) ] posts = [p for p in collections.get_posts_removal_collection().find({'chat_id': chat_id}, skip=0, limit=1100) ]
posts_ret = [ p for p in posts ] posts_ret = [ p for p in posts ]
# # for p in posts: # # for p in posts:
# # last_time = datetime.datetime.strptime(p['datetimes'], '%Y-%m-%d %H:%M:%S') # # last_time = datetime.datetime.strptime(p['datetimes'], '%Y-%m-%d %H:%M:%S')
# # timediff = (datetime.datetime.now() - last_time).total_seconds() # # timediff = (datetime.datetime.now() - last_time).total_seconds()
# # if timediff > 60: # # if timediff > 60:
# # posts_ret.append(p) # # posts_ret.append(p)
@ -42,34 +40,31 @@ def get_posts_to_be_removed(chat_id, type=None, max_id=None):
else: else:
posts_ret = [ p for p in posts_ret if p['msg_id'] != max([pp['msg_id'] for pp in posts_ret])] posts_ret = [ p for p in posts_ret if p['msg_id'] != max([pp['msg_id'] for pp in posts_ret])]
#### TODO удалять все предыдущие без учёта времени #### TODO удалять все предыдущие без учёта времени
return posts_ret return posts_ret
except Exception as e: except Exception as e:
logging.error('ошибка DB :: получение постов на удаление') logging.error('ошибка DB :: получение постов на удаление')
logging.error(e) logging.error(e)
def del_post(msg_id, chat_id): def del_post(msg_id, chat_id):
# удаляем из базы # удаляем из базы
try: try:
candidate_cursor = collections.get_posts_removal_collection().find( candidate_cursor = collections.get_posts_removal_collection().find(
{ {
'msg_id': msg_id, 'msg_id': msg_id,
'chat_id': chat_id, 'chat_id': chat_id,
}, },
skip = 0, skip = 0,
limit = 1488 limit = 1488
) )
if candidate_cursor.count() > 0: if candidate_cursor.count() > 0:
pp = candidate_cursor.pop() pp = candidate_cursor.pop()
else: else:
pp = None pp = None
collections.get_posts_removal_collection().delete(pp) collections.get_posts_removal_collection().delete(pp)
except Exception as e: except Exception as e:
logging.error('ошибка DB :: удаление поста на удаление') logging.error('ошибка DB :: удаление поста на удаление')
logging.error(e) logging.error(e)

View File

@ -4,36 +4,34 @@ import logging
def get_tops(top_ = False, glob_ = False, chat_id = None): def get_tops(top_ = False, glob_ = False, chat_id = None):
if top_:
if top_:
try: try:
dicks = [d for d in collections.get_dicks_collection().find({'chat_id': chat_id}, skip=0, limit=1100)] dicks = [d for d in collections.get_dicks_collection().find({'chat_id': chat_id}, skip=0, limit=1100)]
except Exception as e: except Exception as e:
logging.error('ошибка DB в /topdick') logging.error('ошибка DB в /topdick')
logging.error(e) logging.error(e)
elif glob_: elif glob_:
try: try:
dicks = [d for d in collections.get_dicks_collection().all( ) if d['user_id'] != d['chat_id']] dicks = [d for d in collections.get_dicks_collection().all( ) if d['user_id'] != d['chat_id']]
except Exception as e: except Exception as e:
logging.error('ошибка DB в /globaldick') logging.error('ошибка DB в /globaldick')
logging.error(e) logging.error(e)
else: else:
logging.error('вызывают хз что!') logging.error('вызывают хз что!')
top_dicks = sorted(dicks, key=lambda dick: dick['dick_size'], reverse=True) top_dicks = sorted(dicks, key=lambda dick: dick['dick_size'], reverse=True)
top_dicks = top_dicks[:( min(len(top_dicks), 10) )] top_dicks = top_dicks[:(min(len(top_dicks), 10))]
dickos = '' dickos = ''
emo = ['🏆','🚀','🍆','🍌','🐍','🐎','🌭','🌶','👌','💩'] emo = ['🏆','🚀','🍆','🍌','🐍','🐎','🌭','🌶','👌','💩']
if len(top_dicks) > 0: if len(top_dicks) > 0:
for i in range(len(top_dicks)): for i in range(len(top_dicks)):
dickos += f' {emo[i]} {i+1}. {top_dicks[i]["user_fullname"]}: {top_dicks[i]["dick_size"]}см\n' dickos += f' {emo[i]} {i+1}. {top_dicks[i]["user_fullname"]}: {top_dicks[i]["dick_size"]}см\n'
i += 1 i += 1
return dickos return dickos
def get_antitops(top_ = False, glob_ = False, chat_id = None): def get_antitops(top_ = False, glob_ = False, chat_id = None):
dicks = [] dicks = []
if top_: if top_:
@ -62,4 +60,4 @@ def get_antitops(top_ = False, glob_ = False, chat_id = None):
dickos += f' {emo[i]} {i+1}. {antitop_dicks[i]["user_fullname"]}: {antitop_dicks[i]["dick_size"]}см\n' dickos += f' {emo[i]} {i+1}. {antitop_dicks[i]["user_fullname"]}: {antitop_dicks[i]["dick_size"]}см\n'
i += 1 i += 1
return dickos return dickos

View File

@ -4,7 +4,6 @@ import datetime, logging
def store_new_user(message, result ): def store_new_user(message, result ):
try: try:
novenkiy = { novenkiy = {
'user_id': message.from_user.id, 'user_id': message.from_user.id,
@ -17,12 +16,11 @@ def store_new_user(message, result ):
metadata = collections.get_dicks_collection().insert(novenkiy, overwrite_mode='update') metadata = collections.get_dicks_collection().insert(novenkiy, overwrite_mode='update')
metric_wrap(message.chat.id, message.from_user.full_name, abs(result), 1) metric_wrap(message.chat.id, message.from_user.full_name, abs(result), 1)
logging.debug(f'Успешно добавлен нового пользователь @{message.from_user.username}') logging.debug(f'Успешно добавлен нового пользователь @{message.from_user.username}')
except Exception as e2: except Exception as e2:
logging.error(f'ошибка DB в /dick :: добавление нового пользователя @{message.from_user.username}') logging.error(f'ошибка DB в /dick :: добавление нового пользователя @{message.from_user.username}')
logging.error(e2) logging.error(e2)
def update_attempts(message, user ): def update_attempts(message, user ):
try: try:
metadata = collections.get_dicks_collection().insert( metadata = collections.get_dicks_collection().insert(
@ -39,14 +37,12 @@ def update_attempts(message, user ):
) )
metric_wrap(message.chat.id, message.from_user.full_name, user['dick_size'], user['attempts'] + 1) metric_wrap(message.chat.id, message.from_user.full_name, user['dick_size'], user['attempts'] + 1)
logging.debug(f'Успешно апдейтнули попытку крутить пипису @{message.from_user.username}') logging.debug(f'Успешно апдейтнули попытку крутить пипису @{message.from_user.username}')
except Exception as e2: except Exception as e2:
logging.error(f'ошибка DB в /dick :: обновление попытки крутить пипису @{message.from_user.username}') logging.error(f'ошибка DB в /dick :: обновление попытки крутить пипису @{message.from_user.username}')
logging.error(e2) logging.error(e2)
def update_dick_size(message, user, updatedDick ): def update_dick_size(message, user, updatedDick ):
try: try:
metadata = collections.get_dicks_collection().insert( metadata = collections.get_dicks_collection().insert(
{ {
@ -62,12 +58,11 @@ def update_dick_size(message, user, updatedDick ):
) )
metric_wrap(message.chat.id, message.from_user.full_name, updatedDick, user['attempts'] + 1) metric_wrap(message.chat.id, message.from_user.full_name, updatedDick, user['attempts'] + 1)
logging.info(f'Успешно апдейтнули пипису @{message.from_user.username}') logging.info(f'Успешно апдейтнули пипису @{message.from_user.username}')
except Exception as e2: except Exception as e2:
logging.error(f'ошибка DB в /dick :: обновление пиписы @{message.from_user.username}') logging.error(f'ошибка DB в /dick :: обновление пиписы @{message.from_user.username}')
logging.error(e2) logging.error(e2)
def get_user(message): def get_user(message):
try: try:
#### Чекнуть есть ли юзер в базе #### Чекнуть есть ли юзер в базе
@ -76,15 +71,15 @@ def get_user(message):
'user_id': message.from_user.id, 'user_id': message.from_user.id,
'chat_id': message.chat.id 'chat_id': message.chat.id
}, },
skip = 0, skip = 0,
limit = 1488 limit = 1488
) )
if candidate_cursor.count() > 0: if candidate_cursor.count() > 0:
user = candidate_cursor.pop() user = candidate_cursor.pop()
else: else:
user = None user = None
return user return user
except Exception as e: except Exception as e:
logging.error('ошибка DB в /dick :: поиск юзера') logging.error('ошибка DB в /dick :: поиск юзера')
logging.error(e) logging.error(e)

View File

@ -1,6 +1,6 @@
import random import random
def RandomDick(): def RandomDick():
# Забирает рандомный синоним к слову хуй из dick_sinonims.txt # Забирает рандомный синоним к слову хуй из dick_sinonims.txt
# у линуха и винды не забывай про разные \/ # у линуха и винды не забывай про разные \/
@ -9,4 +9,3 @@ def RandomDick():
words = file.readlines() words = file.readlines()
return random.choice(words).strip() return random.choice(words).strip()

View File

@ -3,7 +3,6 @@ import random
def ChangeWord(result): #выбирает рандомное слово для изменения размера пиписы def ChangeWord(result): #выбирает рандомное слово для изменения размера пиписы
with open("dicktxt/dick_changes.yaml", 'r', encoding='utf-8') as f: with open("dicktxt/dick_changes.yaml", 'r', encoding='utf-8') as f:
words = yaml.load(f, Loader=yaml.Loader) words = yaml.load(f, Loader=yaml.Loader)
@ -14,8 +13,8 @@ def ChangeWord(result): #выбирает рандомное слово для
if result > 0: if result > 0:
size_change = random.choice(dick_inc) + '🚀' size_change = random.choice(dick_inc) + '🚀'
elif result == 0: elif result == 0:
size_change = random.choice(dick_static) + '🤨' size_change = random.choice(dick_static) + '🤨'
else: else:
size_change = random.choice(dick_decr) + '' size_change = random.choice(dick_decr) + ''
return size_change return size_change

View File

@ -1 +1,2 @@
from . import ForReadDict from . import ForReadDict

View File

@ -1,6 +1,4 @@
from handlers import pipisa from handlers import pipisa
from handlers import start_help from handlers import start_help
from handlers import time_new_year from handlers import time_new_year

View File

@ -3,15 +3,14 @@ import time
from prometheus_client import Gauge, Info from prometheus_client import Gauge, Info
from global_conf import pipisa_length_metric, user_attempt_metric, user_last_attempt_metric from global_conf import pipisa_length_metric, user_attempt_metric, user_last_attempt_metric
class user_info_struct:
user_id: str
user_fullname: str
dick_size: int
datetimes: str
attemptsCount: int
chat_id: int
class user_info_struct:
user_id: str
user_fullname: str
dick_size: int
datetimes: str
attemptsCount: int
chat_id: int
def metric_wrap(chatID:int, userName: str, length: int, attemptsCount: int): def metric_wrap(chatID:int, userName: str, length: int, attemptsCount: int):
"""Pass here chat ID, username, pipisa length and attempts count to wrap them into metric.""" """Pass here chat ID, username, pipisa length and attempts count to wrap them into metric."""
@ -26,6 +25,6 @@ def metric_wrap(chatID:int, userName: str, length: int, attemptsCount: int):
if not user_last_attempt_metric: if not user_last_attempt_metric:
user_last_attempt_metric = Gauge('user_last_attempt_time', 'Last attempt time', labelnames=['ChatID', 'Username']) user_last_attempt_metric = Gauge('user_last_attempt_time', 'Last attempt time', labelnames=['ChatID', 'Username'])
pipisa_length_metric.labels(chatID, userName).set(length) pipisa_length_metric.labels(chatID, userName).set(length)
user_attempt_metric.labels(chatID, userName).set(attemptsCount) user_attempt_metric.labels(chatID, userName).set(attemptsCount)
user_last_attempt_metric.labels(chatID, userName).set(time.time_ns()) user_last_attempt_metric.labels(chatID, userName).set(time.time_ns())

View File

@ -5,22 +5,16 @@ from random import randint
import datetime, logging import datetime, logging
from dicktxt import ForReadDict, WordsChange from dicktxt import ForReadDict, WordsChange
from pipisa_functions import pipisa_time from pipisa_functions import pipisa_time
from global_conf import CONFIG from global_conf import CONFIG
from db_logic import tops, user_stuff, postcleaner from db_logic import tops, user_stuff, postcleaner
admins = CONFIG['telegram_admins_ids'] admins = CONFIG['telegram_admins_ids']
@dp.message_handler(commands=["dick"]) @dp.message_handler(commands=["dick"])
async def up_dick(message: types.Message): async def up_dick(message: types.Message):
if message.from_user.id in admins or message.chat.type != 'private': if message.from_user.id in admins or message.chat.type != 'private':
postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__DICK') postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__DICK')
await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__DICK', max_id=message.message_id) await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__DICK', max_id=message.message_id)
@ -55,25 +49,23 @@ async def up_dick(message: types.Message):
postcleaner.append_post_to_cleaning_sequence(message=mmm, type='ALREADY_ROLLED') postcleaner.append_post_to_cleaning_sequence(message=mmm, type='ALREADY_ROLLED')
await clean_posts(chat_id=mmm.chat.id, type='ALREADY_ROLLED', max_id=mmm.message_id) await clean_posts(chat_id=mmm.chat.id, type='ALREADY_ROLLED', max_id=mmm.message_id)
else: else:
## если нету, то создать ## если нету, то создать
user_stuff.store_new_user(message, result) user_stuff.store_new_user(message, result)
await bot.send_message( message.chat.id, await bot.send_message( message.chat.id,
f'@{message.from_user.username}, Добро пожаловать в игру! Ваш писюн показал головку 🚀\nна <b>{abs(result)}</b> см!\nТеперь он равен <b>{abs(result)}</b> см!' f'@{message.from_user.username}, Добро пожаловать в игру! Ваш писюн показал головку 🚀\nна <b>{abs(result)}</b> см!\nТеперь он равен <b>{abs(result)}</b> см!'
) )
else: else:
await message.reply('Растить елду можно только в общих чатах!') await message.reply('Растить елду можно только в общих чатах!')
@dp.message_handler(commands = ['topdick', 'globaldick']) @dp.message_handler(commands = ['topdick', 'globaldick'])
async def send_topchat(message: types.Message): async def send_topchat(message: types.Message):
if message.from_user.id in admins or message.chat.type != 'private': if message.from_user.id in admins or message.chat.type != 'private':
top_ = message['text'].startswith('/topdick') top_ = message['text'].startswith('/topdick')
glob_ = message['text'].startswith('/globaldick') glob_ = message['text'].startswith('/globaldick')
if top_: if top_:
dickos = tops.get_tops( top_ = True, chat_id=message.chat.id ) dickos = tops.get_tops( top_ = True, chat_id=message.chat.id )
@ -84,21 +76,19 @@ async def send_topchat(message: types.Message):
postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__GLOBALDICK') postcleaner.append_post_to_cleaning_sequence(message=message, type=f'COMMAND_CALL__GLOBALDICK')
await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__GLOBALDICK', max_id=message.message_id) await clean_posts(chat_id=message.chat.id, type='COMMAND_CALL__GLOBALDICK', max_id=message.message_id)
else: else:
logging.error('вызывают хз что!') logging.error('вызывают хз что!')
if not dickos: if not dickos:
await bot.send_message(message.chat.id, '🍆 Никто ничего не нарастил! 🍌') await bot.send_message(message.chat.id, '🍆 Никто ничего не нарастил! 🍌')
else: else:
if top_: if top_:
await bot.send_message(message.chat.id, '🏆Топ 10 бубылд чата🏆\n\n' + dickos) await bot.send_message(message.chat.id, '🏆Топ 10 бубылд чата🏆\n\n' + dickos)
elif glob_: elif glob_:
await bot.send_message(message.chat.id, '🏆Топ 10 пипис в мире🏆\n\n' + dickos) await bot.send_message(message.chat.id, '🏆Топ 10 пипис в мире🏆\n\n' + dickos)
else: else:
await message.reply('Работает только в общих чатах!\n'\ await message.reply('Работает только в общих чатах!\n'\
'Вы можете посмотреть топ по миру /globaldick') 'Вы можете посмотреть топ по миру /globaldick')
@dp.message_handler(commands = ['antitopdick', 'antiglobaldick']) @dp.message_handler(commands = ['antitopdick', 'antiglobaldick'])
async def send_antitopchat(message: types.Message): async def send_antitopchat(message: types.Message):
if message.from_user.id in admins or message.chat.type != 'private': if message.from_user.id in admins or message.chat.type != 'private':
@ -127,10 +117,8 @@ async def send_antitopchat(message: types.Message):
await message.reply('Работает только в общих чатах!\n'\ await message.reply('Работает только в общих чатах!\n'\
'Вы можете посмотреть антитоп по миру /antiglobaldick') 'Вы можете посмотреть антитоп по миру /antiglobaldick')
async def clean_posts(chat_id, type=None, max_id=None): async def clean_posts(chat_id, type=None, max_id=None):
psts = postcleaner.get_posts_to_be_removed(chat_id, type, max_id) psts = postcleaner.get_posts_to_be_removed(chat_id, type, max_id)
for p in psts: for p in psts:
postcleaner.del_post(chat_id=p['chat_id'], msg_id=p['msg_id']) postcleaner.del_post(chat_id=p['chat_id'], msg_id=p['msg_id'])
@ -138,9 +126,7 @@ async def clean_posts(chat_id, type=None, max_id=None):
#### TODO проверить админит ли бот #### TODO проверить админит ли бот
try: try:
await bot.delete_message( chat_id=p['chat_id'], message_id=p['msg_id'], ) await bot.delete_message(chat_id=p['chat_id'], message_id=p['msg_id'])
except Exception as e: except Exception as e:
logging.error('ошибка удаления поста-на-удаление') logging.error('ошибка удаления поста-на-удаление')
logging.error(e) logging.error(e)

View File

@ -5,23 +5,14 @@ from create_bot import dp, bot
@dp.message_handler(commands=['start']) @dp.message_handler(commands=['start'])
async def start_func(message: types.Message): async def start_func(message: types.Message):
if message.chat.type == 'private': if message.chat.type == 'private':
lkb = InlineKeyboardMarkup(row_width=1).add(InlineKeyboardButton(text = 'Добавить в группу', callback_data='add_2_group')) lkb = InlineKeyboardMarkup(row_width=1).add(InlineKeyboardButton(text = 'Добавить в группу', callback_data='add_2_group'))
await message.answer(f'<b>Добро пожаловать, {message.from_user.full_name}!</b>\n' await message.answer(f'<b>Добро пожаловать, {message.from_user.full_name}!</b>\n'
'Это бот, который растит члены. Чтобы начать, добавь бота в чат'\ 'Это бот, который растит члены. Чтобы начать, добавь бота в чат'\
, reply_markup=lkb , reply_markup=lkb
) )
# TODO добавить кнопку, по которой смогут добавлять юзера в чат # TODO добавить кнопку, по которой смогут добавлять юзера в чат
@dp.callback_query_handler(text = 'add_2_group') @dp.callback_query_handler(text = 'add_2_group')
async def addgroup(callback: types.CallbackQuery): async def addgroup(callback: types.CallbackQuery):
await callback.answer('жмав') await callback.answer('жмав')

View File

@ -2,9 +2,7 @@ import unittest
import datetime import datetime
def rolltime(check_datetime , curr_time=datetime.datetime.now()): def rolltime(check_datetime , curr_time=datetime.datetime.now()):
last_time = datetime.datetime.strptime(check_datetime, '%Y-%m-%d %H:%M:%S') last_time = datetime.datetime.strptime(check_datetime, '%Y-%m-%d %H:%M:%S')
time_to_grow = curr_time.replace(hour=10, minute=0, second=0, microsecond=0) #+ datetime.timedelta(days=0) time_to_grow = curr_time.replace(hour=10, minute=0, second=0, microsecond=0) #+ datetime.timedelta(days=0)
@ -16,34 +14,30 @@ def rolltime(check_datetime , curr_time=datetime.datetime.now()):
# print(f'\tcurr_time : {curr_time.strftime("%Y-%m-%d %H:%M:%S")}') # print(f'\tcurr_time : {curr_time.strftime("%Y-%m-%d %H:%M:%S")}')
# print(f'\t\tsecs diff : {timediff}') # print(f'\t\tsecs diff : {timediff}')
if timediff > 86400 or (curr_time > time_to_grow and last_time < time_to_grow): if timediff > 86400 or (curr_time > time_to_grow and last_time < time_to_grow):
return True return True
else: else:
return False return False
class TestPipisa(unittest.TestCase): class TestPipisa(unittest.TestCase):
def test_very_long_ago(self): def test_very_long_ago(self):
self.assertEqual(rolltime('2021-10-11 23:44:00'), True, "Пиписа прокручена очень давно. Очевидный прокрут") self.assertEqual(rolltime('2021-10-11 23:44:00'), True, "Пиписа прокручена очень давно. Очевидный прокрут")
def test_just_after(self): def test_just_after(self):
self.assertEqual(rolltime( self.assertEqual(rolltime(
datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
), False, "Попытка крутить когда старый прокрут был через 1 минуту после сегодняшнего обновления роллов. Фейл.") ), False, "Попытка крутить когда старый прокрут был через 1 минуту после сегодняшнего обновления роллов. Фейл.")
def test_yesterdayroll(self): def test_yesterdayroll(self):
self.assertEqual( self.assertEqual(
rolltime( rolltime(
( (
datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(days=1) datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(days=1)
).strftime('%Y-%m-%d %H:%M:%S'), ).strftime('%Y-%m-%d %H:%M:%S'),
datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0)
), True, "Крутилась вчера через минуту после обновления роллов. Должно пропускать ") ), True, "Крутилась вчера через минуту после обновления роллов. Должно пропускать ")
def test_hour_before_roll(self):
def test_hour_before_roll(self):
self.assertEqual( self.assertEqual(
rolltime( rolltime(
( (
@ -55,33 +49,29 @@ class TestPipisa(unittest.TestCase):
def test_after_midnight(self): def test_after_midnight(self):
self.assertEqual( self.assertEqual(
rolltime( rolltime(
( (
datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=14) datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=14)
).strftime('%Y-%m-%d %H:%M:%S'), ).strftime('%Y-%m-%d %H:%M:%S'),
datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0)
), False, "Где-то ночью прокручивается повторно. Фейл.") ), False, "Где-то ночью прокручивается повторно. Фейл.")
def test_before_midnight(self): def test_before_midnight(self):
self.assertEqual( self.assertEqual(
rolltime( rolltime(
( (
datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=15) datetime.datetime.now().replace(hour=10, minute=1, second=0, microsecond=0) - datetime.timedelta(hours=15)
).strftime('%Y-%m-%d %H:%M:%S'), ).strftime('%Y-%m-%d %H:%M:%S'),
datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) - datetime.timedelta(hours=15) datetime.datetime.now().replace(hour=0, minute=11, second=8, microsecond=0) - datetime.timedelta(hours=15)
), False, "Где-то ночью прокручивается повторно. Фейл.") ), False, "Где-то ночью прокручивается повторно. Фейл.")
def test_somewhere_after(self): def test_somewhere_after(self):
self.assertEqual( self.assertEqual(
rolltime( rolltime(
( (
datetime.datetime.now().replace(hour=10, minute=18, second=0, microsecond=0) datetime.datetime.now().replace(hour=10, minute=18, second=0, microsecond=0)
).strftime('%Y-%m-%d %H:%M:%S'), ).strftime('%Y-%m-%d %H:%M:%S'),
datetime.datetime.now().replace(hour=10, minute=19, second=0, microsecond=0) datetime.datetime.now().replace(hour=10, minute=19, second=0, microsecond=0)
), False, "Сегодня уже открутили а хотят ещё. Фейл.") ), False, "Сегодня уже открутили а хотят ещё. Фейл.")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(verbosity=2) unittest.main(verbosity=2)