Введение

В этой инструкции мы создадим приложение, которое интегрируется со сторонними API. Разберем I/O Bound нагрузку и поработаем с асинхронностью в Python. 

Часть сервера Telegram-бота мы будем писать на С, так как он считается перформанс-ориентированным языком, поэтому посмотрим также модуль обработки СPU Bound нагрузки, использующийся для сложных вычислений процессора. 

Мы постараемся объяснить все: от развертывания каждого модуля до настройки проекта на удаленном сервере.

Работать все это будет в вебе, поэтому для примера клиент-серверной архитектуры используется Telegram. У нас по дефолту будет мобильный+десктопный клиент, поэтому не придется писать фронтенд самому.

Теория

У многих приложений есть открытый интерфейс (API), к которому можно подключиться из своих программ. Например, так можно подключиться к YouTube и попросить сервис прислать описания всех роликов из конкретного плейлиста.

Что такое CPU Bound и I/O Bound нагрузка. Примеры

  1. Есть какая-то задача Х, которая сильно нагружает процессор. Например, одни из самых медленных операций, которые выполняет современный процессор — деление или тригонометрические функции типа sin или cos. Такие сложные для процессора операции называют CPU Bound нагрузкой.
  2. Если операции упираются в скорость сети, скорость записи или чтения  какого-то файла с диска или ожидания запроса из базы данных — это I/O Bound нагрузка. Отправили запрос к серверу и ждете, пока он придет — процессор простаивает, как правило, 90% времени.

Как эти проблемы решаются в Python?

  1. Можно использовать модуль мультипроцессинга, который распараллелит вычисления на несколько процессов на нескольких реальных ядрах. Первый оверхед — создание новых процессов в операционной системе — достаточно дорогая операция. А их взаимодействие — второй оверхед, то есть обмен данными между ними — не самая простая вещь. Как минимум, чтобы передать объект между ними, объект должен быть сериализован и десериализован соответствующе. Если кратко: сериализация — это очень дорого.
  2. Мультитрединг — дешевле, особенно вскейле, то есть когда мы планируем масштабировать и создавать большие пулы потоков. Потому что поток — это более легковесная сущность с точки зрения операционной системы и в сравнении с процессом, как минимум.

Самое важное, что потоки расшаривают одну и ту же область памяти внутри одного процесса. То есть обмен данными между ними организован гораздо проще. 

В Python есть GIL или блок интерпретатора, с которым могут быть проблемы. В каждый конкретный момент времени может работать только один, залоченный поток, а другим приходится ждать.

В случае решения второй проблемы и I/O Bound нагрузки, то в Python она решается асинхронным программированием.

Практика

О том, как создаются Telegram-боты, можно посмотреть здесь. Начнем с нашего интерфейса.

Возьмем первую библиотеку для Python: Python-telegram-bot

Чтобы не захламлять компьютер чем-то лишним, будем использовать питоновскую виртуальную среду (venv). Это похоже на изолированный контейнер, в котором можно проводить любые опыты, и доступно это только внутри этой папки. 

Сначала нам нужен токен от BotFather для аутентификации с API Telegram. Плюс здесь можно устанавливать ряд других настроек: изменить описание, поставить моды. BotFather по сути представляет собой админку. 

Токен мы будем хранить в скрытых данных. За это в Python отвечает модуль dotenv, который также ставится внутри нашей виртуалки, и в сам файл уже переносим токен.

    from bot.bot import start, list_files, upload_file, search_button, cython, pi_button, FIRST, SECOND
from dotenv import load_dotenv
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, CallbackQueryHandler, ConversationHandler
import os
 
async def hello(update: Update, context: ContextTypes.DEFAULT_Type) ->Name: Name: 
	await update.message.replay_text(f'Hello(update.effective_user.first_name)')
 
 
 load_dotenv()
   application = ApplicationBuilder().token(os.getenv('TOKEN')).build()
   app.add_handler(CommandHandler(“hello”, hello))
   app.run.polling()

Чтобы подружить Telegram с Google Drive воспользуемся API /quickstart/python. Здесь нам нужно получить что-то вроде токена, как и в случае аутентификации с API Telegram, только в Google API это называется(Credentials). В main.py мы должны добавить и токен, и обработчик функции start.

По этой логике нужно написать еще одну функцию и добавить ее в еще один handler уже для сэмпла от Google Drive. 

async list_files() 

Добавляем еще один CommandHandler (обработчик команд или все, что мы пишем через слэш).

В Telegram, проверить работу функции можно командой /files, которую мы можем легко добавить в админке BotFather

