Оркестрация задач

Под оркестрацией понимается управление запусками по расписанию, отслеживанием зависимостей и мониторинг выполнения задач.

Так как dbt Core - это только инструмент трансформации данных и у него отсутствует функциональность автоматического запуска по расписанию, то для таких целей необходимо использовать дополнительные внешние системы или инструменты.

Примечание

В отличие от open-source версии (dbt Core) возможность запуска по расписанию есть в платной версии (dbt Cloud).

Основные варианты оркестрации

Можно выделить следующие основные варианты оркестрации задач dbt Core:

  • использование классического планировщика задач cron,

  • применение «современных» инструментов-оркестраторов.

Классический планировщик cron

Cron - это стандартный планировщик задач в Unix/Linux, который позволяет выполнять команды по расписанию. Выражение cron состоит из пяти полей, разделённых пробелами, а также bash-команды. Каждое поле отвечает за определенный временной интервал.

┌───────────── минута (0 - 59) │ ┌───────────── час (0 - 23) │ │ ┌───────────── день месяца (1 - 31) │ │ │ ┌───────────── месяц (1 - 12) │ │ │ │ ┌───────────── день недели (0 - 6, 0=воскресенье) │ │ │ │ │ │ │ │ │ │ * * * * * команда_для_выполнения

Звёздочка * означает «любое значение» (то есть «каждый»). Например, регулярный запуск dbt-задачи с помощью cron может выглядеть следующим образом:

# Запускать каждый день в 06:00 0 6 * * * cd /opt/dbt/project && dbt run

Конечно cron обладает некоторыми преимуществами (простой - необходимы минимальные настройки, надежный - проверен многими поколениями разработчиков, легкий - не требует дополнительных сервисов и т.д.), но в то же время имеет ряд недостатков (отсутствие визуального интерфейса, встроенных уведомлений, зависимостей между выполнением задач и др.). Поэтому для оркестрации dbt-задач все-таки лучше использовать какой-то из специализированных инструментов.

«Современные» инструменты-оркестраторы

В настоящее время существует несколько инструментов, которые можно внедрить в проект для оркестрации задач dbt™. Среди них можно выделить, например, Apache Airflow, Dagster, Prefect и многие другие.

Каждый из этих инструментов обладает определенными преимуществами, которые можно использовать, исходя из целей и потребностей проекта. Тем не менее в индустрии сложился некий негласный «стандарт» для оркестрации задач dbt Core через взаимодействие с Apache Airflow. Поэтому на интеграции именно с этим инструментом мы остановимся.

Оркестрация с помощью Airflow

Использование Astronomer Cosmos

Ранее было отмечено, что использование связки «Airflow + dbt Core» является «стандартом». Если быть более точным, то в техническом решении этого «стандарта» нужно дополнительно выделить применение библиотеки с открытым кодом Astronomer Cosmos. Она позволяет автоматически преобразовывать модели dbt Core в DAG Airflow, обеспечивая наглядное представление зависимостей dbt-моделей, планирование и оркестрацию задач, а также мониторинг их выполнения.

У данного технического решения можно отметить ряд преимуществ, среди которых основными, на мой взгляд, являются следующие:

  • запуск dbt-проекта внутри Airflow без необходимости написания кода вручную;

  • автоматическое создание задач Airflow для каждой dbt-модели.

Другими словами, Astronomer Cosmos помогает перенести фокус внимания разработчиков на проработку и создание dbt-моделей, а не на техническую реализацию их запуска в Airflow.

В сложных и продуктивных решениях необходимо использовать вариант интеграции с Astronomer Cosmos. Но так как в данном руководстве представлен учебный проект, то рассмотрим нативный подход интеграции dbt Core и Airflow через выполнение bash-команд для лучшего понимания жизненного цикла задач и небольшого погружения в Airflow.

Выстраивание процесса через bash-команды

Применение bash/shell-команд можно назвать минималистичным подходом интеграции dbt Core и Airflow.

Примечание

Как было отмечено ранее, в руководстве не рассматриваются процессы установки сопутствующего программного обеспечения, кроме dbt Core. Выберите для себя подходящий вариант установки Airflow на официальном сайте документации.

После успешной установки оркестратора откройте папку с Airflow (определяется переменной окружения $AIRFLOW_HOME) в среде разработки и создайте папку dags.

Рисунок 76. Структура домашнего каталога Apache Airflow

Вкратце рассмотрим основные объекты корневой папки:

  • dags - место хранения DAG-файлов (пайплайнов);

  • airflow.cfg - настроечный (конфигурационный) файл Airflow;

  • airflow.db - база данных для Airflow (по умолчанию SQLite);

  • logs - папка с логами выполнения задач;

  • simple_auth_manager_passwords.json.generated - автоматически генерируемый файл паролей для встроенной системы аутентификации Airflow (используйте только для разработки и тестирования).

Создайте в папке dags/ файл dbt_pipeline_carsharing.py и постепенно добавьте представленный далее код.

Примечание

