Введение в Metaflow — Netflix Python / R Framework для Data Science

Spread the love

Тратьте больше времени на моделирование и меньше времени на управление инфраструктурой. Практическое руководство о том как это сделать.

Недавно в AWS re: Invent компания Netflix открыла одну из своих собственных систем для создания и управления проектами в области данных — Metaflow.

В течение последних двух лет она быстро внедряется в их внутренние команды по Data Science, что позволяет многим проектам ускорить свою работу.

Metaflow adoption at Netflix

Что такое Metaflow?

Metaflow является фреймворком для создания и выполнения рабочих процессов в области Data Science и оснащен встроенными функциями для:

  •      управления вычислительными ресурсами,
  •      выполнения запуска контейнера,
  •      управления внешними зависимостями,
  •      версионности, воспроизведение и возобновление рабочего процесса,
  •      Клиентского API для проверки прошлых запусков, подходящих для скриптов,
  •      перемещения вперед и назад между локальным (например, на ноутбуке) и удаленным (в облаке) режимами выполнения

 

Metaflow автоматически создает снимки кода, данных и зависимостей в хранилище данных с адресным содержимым, которое обычно поддерживается S3, хотя также поддерживается локальная файловая система. Это позволяет вам возобновить рабочие процессы, воспроизвести прошлые результаты и проверить что-либо о рабочем процессе, например, в тетради.

— vtuulo, Ycombinator

По сути, он направлен на повышение производительности data scientists, позволяя им сосредоточиться на фактической работе над данными и содействия ускорению производства своих результатов.

Actual Data Science vs Infrastructure Concerns

Ежедневные сценарии, которые предоставляет Metaflow:

  • Сотрудничество. Вы хотите помочь другому специалисту по обработке данных отладить ошибку. Вы хотите, чтобы вы могли проверить его состояние неудачного запуска на вашем ноутбуке как есть.
  • Возобновление прогона: прогон не удался (или был намеренно остановлен). Вы исправили ошибку в своем коде. Вы хотели бы перезапустить рабочий процесс с того места, где он потерпел неудачу / остановился.
  • Гибридные запуски: вы хотите запустить один шаг вашего рабочего процесса локально (возможно, шаг загрузки данных, поскольку набор данных находится в папке загрузок), но хотите запустить еще один интенсивный для вычислений шаг (обучение модели) в облаке.
  • Проверка метаданных прогона: три исследователя данных настраивали гиперпараметры, чтобы повысить точность одной и той же модели. Теперь вы хотите проанализировать все их тренировочные заезды и выбрать наиболее эффективный набор гиперпараметров.
  • Несколько версий одного и того же пакета. Вы хотели бы использовать несколько версий библиотеки sklearn в своем проекте — 0,20 для этапов предварительной обработки и 0,22 для моделирования.

Итак, как выглядит типичный мета поток?

Концептуально, рабочие процессы метапотока по своей природе представляют собой DAG (Направленные ациклические графы), которые проще всего описать с помощью изображений ниже. Каждый узел на графике представляет этап обработки в рабочем процессе.

A linear acyclic graph
A branched acyclic graph

Простыми словами, Metaflow выполняет код Python на каждом шаге рабочего процесса в отдельных контейнерах, упакованных с соответствующими зависимостями.

Этот ключевой аспект в архитектуре Metaflow позволяет вводить практически любые внешние библиотеки из экосистемы conda в Metaflow без использования плагинов. Это также отличает Metaflow от других решений общего назначения, таких как Airflow.

Скрипты

Каждый поток может быть записан как стандартный класс Python, если он удовлетворяет следующим минимальным условиям:

  •      Унаследован от класса Metaflow FlowSpec.
  •      Каждая функция, представляющая шаг, помечается декоратором @step.
  •      Каждая пошаговая функция должна заканчиваться указанием на свою последующую пошаговую функцию. Вы можете сделать это с помощью self.next(self.function_name_here)
  •      Реализует начальную и конечную функцию.

Ниже приведен пример минимального потока из трех узлов:

start → process_message → end

from metaflow import FlowSpec, step

class LinearFlow(FlowSpec):
      
    """
    A flow to verify you can run a basic metaflow flow.
    """
    
    # Global initializations here
    
    @step
    def start(self):
        self.message = 'Thanks for reading.'
        self.next(self.process_message)

    @step
    def process_message(self):
        print('the message is: %s' % self.message)
        self.next(self.end)

    @step
    def end(self):
        print('the message is still: %s' % self.message)

if __name__ == '__main__':
    LinearFlow()

Инструкция по настройке

Установка и запуск

  •      Установите Metaflow (рекомендуется Python3): pip3 install metaflow
  •      Поместите пример кода из фрагмента 1 выше в файл linear_flow.py
  •      Чтобы увидеть дизайн этого потока: python3 linear_flow.py show
  •      Чтобы запустить поток: python3 linear_flow.py run