По коду Google: мы достаем секретки и вытаскиваем все файлы, дальше выводим их в консоль. Можно сделать, чтобы ответ приходил не в консоль сервера, а сразу в бот на клиент.

Загрузка файлов

Для демонстрации возможностей добавим еще и загрузку файлов. Google предлагает здесь обширный список того, что можно делать: Работа со всеми видами файлов, работа с UI Google Drive, и так далее. 

Нам для интерактивности нужна динамика, поэтому мы будем грузить файл сначала на наш сервер через интерфейс Telegram, а уже оттуда его перенаправлять в Google Drive.

Чтобы загрузить файл сначала к нам на сервер, воспользуемся вот этим код-сниппетом из библиотеки telegram-bot-api а конкретно Download file.

Промежуточный итог

У нас есть сервер. Место, где мы разворачиваем наш код: удаленный или наш компьютер. Там мы запускаем наш Telegram-бот. Через интерфейс Telegram-бота, который нам дается по стандарту, мы можем загрузить файл, и он будет у нас лежать либо на компьютере, либо на удаленном сервере. 

Этим сниппетом сначала загружаем файлы на компьютер. В отдельную функцию вынесем аутентификацию с Google Drive, чтобы было почище. Дальше во все захардкоденные сниппеты Google мы добавим наши динамические данные в виде имен файлов и путей. Все это достается из аргументов Update и Context. 

Добавим также обработчик команд MessageHandler для медиасообщений. 

Теперь, если вызвать эту функцию, то сначала файл появится в нашей папке Downloaded, а потом в Google Drive. 

В конечном счете код в файле main.py у нас должен выглядеть так:

    from bot.bot import start, list_files, upload_file, search_button, cython, pi_button, FIRST, SECOND
from dotenv import load_dotenv
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, CallbackQueryHandler, ConversationHandler
import os
 
 
def main():
   import cProfile
   import pstats
 
   load_dotenv()
   application = ApplicationBuilder().token(os.getenv('TOKEN')).build()
  
   application.add_handler(CommandHandler('start', start))
   with cProfile.Profile() as pr:
       application.add_handler(CommandHandler('files', list_files))
   stats = pstats.Stats(pr)
   stats.sort_stats(pstats.SortKey.TIME)
   stats.dump_stats(filename='profiling.prof')
 
 
   conv_handler = ConversationHandler(
           entry_points=[CommandHandler("cython", cython)],
           states={
               FIRST: [CallbackQueryHandler(search_button)],
               SECOND: [CallbackQueryHandler(pi_button)],
           },
           fallbacks=[CommandHandler("cython", cython)],
       )
 
   application.add_handler(conv_handler)
  
   application.add_handler(MessageHandler(filters.Document.ALL, upload_file))
 
 
   application.run_polling()
 
 
if __name__ == '__main__':
   main()

На этом мы закончили работу с I/O Bound нагрузкой и ее функциями. А что если мы хотим добавить функциональность по нагрузке процессора? Вариаций действительно много. Можно сделать промежуточный лэйер, который будет брать видео, которые мы грузим, монтажить их в клип, сжимать и выгружать на Google Drive уже готовые препродакшн видео.

Модуль для сложных вычислений на процессоре

Следующая фича на бэкенде. Мы сейчас добавим модуль для CPU Bound обработки именно на процессоре, и делать это будет Python в связке с языком С. Более того, делать мы это будем в нескольких потоках. То есть мы отключим питоновский GIL. 

Потенциально это может создать много проблем с синхронизацией, но тема многопоточности — это отдельная история, которую мы в рамках инструкции вынесем за скобки. 

Первый модуль будет большой аллокацией + бинарный поиск по массиву. Заодно так проверим, как это работает. Папка lowlevel для C-файлов и сразу заведем модуль с парой простых функций.

    int*allocate(int N)
	int* ptr = malloc(sizeof(int)* N);

Выделяем память для массива данных. Внутри функции поиска элементов будем вызывать наши функции для аллокации и генерации данных. Так можно увидеть наглядно, как работает процессор линейно, в нескольких потоках и в нескольких процессах. Эта практика сейчас используется для наглядности, но в боевых проектах так делать не стоит.

Чтобы вызывать код на С из-под Python нам потребуется Cython. Это суперсет Python, который позволяет писать на C прямо внутри Python. Чтобы все это работало, нужно поставить это в виртуалку и добавить файл для интерфейса Python и C.

    cdef extern from "lowlevel/module.c" nogil;
	int binary_search_imp(int index)

	cdef int b_search_(int index): 