Для написания Airflow DAG воспользуемся TaskFlow API, который упрощает написание кода рабочих процессов, делает его короче и легче в обслуживании. Суть в том, что вы пишете функции Python, декорируете их, а Airflow создает задачи, подключает зависимости и передачу данных между задачами.

Для начала импортируйте необходимые пакеты и классы:

from airflow import DAG from airflow.decorators import task from airflow.providers.standard.operators.empty import EmptyOperator from datetime import datetime

Немного поясню каждую из строчек кода.

Самая первая строчка from airflow import DAG - импорт основного класса из библиотеки Apache Airflow, без которого просто невозможно создать рабочий процесс (в Airflow).

Следующая строка from airflow.decorators import task импортирует функцию-декоратор, которая переводит обычные функции Python в задачи (Tasks) Airflow.

Строка from airflow.providers.standard.operators.empty import EmptyOperator импортирует оператор-«пустышку» (или «заглушку»), который не выполняет никаких действий и операций. Чаще всего он используется для визуального структурирования рабочих процессов. Например, в данном проекте он будет использоваться для обозначения начала или окончания пайплайна.

from datetime import datetime импортирует класс стандартного Python модуля для работы со временем. Как правило, используется для определения даты, с которой DAG должен начать работу.

Добавьте следующий блок кода:

with DAG( dag_id='dbt_pipeline_carsharing', description='dbt пайплайн для проекта carsharing', schedule='@daily', start_date=datetime(2026, 2, 17), catchup=False, tags=['dbt', 'carsharing'], doc_md=""" # Пайплайн dbt для Carsharing ## Этапы рабочего процесса: - Начало рабочего процесса - Установка зависимостей - Запуск моделей - Запуск тестов - Формирование документации - Окончание рабочего процесса """ ) as dag:

with DAG() as dag: является основной конструкцией для создания и настройки графа. Хотя у класса DAG может быть множество аргументов, строго обязательным является только один dag_id. Тем не менее для корректной работы в планировщике Airflow нужно указать три аргумента - dag_id, schedule, start_date.

Но пройдемся по всем добавленным в код аргументам:

  • dag_id (обязательный) - уникальное имя рабочего процесса, которое используется для идентификации в базе данных и веб-интерфейсе Airflow;

  • description (необязательный) - краткий описание рабочего процесса;

  • schedule (условно обязательный) - интервал автоматического запуска, определяющий расписание. По умолчанию имеет значение None для ручных запусков, поэтому может быть не указан. Но для автоматических запусков необходим. Значение указывается в виде cron-выражений (например, '0 0 * * *') или пресетов (например, '@daily');

  • start_date (обязательный) - дата (и время), с которой DAG должен начать выполнение. Это. параметр необходим планировщику Airflow для определения момента первого запуска.

  • catchup (необязательный, рекомендуемый) - «коварный» параметр DAG, который определяет, должен ли Airflow запустить все прошлые экземпляры процесса с момента start_date до текущего момента. По умолчанию имеет значение True. Почему «коварный»? Поясню на небольшом примере. Представьте, что сегодня 17 февраля 2026 года и вы планировали выполнять ежедневные запуски с этой даты (то есть должны были указать такие настройки: start_date=datetime(2026, 2, 17) и schedule='@daily'). Но по каким-то причинам ошиблись в значении года и получилось start_date=datetime(2025, 2, 17). При этом catchup никак не переопределили. Стало быть, его значение осталось True. Airflow увидит, что между сегодняшней датой и значением start_date уже должно было пройти 365 запусков, и немедленно начнет планировать и запускать 365 экземпляров один за другим (или одновременно, если позволяют ресурсы). Естественно, это приведет к негативным последствиям. Поэтому рекомендую добавлять catchup=False.

  • tags (необязательный) - список меток, которые помогают классифицировать и быстро находить в веб-интерфейсе Airflow нужные процессы;

  • doc_md (необязательный) - параметр, который помогает встроить полноценную документацию прямо в веб-интерфейс Airflow. В отличие от description, представляющий собой небольшую заметку в одну строку, doc_md позволяет написать подробную инструкцию с использованием Markdown (заголовки, списки, ссылки и т.д.).

Как видно из небольшой документации (в параметре doc_md) рабочий процесс Airflow будет состоять из нескольких последовательных этапов:

  1. Начало рабочего процесса.

  2. Установка зависимостей.

  3. Запуск моделей.

  4. Запуск тестов.

  5. Формирование документации.

  6. Окончание рабочего процесса.

Исходя из понимания этапов процесса, перейдем к написанию задач. Добавьте следующий блок кода (с отступом):

# Начало рабочего процесса start = EmptyOperator(task_id='start')

Ранее уже было отмечено, что EmptyOperator ничего не делает и в проекте используется для визуализации начала пайплайна.

Теперь настало время автоматизации dbt-задач с помощью декоратора @task.bash, который превращает обычную Python-функцию в задачу Airflow, выполняющую bash-команды. @task.bash является, скажем так, более современной альтернативой классическому оператору BashOperator предыдущих версий Airflow. При выполнении функции возвращается строка с командой.