A sample flow run

Metaflow создает локальное хранилище данных .metaflow, в котором хранятся все метаданные запуска и моментальные снимки данных, связанные с ними. Если у вас установлены облачные настройки, моментальные снимки данных будут существовать в AWS S3 Bucket, а метаданные запуска будут загружены в службу метаданных, основанную на RDS (Relational Data Store). Позже мы увидим, как можем легко проверить эти метаданные запуска с помощью клиентского API. Еще одна тонкая, но важная вещь, на которую стоит обратить внимание, это то, что pid (идентификатор процесса, прикрепленный к каждому шагу, различен. Это возвращает меня к более раннему моменту, когда Metaflow контейнеризует каждый шаг потока отдельно и запускает их в своей среде (только для передачи данных), назад и вперед.

Настройка Conda (если вы планируете вводить зависимости)

Загрузите и установите Miniconda (если вы этого еще не сделали)

Добавить канал conda: conda config —add channels conda-forge

Теперь вы можете добавить зависимости conda в свои рабочие процессы. Я покажу подробности ниже.

Реализация более реалистичного рабочего процесса

По мере выполнения этой задачи, новые концепции Metaflow вводятся и объясняются, где это применимо.

Задание

В этом потоке мы реализуем рабочий процесс, который

  •      Загружает CSV в фрейм Pandas
  •      Параллельно вычисляет статистику квартилей для различных жанров
  •      Сохраняет словарь жанровой статистики.

Потоки

Ниже приведен базовый класс, который поможет вам увидеть общий ход вещей.

from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter

class GenreStatsFlow(FlowSpec):
  """
    A flow to generate some statistics about the movie genres.
    The flow performs the following steps:
    1) Ingests a CSV into a Pandas Dataframe.
    2) Compute quartiles for each genre in parallel
    3) Save a dictionary of genre specific statistics.
  """
  
  @step
  def start(self):
    """
        The start step:
        1) Loads the movie metadata into pandas dataframe.
        2) Finds all the unique genres.
        3) Launches parallel statistics computation for each genre.
    """
    
    # TODO: Loading the CSV and getting unique genres
    self.genres = []
    self.next(self.compute_statistics, foreach='genres')
    
  @catch(var='compute_failed')
  @retry(times=1)
  @step
  def compute_statistics(self):
    """Compute statistics for a single genre. Run in cloud"""
    self.genre = self.input
    # TODO: Computing statistics for a genre
    self.next(self.join)
    
  @step
  def join(self, inputs):
    """Join our parallel branches and merge results into a dictionary."""
    # TODO: Joining the results
    self.next(self.end)
    
  @step
  def end(self):
      """End the flow."""
      pass
    
if __name__ == '__main__':
  GenreStatsFlow()

Несколько слов о классе:

  •     В строке 24 на начальном этапе обратите внимание на параметр foreach? foreach выполняет параллельные копии шагов compute_statistics внутри цикла для каждого цикла для каждой записи в списке жанров.
  •     В строке 26 декоратор @catch (var = ‘compute_failed’) перехватит любое исключение, произошедшее на шаге compute_statistics, и назначит его переменной compute_failed (которая может быть прочитана его преемником)
  •     В строке 27 декоратор @retry (times = 1) делает то, что он подразумевает — повторяет шаг в случае возникновения каких-либо ошибок.
  •     В строке 31 в compute_statistics, откуда волшебным образом появляется self.input? input — это переменная класса, предоставляемая Metaflow, которая содержит данные, применимые для этого экземпляра compute_statistics (когда параллельно выполняется несколько копий этой функции?). Metaflow добавляется только тогда, когда узел разветвляется на несколько параллельных процессов, или когда несколько узлов объединены в один.

В этом примере задачи показаны только несколько параллельных запусков одной и той же функции compute_statistics. Но для любопытных можно параллельно запускать совершенно разные и не связанные между собой функции. Для этого вы можете изменить строку 24 на self.next (self.func1, self.function2, self.function3). Конечно, вам придется обновить шаг соединения соответственно для обработки этого случая.

Вот визуальное представление класса

Visual Flow

Чтение в файле данных и пользовательские аргументы

  •      Загрузите этот CSV-файл с данными фильма, подготовленными Metaflow.
  •      Теперь мы хотим поддерживать динамическую передачу пути к файлу movie_data и значения max_genres в наш рабочий процесс в качестве внешних аргументов. Metaflow позволяет передавать аргументы, добавляя дополнительные флаги в команду запуска. Например, python3 tutorial_flow.py run —movie_data = path/to/movies.csv —max_genres = 5
  •      Для чтения таких пользовательских входов в рабочем процессе Metaflow предоставляет объекты IncludeFile и Parameter. Мы получаем доступ к переданным аргументам, назначая объект IncludeFile или Parameter переменной класса, в зависимости от того, читаем ли мы файл или обычное значение.
Reading in custom parameters passed through CLI

Инжектирование Conda в поток

Выполните шаги, описанные в разделе настройки Conda выше.
Добавьте декоратор @conda_base, предоставленный Metaflow, в класс Flow. Он ожидает, что будет передана версия Python, которая может быть жестко закодирована или предоставлена с помощью функции, как это делается ниже.

Injecting Conda to the Flow

Теперь вы можете добавить декоратор @conda к любому этапу вашего процесса. Он ожидает объект с зависимостями, переданный через параметр библиотеки. Metaflow позаботится о подготовке контейнеров с этими зависимостями перед выполнением шага. Можно использовать разные версии пакета на разных этапах, поскольку Metaflow запускает каждый шаг в отдельных контейнерах.

Новая команда python3 tutorial_flow.py —environment=conda run

Implementation of `start` step

Обратите внимание, как оператор импорта pandas существует внутри функции шага? Это потому, что он вводится conda только в рамках этого шага.
Однако определенные здесь переменные (датафреймы и жанры) доступны даже по шагам, которые выполняются после этого. Это связано с тем, что Metaflow работает по принципу изоляции среды выполнения, но обеспечивает естественный поток данных.

Implementation of `compute_statistics` step

Обратите внимание, что на этом этапе осуществляется доступ к переменной dataframe, определенной на предыдущем этапе запуска, и ее изменение. Переходя к следующим шагам, этот новый измененный фрейм данных будет эффективным.

Implementation of `join` step

На этом этапе мы используем совершенно другую версию библиотеки Pandas. Каждый индекс в массиве input представляет копию compute_statistics, которая была запущена до этого. Он содержит состояние этого прогона, то есть значения различных переменных. Таким образом, входные [0] квартили могут содержать квартили для жанра комедии, тогда как входные [1] квартили могут содержать квартили для жанра научной фантастики.

Финальный код

Чтобы увидеть дизайн потока:
python3 tutorial_flow.py —environment=conda show

Запуск:
python3 tutorial_flow.py —environment=conda run —movie_data=path/to/movies.csv —max_genres=7

Проверка выполняется через клиентский API

Вы можете использовать клиентский API, предоставляемый Metaflow, для проверки данных и снимков состояния ваших прошлых запусков. Он идеально подходит для изучения подробностей ваших исторических прогонов на ноутбуке.

Ниже приведен простой фрагмент, где я печатаю переменную genre_stats из последнего успешного запуска GenreStatsFlow.

from metaflow import Flow, get_metadata

# Print metadata provider
print("Using metadata provider: %s" % get_metadata())

# Load the analysis from the MovieStatsFlow.
run = Flow('GenreStatsFlow').latest_successful_run
print("Using analysis from '%s'" % str(run))

genre_stats = run.data.genre_stats
print(genre_stats)

Заупуск в облаке

После  прототипирования в вашей локальной системе, скорее всего, вы захотите использовать облачные ресурсы для ускорения процесса.

В настоящее время Metaflow поддерживает только интеграцию с AWS и ниже приводится параллельное сравнение того, какие сервисы AWS использует Metaflow.

Integration between Metaflow and AWS

 

  • Сначала вам нужно выполнить однократную настройку в AWS, чтобы создать ресурсы для запуска Metaflow. Ваша команда может использовать одни и те же ресурсы, если вы хотите делиться и получать доступ друг к другу. Следуйте инструкциям на этой странице. Это должно быть довольно быстро, так как Metaflow предоставляет шаблон Cloudformation для установки.
  • Затем запустите metaflow configure aws в вашей локальной системе и предоставьте запрашиваемую информацию. Это необходимо для того, чтобы Metaflow мог использовать хранилище данных и метастазов, которые вы только что установили в AWS.
  • Теперь для запуска любых рабочих процессов в локальной системе в облаке все, что вам нужно сделать, это добавить —with batch к вашей команде запуска. python3 sample_flow.py run —with batch
  • Для гибридного запуска, то есть для запуска нескольких шагов локально и нескольких шагов в облаке — добавьте декоратор @batch к шагам в вашем рабочем процессе, которые вы хотите запустить в облаке. Например, @batch(cpu=1, memory=500)

Заключение

Несмотря на преимущества, есть и недостаток в тесной интеграции с AWS, хотя в дорожной карте есть планы по поддержке большего количества облачных провайдеров.
Metaflow полностью основан на CLI и не поставляется с графическим интерфейсом (в отличие от других сред рабочего процесса общего назначения, таких как Airflow).

Добавить комментарий