Затем создаем промежуточную функцию, которая будет вызывать нашу экспортированную C функцию.

    cdef int b_search(int index):
	return binary_search_impl(index);

def binary_search(int index, results = None):
	cdef int result;
	with nogil:

	result = b_search(index);

	if results is not None:

	results.append(result)

	return result


Самое важное — поставить аннотацию nogil, то есть здесь мы отключаем лок интерпретатора (GIL). 

Итак, у нас есть С файл с core-функциями. У нас есть файл, чтобы этими функциями пользоваться в самом Python. 

Теперь в новом файле опишем все Python-функции. Чтобы не путаться, назовем его, например, performance.py. В него сразу добавим функцию по вычислению дельты времени, от начала и конца исполнения. Она позволит наглядно увидеть скорость Python в линейном режиме, многопоточном и в многопроцессном.

Функция, внутри которой есть функция-обертчик, где есть вызов функции, которую мы декорируем. Чтобы протестить все, что мы написали, нам нужны три основных функции.

Линейная. Все вычисления будут происходить по очереди в одном потоке, как обычно работает Python. Асинхронность здесь не сработает, потому что все это нагружает именно процессор, а не любого рода ввод/вывод.

    def linear(N)
results = []
binary_search(N, results)

return results

Многопоточная. Создаем пять потоков по одному вызову в каждом. 

    def multithreaded(N):

results = []

Мультипроцессинговая. Функция с изолированным друг от друга процессами: shared.list.

    shared.list.

def multithreaded(N):

results = mp.Managed.lists()

Теперь перейдем к следующему конфигу, по настройке Cython. Назовем его setup.py. Что здесь должно быть?

  • файл, который нужно скомпилировать,
  • модуль, который мы создаем и будем использовать,
  • функция setup, которая будет все это структурировать.

Код выглядит так:

    from setuptools import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
 
sourceFiles = ['bot/nogil.pyx']
 
ext_modules = [
   Extension("nogil",
   sourceFiles
   ),
]
 
setup(
   name='google drive app',
   cmdclass={'build_ext': build_ext},
   ext_modules=ext_modules
)

Добавим простой bash-script, чтобы собирать все было удобнее.

    # !/bin/bash
 
[! -e "nogil.c" ] || rm "nogil.c"
find . -name "*.so" -type f -delete
 
python3 setup.py build_ext -b build
 