Добавьте в файл dbt_pipeline_carsharing.py dbt-команды по каждому из этапов.

Примечание

Укажите вместо /opt/dbt/carsharing путь к проекту на вашем компьютере.

# Установка зависимостей @task.bash def install_dbt_deps() -> str: """ Установка зависимостей dbt """ return """ echo "Установка зависимостей dbt..." /opt/dbt/carsharing && \ dbt deps echo "Зависимости установлены" """ # Запуск моделей @task.bash def run_dbt_models() -> str: """ Запуск всех моделей dbt """ return """ echo "Запуск моделей dbt..." cd /opt/dbt/carsharing && \ dbt run --full-refresh echo "Модели обновлены" """ # Запуск тестов @task.bash def run_dbt_tests() -> str: """ Запуск тестов для моделей """ return """ echo "Запуск тестов dbt..." cd /opt/dbt/carsharing && \ dbt test echo "Тесты пройдены" """ # Генерация документации @task.bash def generate_docs() -> str: """ Формирование документации dbt """ return """ echo "Формирование документации..." cd /opt/dbt/carsharing && \ dbt docs generate echo "Документация создана" """

Для визуального окончания пайплайна добавьте этап «Окончание рабочего процесса»:

# Окончание рабочего процесса end = EmptyOperator(task_id='end')

В завершение укажите порядок выполнения задач:

# Определение последовательности задач ( start >> install_dbt_deps() >> run_dbt_models() >> run_dbt_tests() >> generate_docs() >> end )

Финальный файл dbt_pipeline_carsharing.py имеет следующий вид:

from airflow import DAG from airflow.decorators import task from airflow.providers.standard.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id='dbt_pipeline_carsharing', description='dbt пайплайн для проекта carsharing', schedule='@daily', start_date=datetime(2026, 2, 17), catchup=False, tags=['dbt', 'carsharing'], doc_md=""" # Пайплайн dbt для Carsharing ## Этапы рабочего процесса: - Начало рабочего процесса - Установка зависимостей - Запуск моделей - Запуск тестов - Формирование документации - Окончание рабочего процесса """ ) as dag: # Начало рабочего процесса start = EmptyOperator(task_id='start') # Установка зависимостей @task.bash def install_dbt_deps() -> str: """ Установка зависимостей dbt """ return """ echo "Установка зависимостей dbt..." /opt/dbt/carsharing && \ dbt deps echo "Зависимости установлены" """ # Запуск моделей @task.bash def run_dbt_models() -> str: """ Запуск всех моделей dbt """ return """ echo "Запуск моделей dbt..." cd /opt/dbt/carsharing && \ dbt run --full-refresh echo "Модели обновлены" """ # Запуск тестов @task.bash def run_dbt_tests() -> str: """ Запуск тестов для моделей """ return """ echo "Запуск тестов dbt..." cd /opt/dbt/carsharing && \ dbt test echo "Тесты пройдены" """ # Генерация документации @task.bash def generate_docs() -> str: """ Формирование документации dbt """ return """ echo "Формирование документации..." cd /opt/dbt/carsharing && \ dbt docs generate echo "Документация сформирована" """ # Окончание рабочего процесса end = EmptyOperator(task_id='end') # Определение последовательности задач ( start >> install_dbt_deps() >> run_dbt_models() >> run_dbt_tests() >> generate_docs() >> end )

Осталось дело за малым - посмотреть на результат.

Запустите Airflow и перейдите в Airflow UI (http://localhost:8080/). В разделе Dags найдите пайплайн проекта и откройте его:

1

2

3

Рисунок 77. dbt-пайплайн проекта в Airflow UI

Процесс содержит задачи в определенной последовательности из файла dbt_pipeline_carsharing.py.

Рисунок 78. Задачи dbt-проекта

Так выглядит успешный результат выполнения рабочего процесса:

Рисунок 79. Успешный запуск dbt-проекта с помощью Airflow

Но прошу обратить внимание, что это учебный проект. Он достаточно простой и ломаться практически нечему. Но даже в такой реализации при возникновении ошибок появятся определенные трудности в отладке и поддержке этого решения.

Если возникнет ошибка при запуске хотя бы одной из моделей, то весь процесс свалится в ошибку. Потребуется время, чтобы выяснить место и причину ошибки. Конечно можно прописать в DAG более детально этапы процесса (например, отдельные этапы формирования каждой сущности или же каждой модели проекта). Это позволит более оперативно находить место ошибки, устранить ее и перезапустить задачу. Но такой подход оркестрации достаточно трудоемкий. Поэтому рассмотрите его просто в учебных и ознакомительных целях. В продуктивных решениях используйте вариант с Astronomer Cosmos.

Что же, основную функциональность dbt Core вы усвоили, также получили представление об оркестрации dbt-задач для автоматизации преобразования данных в хранилище. Логичным шагом будет добавление визуализации имеющихся данных. Этим мы и займемся в следующем разделе руководства.