Архитектура
Обработка входящих запросов
Коммуникация сервисов
Руководство пользователя
Получение данных из сервисов
Использование Панели управления
Использование инструментов из комплекта
Работа с Theia
Мониторинг Netdata
Работа с Jupyter
Работа с ClickHouse
Основные принципы
Модификация схемы
Подключение к VPN
Справочная информация
HTTP Redirect
Каналы получения данных
WebSocket
Загрузка больших файлов
JSON-RPC 2.0 RST
API сервисов
Director API
Front API
RockMe Framework (TypeScript)
Переменные окружения
Маппинг путей
Сетевая инфраструктура
Схема хранилища ClickHouse
Создание сервисов
Python + Band Framework
Организация сервиса
Коммуникация с другими сервисами
Работа с ClickHouse
Другие возможности
TypeScript + Rockme Framework
Организация сервиса
Туториалы
Получение данных из других сервисов
Сбор сырых данных Google Analytics
Создание динамического Calltracking
Построение истории отдельного пользователя
Классические модели атрибуции
Вероятностное прохождение воронки
Атрибуция по индексу активности
Воронки
Реализация Cookie-Sync
Сегментация пользователей
X
Выберите раздел

Band framework API

from band import settings, logger, expose, worker, cleanup, response, rpc, scheduler

band.response

redirect

Framework для Python, который заточен под простое создание маркетинговых и аналитических автоматизаций. Можно делать реактивную обработку событий, задачи по расписанию, реализовывать произвольную логику. Имеются специфичные виды ответов, такие, как pixel, redirect. Также имеется логирование, управление конфигурацией, врапперы для множества других функций.

response.redirect(location, statusCode=302, data={})

Вернуть HTTP редирект

Arguments:

  • location: полный адрес, куда необходимо совершить перенаправление (c http/https)
  • statusCode: код перенаправлениея 301, 302, 303, etc...
  • data: произвольные данные, будут доступны сервису front или при отключенной обработке, например при подключении по websocket
@expose.handler(name='*')
async def go(name, **params):
    return response.redirect(f'https://www.yandex.ru/search/?lr=213&text={name}')

pixel

response.pixel()

Вернуть прозрачный пиксель.

Arguments:

  • data: произвольные данные, будут доступны сервису front или при отключенной обработке, например при подключении по websocket
@expose.handler()
async def pix(**params):
    state.counter += 1
    return response.pixel()

error

response.error(message="", statusCode=500, data={})

Вернуть ошибку.

  • message: Сообщение об ошибки. Если не указать, будет подставлено на основе кода ошибки
  • statusCode: Код ошибки 400, 404, 500, etc...
  • data: Произвольные данные, будут доступны сервису front или при отключенной обработке, например при подключении по websocket
@expose.handler()
async def error(**params):
    return response.error('Some F*cking error')

data

response.data(data, statusCode=200) или просто response(data)

  • statusCode: Код ответа: 200, 204, etc.
  • data: Произвольные данные, которые будут отданы сервисом front. Можно отдавать строки.
@expose.handler()
async def get_state(**params):
    return response.data(state)

# short version
@expose.handler()
async def long_method(**params):
    await asyncio.sleep(15)
    return response(params)

# or without response
@expose.handler()
async def alias(**params):
    return {'message': 'you are catch me!'}
Work in progress

band.expose

Анонсирование методов и предоставление доступа к ним. Анонсирование производится в сервис front, где обрабатывается жизненный цикл входящих запросов подробне. Изначально все методы являются внутренними, но к ним может быть открыт доступ по RPC и HTTP (доступен извне только в особых случаях).

expose / предоставление доступа к методу

@expose()

Сделать метод доступным другим сервисам, но при этом не анонсировать. Обращаться к этому методу можно будет по имени и названию сервиса. Сервис front ничего не узнает о нем и не будет прокидвать запросы снаружи.

from band import expose

@expose()
async def main(**params):
    print(params)

expose.enricher / регистрация обогатителем

expose.enricher(props: dict, keys: list)

Arguments:

  • props: список интересующих параметеров входящих сообщений
  • keys: список ключей маршрутизации которые будут обрабатываться. В данный момент устройство таково, что приходить будут все сообщения, но указывать необходимо, чтобы сервис front знал, кто ему должен ответить на ключ.

Метод будет зарегистрирован в front service.

Метод обязательно должен называться enrich.

from band import expose

@expose.enricher(props={'ip': 'td.ip', 'ua': 'td.ua', 'uid': 'uid'}, ['in.gen.track']})
async def enrich(ip, ua, uid, **params):
    print(ip, ua, uid, params)
    return {'param1': 'value1'}

Возвращенное значение будет добавлено к входящему запросу. Ключом будет имя сервиса. После выполнения всех enrichers запрос будет передан в handler, а затем всем listener-ам.

В следующем примере фигурирует результат 4 обогатителей: uaparser, mmgeo, sxgeo, fpid.