mv bot/*.c bot/lowlevel/

Теперь все должно адекватно пересобираться и раскладываться по своим местам.

Чтобы в самом Telegram-боте это выглядело красиво, добавим пару кнопок. В Inline-боте нужны две функции: описание и кнопка + события, которые будут происходить, когда мы будем на нее нажимать.

Вызываем функцию бинарного поиска в наших 3-х тестах. Линейном, многопоточном и многопроцессном, дергаем наши три функции. Добавляем обработчики  для кнопок и можно тестировать. 

Почти секунда у нас уйдет, чтобы выделить необходимую память для каждого вызова функции. Многопоточность обгоняет линейную более чем на 50%. Многопроцессность также быстрее, но все же уступает многопоточности.

В конце у нас должен получиться примерно такой код в bot.py:

    from __future__ import print_function
 
import os
 
 
import logging
from telegram import Update
from telegram.ext import ContextTypes
 
 
import os.path
 
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from bot.performance import linear, multithreaded, multiprocessed, linear_pi, mt_pi, mp_pi
 
 
 
logging.basicConfig(
   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
   level=logging.INFO
)
 
 
def creds():
       # If modifying these scopes, delete the file token.json.
   SCOPES = ['https://www.googleapis.com/auth/drive']
 
   """Shows basic usage of the Drive v3 API.
   Prints the names and ids of the first 10 files the user has access to.
   """
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists('token.json'):
       creds = Credentials.from_authorized_user_file('token.json', SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               'credentials.json', SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open('token.json', 'w') as token:
           token.write(creds.to_json())
   return creds
 
 
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
   await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
 
 
async def list_files(update: Update, context: ContextTypes.DEFAULT_TYPE):
   try:
       service = build('drive', 'v3', credentials=creds())
 
       # Call the Drive v3 API
       results = service.files().list(
           pageSize=10, fields="nextPageToken, files(id, name)").execute()
       items = results.get('files', [])
 
       if not items:
           print('No files found.')
           return
       #4.362e-06
       # l = []
       # for f in items:
       #     l.append(f.get("name"))
 
       print('Files:')
       #4.103e-06
       for item in items:
           await context.bot.send_message(chat_id=update.effective_chat.id, text=item.get("name"))
           print(u'{0} ({1})'.format(item['name'], item['id']))
 
   except HttpError as error:
       # TODO(developer) - Handle errors from drive API.
       print(f'An error occurred: {error}')
 
 
 
async def upload_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
   newFile = await update.message.effective_attachment.get_file()
   await newFile.download(custom_path="downloaded/" + update.message.effective_attachment.file_name)
 
 
   try:
       # create drive api client
       service = build('drive', 'v3', credentials=creds())
 
       file_metadata = {'name': update.message.document.file_name}
       media = MediaFileUpload("downloaded/"+update.message.document.file_name,
                               mimetype=update.message.document.mime_type, resumable=True)
       # pylint: disable=maybe-no-member
       file = service.files().create(body=file_metadata, media_body=media,
                                     fields='id').execute()
       print(F'File ID: {file.get("id")}')
 
   except HttpError as error:
       await context.bot.send_message(chat_id=update.effective_chat.id, text="Failure")
       print(F'An error occurred: {error}')
       file = None
 
   await context.bot.send_message(chat_id=update.effective_chat.id, text="Success")
 
   return file.get('id')
 
 
FIRST, SECOND = range(2)
 
 
async def cython(update: Update, context: ContextTypes.DEFAULT_TYPE):
   keyboard = [
       [InlineKeyboardButton(u"Search", callback_data=str(FIRST))]
   ]
 
   reply_markup = InlineKeyboardMarkup(keyboard)
 
   await update.message.reply_text(u"First module", reply_markup=reply_markup)
 
   return FIRST
 
 
async def search_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
   query = update.callback_query
 
   keyboard = [
       [InlineKeyboardButton(u"Pi", callback_data=(SECOND))]
   ]
 
   index = ((2**31-1) // 32 - 1)
 
   await context.bot.send_message(chat_id=update.effective_chat.id, text="lin: "+linear(index))
   await context.bot.send_message(chat_id=update.effective_chat.id, text="mt: "+multithreaded(index))
   await context.bot.send_message(chat_id=update.effective_chat.id, text="mp: "+multiprocessed(index))
 
   #reply_markup = InlineKeyboardMarkup(keyboard)
 
   await context.bot.edit_message_text(chat_id=query.message.chat_id, message_id=query.message.message_id, text=u"Second module")
 
   reply_markup = InlineKeyboardMarkup(keyboard)
 
   await context.bot.edit_message_reply_markup(chat_id=query.message.chat_id, message_id=query.message.message_id, reply_markup=reply_markup)
 
   return SECOND
 
 
async def pi_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
   query = update.callback_query
 
 
   await context.bot.send_message(chat_id=update.effective_chat.id, text="lin: "+linear_pi(100000000))
   await context.bot.send_message(chat_id=update.effective_chat.id, text="mt: "+mt_pi(100000000))
   await context.bot.send_message(chat_id=update.effective_chat.id, text="mp: "+mp_pi(100000000))
 
 
   await context.bot.edit_message_text(chat_id=query.message.chat_id, message_id=query.message.message_id, text=u"You can add more CPU bound modules")
 

Тестируем на Linux сервере

Мы будем использовать последнюю версию Ubuntu и выберем самое простое железо. Кроме этого, с помощью Shared Line можно использовать только 10% ресурсов, поэтому это будет еще дешевле, но под наш pet-проект хватит. В день такая конфигурация будет стоить не больше 15 рублей.

Поднимаем и обновляем там все, потому что это  fresh Linux. Далее загружаем весь код бота на сервер, и можно приступать к тестированию: закинем пару файлов в наш Telegram-бот. Тестируем работу загрузки файлов. Вывода файлов, и сами функции по нагрузке CPU bound вычислений.

Заключение

Мы подключаемся к открытому API Google Drive, и вместо того, чтобы в самом диске нажимать разные кнопки, мы можем управлять им из-под Telegram-бота. Сделать это можно не только с диском, но и с любым сервисом, у которого есть открытый API.

Кроме этого, мы рассмотрели пару фич для работы с CPU Bound нагрузками, что может помочь разгрузить какие-то сложные для процессора вычисления.


Что дальше?

Зарегистрироваться в панели управления

Регистрируйте аккаунт в панели управления Selectel, пополняйте баланс удобным способом и подключайте наши продукты.
Перейти в панель

Узнать о продуктах больше

Все о принципах работы, задачах и фичах читайте на нашем сайте.
Перейти на сайт

Комментарии