from band import settings, logger, expose, worker, cleanup, response, rpc, scheduler
Framework для Python, который заточен под простое создание маркетинговых и аналитических автоматизаций. Можно делать реактивную обработку событий, задачи по расписанию, реализовывать произвольную логику. Имеются специфичные виды ответов, такие, как pixel, redirect. Также имеется логирование, управление конфигурацией, врапперы для множества других функций.
response.redirect(location, statusCode=302, data={})
Вернуть HTTP редирект
Arguments:
front
или при отключенной обработке,
например при подключении по websocket@expose.handler(name='*')
async def go(name, **params):
return response.redirect(f'https://www.yandex.ru/search/?lr=213&text={name}')
response.pixel()
Вернуть прозрачный пиксель.
Arguments:
front
или при отключенной обработке,
например при подключении по websocket@expose.handler()
async def pix(**params):
state.counter += 1
return response.pixel()
response.error(message="", statusCode=500, data={})
Вернуть ошибку.
front
или при отключенной обработке,
например при подключении по websocket@expose.handler()
async def error(**params):
return response.error('Some F*cking error')
response.data(data, statusCode=200)
или просто response(data)
@expose.handler()
async def get_state(**params):
return response.data(state)
# short version
@expose.handler()
async def long_method(**params):
await asyncio.sleep(15)
return response(params)
# or without response
@expose.handler()
async def alias(**params):
return {'message': 'you are catch me!'}
Анонсирование методов и предоставление доступа к ним. Анонсирование производится в сервис front
,
где обрабатывается жизненный цикл входящих запросов подробне.
Изначально все методы являются внутренними, но к ним может быть открыт доступ по RPC и HTTP
(доступен извне только в особых случаях).
@expose()
Сделать метод доступным другим сервисам, но при этом не анонсировать.
Обращаться к этому методу можно будет по имени и названию сервиса.
Сервис front
ничего не узнает о нем и не будет прокидвать запросы снаружи.
from band import expose
@expose()
async def main(**params):
print(params)
expose.enricher(props: dict, keys: list)
Arguments:
front
знал, кто ему должен ответить на ключ.Метод будет зарегистрирован в front
service.
Метод обязательно должен называться enrich
.
from band import expose
@expose.enricher(props={'ip': 'td.ip', 'ua': 'td.ua', 'uid': 'uid'}, ['in.gen.track']})
async def enrich(ip, ua, uid, **params):
print(ip, ua, uid, params)
return {'param1': 'value1'}
Возвращенное значение будет добавлено к входящему запросу. Ключом будет имя сервиса.
После выполнения всех enrichers
запрос будет передан в handler
, а затем всем listener
-ам.
В следующем примере фигурирует результат 4 обогатителей: uaparser
, mmgeo
, sxgeo
, fpid
.
{
"key": "in.gen.track.page",
"service": "track",
"name": "page",
//....
"uid": "6450101900745375744",
"data": {
"service": "track",
"name": "page",
"uid": "6450101900745375744",
"page": { "title": "ODL Shop", "ref": "https://app.test.rstat.org/logs", ... },
"sess": { "pageNum": 1, "eventNum": 0, "type": "referral", "marks": {}, ... },
//...
"uaparser": { "os_family": "Mac OS X", "os_version": [ 10, 13, 6 ], ... },
"mmgeo": { "country_en": "Russia", "country_ru": "Россия", ... },
"sxgeo": { "country_en": "United Kingdom", "country_ru": "Великобритания", ... },
"fpid": "14290956575723105017",
"id": "6462522520963645440",
}
expose.handler(name=None, path=None, alias=None, timeout=None)
Arguments:
name='*'
./mymethod/:id
. По http доступны только системные сервисы.front
service будет ждать ответа от этого метода. По умолчанию 200ms (1/5 секунды).Метод будет зарегистрирован в front
service. Запросы с совпадающим именем сервиса будут направлены на него. Возвращаемое значение будет возвращено наружу.
from band import expose
@expose.handler()
async def my_method(**params):
pass
с wildcard
from band import expose
# method accessible on https://your.domain/go/*any_key*
@expose.handler(alias='go', name='*')
async def wildcard(name, **params):
url = state.urls.get(name, None)
return response.redirect(url) if url else response.error('Key not found')
expose.listener()
Метод будет зарегистрирован в front
service. Попадут все сообщения на финальной стадии обработки. Метод обязательно должен называться broadcast
. Возвращаемое значение не учитывается.
from band import expose
@expose.listener()
async def broadcast(**params):
pass
Mark function as startup handler, which will be executed when application starts. Otherside can be user for periodicaly operations like flushes, counters, etc.
@worker()
Метод будет запущен при старте приложения и будет синхронно выполдняться до тех пор, пока не завершится. Чтобы метод работал посстоянно используйте бесконечный цикл. Крайне желательно ставить паузу между циклами.
from band import worker
@worker()
async def do_job(**params):
while True:
state.counter += 1
await asyncio.sleep(10)
Продвинутый пример
from itertools import count
from band import expose
@worker()
async def service_worker():
for num in count():
# Обрабатываем ошибки, избегая падения
try:
# Перовый проход цикла для инициализации
if num == 0:
state.loaded = True
# ...
# Код выполняющийся каждый цикл
state.loop = num
# ...
except asyncio.CancelledError:
# Обработка остановки
break
except Exception:
# Другие исключения
logger.exception('my service exeption')
# Ждем 5 секунд перед следующим проходом
await asyncio.sleep(5)
Mark function as shutdown handler which will be executed on stop.
@cleanup()
Метод будет заапущен при остановке сервиса. Никаких тяжелых операций выполнять нельзя. На выполнение дается 1/10 секунды, после чего метод будет насильно завершен.
from band import cleanup
@cleanup()
async def service_cleanup():
# Операции, которые будут выполнены при остановке сервиса
state.loaded = False
Function blocking_sync_code
will be executed in separated thread when you call them
and your async code running in main executor thead will no block.
from band import blocking
@blocking()
def blocking_sync_code(param1, param2):
executing_long_time_code()
@expose()
async def yor_fast_async_func():
await blocking_sync_code(10, 30)
Remember! Thread execution should not access data outside executed function! All data must be passed as function arguments!
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second. More information [https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code](Running Blocking Code)
await rpc.request(service, name, param1='value1', param2='value2')
await scheduler.spawn(coro)
from band import logger
uid = '12312321312'
logger.debug('message', data={'user': uid})
другие методы
logger.debug('message')
logger.info('message')
logger.warn('message')
logger.error('message')
logger.exception('message') # будет показан stack trace
Для поддержки логов с параметрами используются structlog
и python-json-logger
.
Формат логов зависит от значения переменной JSON_LOGS
.
Справочник содержащий данные из config.yml
, при обработке конфига поставляются переменные откружения,
как систеемные, так и пользовательсткие.
# config.yml
pool:
- 74994031705
- 74994040139
- 74994040921
- 74994033095
- 74994040239
extra:
- 74994041156
# Подстановка переменной окружения API_KEY
api_key: "{{API_KEY}}"
В коде сервиса
# main.py
from band import settings
print(settings.extra)
['74994041156']
from band import redis_factory