Оркестрация задач
Под оркестрацией понимается управление запусками по расписанию, отслеживанием зависимостей и мониторинг выполнения задач.
Так как dbt Core - это только инструмент трансформации данных и у него отсутствует функциональность автоматического запуска по расписанию, то для таких целей необходимо использовать дополнительные внешние системы или инструменты.
В отличие от open-source версии (dbt Core) возможность запуска по расписанию есть в платной версии (dbt Cloud).
Основные варианты оркестрации
Можно выделить следующие основные варианты оркестрации задач dbt Core:
использование классического планировщика задач cron,
применение «современных» инструментов-оркестраторов.
Классический планировщик cron
Cron - это стандартный планировщик задач в Unix/Linux, который позволяет выполнять команды по расписанию. Выражение cron состоит из пяти полей, разделённых пробелами, а также bash-команды. Каждое поле отвечает за определенный временной интервал.
┌───────────── минута (0 - 59) │ ┌───────────── час (0 - 23) │ │ ┌───────────── день месяца (1 - 31) │ │ │ ┌───────────── месяц (1 - 12) │ │ │ │ ┌───────────── день недели (0 - 6, 0=воскресенье) │ │ │ │ │ │ │ │ │ │ * * * * * команда_для_выполнения
Звёздочка * означает «любое значение» (то есть «каждый»). Например, регулярный запуск dbt-задачи с помощью cron может выглядеть следующим образом:
# Запускать каждый день в 06:00 0 6 * * * cd /opt/dbt/project && dbt run
Конечно cron обладает некоторыми преимуществами (простой - необходимы минимальные настройки, надежный - проверен многими поколениями разработчиков, легкий - не требует дополнительных сервисов и т.д.), но в то же время имеет ряд недостатков (отсутствие визуального интерфейса, встроенных уведомлений, зависимостей между выполнением задач и др.). Поэтому для оркестрации dbt-задач все-таки лучше использовать какой-то из специализированных инструментов.
«Современные» инструменты-оркестраторы
В настоящее время существует несколько инструментов, которые можно внедрить в проект для оркестрации задач dbt™. Среди них можно выделить, например, Apache Airflow, Dagster, Prefect и многие другие.
Каждый из этих инструментов обладает определенными преимуществами, которые можно использовать, исходя из целей и потребностей проекта. Тем не менее в индустрии сложился некий негласный «стандарт» для оркестрации задач dbt Core через взаимодействие с Apache Airflow. Поэтому на интеграции именно с этим инструментом мы остановимся.
Оркестрация с помощью Airflow
Использование Astronomer Cosmos
Ранее было отмечено, что использование связки «Airflow + dbt Core» является «стандартом». Если быть более точным, то в техническом решении этого «стандарта» нужно дополнительно выделить применение библиотеки с открытым кодом Astronomer Cosmos. Она позволяет автоматически преобразовывать модели dbt Core в DAG Airflow, обеспечивая наглядное представление зависимостей dbt-моделей, планирование и оркестрацию задач, а также мониторинг их выполнения.
У данного технического решения можно отметить ряд преимуществ, среди которых основными, на мой взгляд, являются следующие:
запуск dbt-проекта внутри Airflow без необходимости написания кода вручную;
автоматическое создание задач Airflow для каждой dbt-модели.
Другими словами, Astronomer Cosmos помогает перенести фокус внимания разработчиков на проработку и создание dbt-моделей, а не на техническую реализацию их запуска в Airflow.
В сложных и продуктивных решениях необходимо использовать вариант интеграции с Astronomer Cosmos. Но так как в данном руководстве представлен учебный проект, то рассмотрим нативный подход интеграции dbt Core и Airflow через выполнение bash-команд для лучшего понимания жизненного цикла задач и небольшого погружения в Airflow.
Выстраивание процесса через bash-команды
Применение bash/shell-команд можно назвать минималистичным подходом интеграции dbt Core и Airflow.
Как было отмечено ранее, в руководстве не рассматриваются процессы установки сопутствующего программного обеспечения, кроме dbt Core. Выберите для себя подходящий вариант установки Airflow на официальном сайте документации.
После успешной установки оркестратора откройте папку с Airflow (определяется переменной окружения $AIRFLOW_HOME) в среде разработки и создайте папку dags.
Вкратце рассмотрим основные объекты корневой папки:
dags- место хранения DAG-файлов (пайплайнов);airflow.cfg- настроечный (конфигурационный) файл Airflow;airflow.db- база данных для Airflow (по умолчанию SQLite);logs- папка с логами выполнения задач;simple_auth_manager_passwords.json.generated- автоматически генерируемый файл паролей для встроенной системы аутентификации Airflow (используйте только для разработки и тестирования).
Создайте в папке dags/ файл dbt_pipeline_carsharing.py и постепенно добавьте представленный далее код.
Для написания Airflow DAG воспользуемся TaskFlow API, который упрощает написание кода рабочих процессов, делает его короче и легче в обслуживании. Суть в том, что вы пишете функции Python, декорируете их, а Airflow создает задачи, подключает зависимости и передачу данных между задачами.
Для начала импортируйте необходимые пакеты и классы:
from airflow import DAG from airflow.decorators import task from airflow.providers.standard.operators.empty import EmptyOperator from datetime import datetime
Немного поясню каждую из строчек кода.
Самая первая строчка from airflow import DAG - импорт основного класса из библиотеки Apache Airflow, без которого просто невозможно создать рабочий процесс (в Airflow).
Следующая строка from airflow.decorators import task импортирует функцию-декоратор, которая переводит обычные функции Python в задачи (Tasks) Airflow.
Строка from airflow.providers.standard.operators.empty import EmptyOperator импортирует оператор-«пустышку» (или «заглушку»), который не выполняет никаких действий и операций. Чаще всего он используется для визуального структурирования рабочих процессов. Например, в данном проекте он будет использоваться для обозначения начала или окончания пайплайна.
from datetime import datetime импортирует класс стандартного Python модуля для работы со временем. Как правило, используется для определения даты, с которой DAG должен начать работу.
Добавьте следующий блок кода:
with DAG( dag_id='dbt_pipeline_carsharing', description='dbt пайплайн для проекта carsharing', schedule='@daily', start_date=datetime(2026, 2, 17), catchup=False, tags=['dbt', 'carsharing'], doc_md=""" # Пайплайн dbt для Carsharing ## Этапы рабочего процесса: - Начало рабочего процесса - Установка зависимостей - Запуск моделей - Запуск тестов - Формирование документации - Окончание рабочего процесса """ ) as dag:
with DAG() as dag: является основной конструкцией для создания и настройки графа. Хотя у класса DAG может быть множество аргументов, строго обязательным является только один dag_id. Тем не менее для корректной работы в планировщике Airflow нужно указать три аргумента - dag_id, schedule, start_date.
Но пройдемся по всем добавленным в код аргументам:
dag_id(обязательный) - уникальное имя рабочего процесса, которое используется для идентификации в базе данных и веб-интерфейсе Airflow;description(необязательный) - краткий описание рабочего процесса;schedule(условно обязательный) - интервал автоматического запуска, определяющий расписание. По умолчанию имеет значениеNoneдля ручных запусков, поэтому может быть не указан. Но для автоматических запусков необходим. Значение указывается в виде cron-выражений (например,'0 0 * * *') или пресетов (например,'@daily');start_date(обязательный) - дата (и время), с которой DAG должен начать выполнение. Это. параметр необходим планировщику Airflow для определения момента первого запуска.catchup(необязательный, рекомендуемый) - «коварный» параметр DAG, который определяет, должен ли Airflow запустить все прошлые экземпляры процесса с моментаstart_dateдо текущего момента. По умолчанию имеет значениеTrue. Почему «коварный»? Поясню на небольшом примере. Представьте, что сегодня 17 февраля 2026 года и вы планировали выполнять ежедневные запуски с этой даты (то есть должны были указать такие настройки:start_date=datetime(2026, 2, 17)иschedule='@daily'). Но по каким-то причинам ошиблись в значении года и получилосьstart_date=datetime(2025, 2, 17). При этомcatchupникак не переопределили. Стало быть, его значение осталосьTrue. Airflow увидит, что между сегодняшней датой и значениемstart_dateуже должно было пройти 365 запусков, и немедленно начнет планировать и запускать 365 экземпляров один за другим (или одновременно, если позволяют ресурсы). Естественно, это приведет к негативным последствиям. Поэтому рекомендую добавлятьcatchup=False.tags(необязательный) - список меток, которые помогают классифицировать и быстро находить в веб-интерфейсе Airflow нужные процессы;doc_md(необязательный) - параметр, который помогает встроить полноценную документацию прямо в веб-интерфейс Airflow. В отличие отdescription, представляющий собой небольшую заметку в одну строку,doc_mdпозволяет написать подробную инструкцию с использованием Markdown (заголовки, списки, ссылки и т.д.).
Как видно из небольшой документации (в параметре doc_md) рабочий процесс Airflow будет состоять из нескольких последовательных этапов:
Начало рабочего процесса.
Установка зависимостей.
Запуск моделей.
Запуск тестов.
Формирование документации.
Окончание рабочего процесса.
Исходя из понимания этапов процесса, перейдем к написанию задач. Добавьте следующий блок кода (с отступом):
# Начало рабочего процесса start = EmptyOperator(task_id='start')
Ранее уже было отмечено, что EmptyOperator ничего не делает и в проекте используется для визуализации начала пайплайна.
Теперь настало время автоматизации dbt-задач с помощью декоратора @task.bash, который превращает обычную Python-функцию в задачу Airflow, выполняющую bash-команды. @task.bash является, скажем так, более современной альтернативой классическому оператору BashOperator предыдущих версий Airflow. При выполнении функции возвращается строка с командой.
Добавьте в файл dbt_pipeline_carsharing.py dbt-команды по каждому из этапов.
Укажите вместо /opt/dbt/carsharing путь к проекту на вашем компьютере.
# Установка зависимостей @task.bash def install_dbt_deps() -> str: """ Установка зависимостей dbt """ return """ echo "Установка зависимостей dbt..." /opt/dbt/carsharing && \ dbt deps echo "Зависимости установлены" """ # Запуск моделей @task.bash def run_dbt_models() -> str: """ Запуск всех моделей dbt """ return """ echo "Запуск моделей dbt..." cd /opt/dbt/carsharing && \ dbt run --full-refresh echo "Модели обновлены" """ # Запуск тестов @task.bash def run_dbt_tests() -> str: """ Запуск тестов для моделей """ return """ echo "Запуск тестов dbt..." cd /opt/dbt/carsharing && \ dbt test echo "Тесты пройдены" """ # Генерация документации @task.bash def generate_docs() -> str: """ Формирование документации dbt """ return """ echo "Формирование документации..." cd /opt/dbt/carsharing && \ dbt docs generate echo "Документация создана" """
Для визуального окончания пайплайна добавьте этап «Окончание рабочего процесса»:
# Окончание рабочего процесса end = EmptyOperator(task_id='end')
В завершение укажите порядок выполнения задач:
# Определение последовательности задач ( start >> install_dbt_deps() >> run_dbt_models() >> run_dbt_tests() >> generate_docs() >> end )
Финальный файл dbt_pipeline_carsharing.py имеет следующий вид:
from airflow import DAG from airflow.decorators import task from airflow.providers.standard.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id='dbt_pipeline_carsharing', description='dbt пайплайн для проекта carsharing', schedule='@daily', start_date=datetime(2026, 2, 17), catchup=False, tags=['dbt', 'carsharing'], doc_md=""" # Пайплайн dbt для Carsharing ## Этапы рабочего процесса: - Начало рабочего процесса - Установка зависимостей - Запуск моделей - Запуск тестов - Формирование документации - Окончание рабочего процесса """ ) as dag: # Начало рабочего процесса start = EmptyOperator(task_id='start') # Установка зависимостей @task.bash def install_dbt_deps() -> str: """ Установка зависимостей dbt """ return """ echo "Установка зависимостей dbt..." /opt/dbt/carsharing && \ dbt deps echo "Зависимости установлены" """ # Запуск моделей @task.bash def run_dbt_models() -> str: """ Запуск всех моделей dbt """ return """ echo "Запуск моделей dbt..." cd /opt/dbt/carsharing && \ dbt run --full-refresh echo "Модели обновлены" """ # Запуск тестов @task.bash def run_dbt_tests() -> str: """ Запуск тестов для моделей """ return """ echo "Запуск тестов dbt..." cd /opt/dbt/carsharing && \ dbt test echo "Тесты пройдены" """ # Генерация документации @task.bash def generate_docs() -> str: """ Формирование документации dbt """ return """ echo "Формирование документации..." cd /opt/dbt/carsharing && \ dbt docs generate echo "Документация сформирована" """ # Окончание рабочего процесса end = EmptyOperator(task_id='end') # Определение последовательности задач ( start >> install_dbt_deps() >> run_dbt_models() >> run_dbt_tests() >> generate_docs() >> end )
Осталось дело за малым - посмотреть на результат.
Запустите Airflow и перейдите в Airflow UI (http://localhost:8080/). В разделе Dags найдите пайплайн проекта и откройте его:
1
2
3
Процесс содержит задачи в определенной последовательности из файла dbt_pipeline_carsharing.py.
Так выглядит успешный результат выполнения рабочего процесса:
Но прошу обратить внимание, что это учебный проект. Он достаточно простой и ломаться практически нечему. Но даже в такой реализации при возникновении ошибок появятся определенные трудности в отладке и поддержке этого решения.
Если возникнет ошибка при запуске хотя бы одной из моделей, то весь процесс свалится в ошибку. Потребуется время, чтобы выяснить место и причину ошибки. Конечно можно прописать в DAG более детально этапы процесса (например, отдельные этапы формирования каждой сущности или же каждой модели проекта). Это позволит более оперативно находить место ошибки, устранить ее и перезапустить задачу. Но такой подход оркестрации достаточно трудоемкий. Поэтому рассмотрите его просто в учебных и ознакомительных целях. В продуктивных решениях используйте вариант с Astronomer Cosmos.
Что же, основную функциональность dbt Core вы усвоили, также получили представление об оркестрации dbt-задач для автоматизации преобразования данных в хранилище. Логичным шагом будет добавление визуализации имеющихся данных. Этим мы и займемся в следующем разделе руководства.