{ 
  "key": "in.gen.track.page", 
  "service": "track", 
  "name": "page",
  //....
  "uid": "6450101900745375744", 
  "data": { 
    "service": "track", 
    "name": "page", 
    "uid": "6450101900745375744", 
    "page": { "title": "ODL Shop", "ref": "https://app.test.rstat.org/logs", ... }, 
    "sess": { "pageNum": 1, "eventNum": 0, "type": "referral", "marks": {}, ... }, 
    //...
    "uaparser": { "os_family": "Mac OS X", "os_version": [ 10, 13, 6 ], ... }, 
    "mmgeo": { "country_en": "Russia", "country_ru": "Россия", ... }, 
    "sxgeo": { "country_en": "United Kingdom", "country_ru": "Великобритания", ... },
    "fpid": "14290956575723105017",
    "id": "6462522520963645440", 
  }

expose.handler / регистрация обработчиком

expose.handler(name=None, path=None, alias=None, timeout=None)

Arguments:

  • name: уточнить имя под которым будет доступен метод. Возможна wildscard регистрация. Делается это путем установки name='*'.
  • path: уточнить http путь, можно использовать параметр, например /mymethod/:id. По http доступны только системные сервисы.
  • alias: псевдомним сервиса. С его помощью можно зарегистрироваться и получать запросы под другим именем.
  • timeout: сколько времени front service будет ждать ответа от этого метода. По умолчанию 200ms (1/5 секунды).

Метод будет зарегистрирован в front service. Запросы с совпадающим именем сервиса будут направлены на него. Возвращаемое значение будет возвращено наружу.

from band import expose

@expose.handler()
async def my_method(**params):
    pass

с wildcard

from band import expose

# method accessible on https://your.domain/go/*any_key*
@expose.handler(alias='go', name='*')
async def wildcard(name, **params):
    url = state.urls.get(name, None)
    return response.redirect(url) if url else response.error('Key not found')

expose.listener / регистрация слушателем

expose.listener()

Метод будет зарегистрирован в front service. Попадут все сообщения на финальной стадии обработки. Метод обязательно должен называться broadcast. Возвращаемое значение не учитывается.

from band import expose

@expose.listener()
async def broadcast(**params):
    pass

band.worker / Startup handler decorator

Mark function as startup handler, which will be executed when application starts. Otherside can be user for periodicaly operations like flushes, counters, etc.

@worker()

Метод будет запущен при старте приложения и будет синхронно выполдняться до тех пор, пока не завершится. Чтобы метод работал посстоянно используйте бесконечный цикл. Крайне желательно ставить паузу между циклами.

from band import worker

@worker()
async def do_job(**params):
    while True:
        state.counter += 1
        await asyncio.sleep(10)

Продвинутый пример

from itertools import count
from band import expose

@worker()
async def service_worker():
    for num in count():
        # Обрабатываем ошибки, избегая падения
        try:
            # Перовый проход цикла для инициализации
            if num == 0:
                state.loaded = True
                # ...
            # Код выполняющийся каждый цикл
            state.loop = num
            # ...
        except asyncio.CancelledError:
            # Обработка остановки
            break
        except Exception:
            # Другие исключения
            logger.exception('my service exeption')
        # Ждем 5 секунд перед следующим проходом
        await asyncio.sleep(5)

band.cleanup / Shotdown handler decorator

Mark function as shutdown handler which will be executed on stop.

@cleanup()

Метод будет заапущен при остановке сервиса. Никаких тяжелых операций выполнять нельзя. На выполнение дается 1/10 секунды, после чего метод будет насильно завершен.

from band import cleanup

@cleanup()
async def service_cleanup():
    # Операции, которые будут выполнены при остановке сервиса
    state.loaded = False

band.blocking - Decorator that wrap function in thread executor

Function blocking_sync_code will be executed in separated thread when you call them and your async code running in main executor thead will no block.

from band import blocking

@blocking()
def blocking_sync_code(param1, param2):
    executing_long_time_code()

@expose()
async def yor_fast_async_func():
    await blocking_sync_code(10, 30)

Remember! Thread execution should not access data outside executed function! All data must be passed as function arguments!

Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second. More information [https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code](Running Blocking Code)

band.rpc / взимодействие с другими сервисами

await rpc.request(service, name, param1='value1', param2='value2')

Work in progress

band.scheduler / отделение выполнение задач

await scheduler.spawn(coro)

Work in progress

band.logger / логи

from band import logger

uid = '12312321312'

logger.debug('message', data={'user': uid})

другие методы

logger.debug('message')
logger.info('message')
logger.warn('message')
logger.error('message')
logger.exception('message') # будет показан stack trace

Для поддержки логов с параметрами используются structlog и python-json-logger. Формат логов зависит от значения переменной JSON_LOGS.

band.settings / настройки сервиса

Справочник содержащий данные из config.yml, при обработке конфига поставляются переменные откружения, как систеемные, так и пользовательсткие.

# config.yml
pool:
  - 74994031705
  - 74994040139
  - 74994040921
  - 74994033095
  - 74994040239
extra:
  - 74994041156
# Подстановка переменной окружения API_KEY
api_key: "{{API_KEY}}"

В коде сервиса

# main.py
from band import settings

print(settings.extra)

['74994041156']

band.redis_factory / Redis client factory

from band import redis_factory
Work in progress