こんにちは。
最近、Kedroと言う機械学習向けのパイプライン構築用のツールを使ってみたので、それについてまとめます。
Kedroとは?
概要
Kedro は QuantumBlack というデータ分析企業 が公開している、プロダクションレディなデータ分析用ワークフロー構築ツールです。結構いろんなパイプラインツールがありますし、全部を触ったことがあるわけではないですが、今のところ*1Kedroはすごくすごく良い感じです。
ツールの紹介動画は以下*2。
公式周りのサイトのURLは以下の通り。
特徴
めちゃめちゃたくさんの特徴がありますが、以下あたりはすごく便利です。
- Airflowなどと同じく、Pythonで全てのワークフローを書くことができる
- DAG形式でパイプラインを定義でき、 Sequentialな実行とParallelな実行の切り替えや、パイプラインの途中から実行するなどできる
- yamlで定義することができるデータカタログの機能があり、csv, pickle, feather, parquet, DB上のテーブル など様々なデータ形式に対応することができる
- データセットや学習モデルをバージョン管理し、指定のバージョンでいつでも実行できるよう再現性を担保する
- Cookiecutter によるテンプレートを利用することで、複数人での作業を管理できる
- モデルのパラメーターをyamlで管理することができる
- Jupyter Notebook, Jupyter Lab とのインテグレーション
- 本番環境への移行がしやすく、複数の環境(AWS, GCPのようなマネージドサービスなど)にデプロイすることができる。具体的にはPythonのパッケージとしてであったり、kubeflowやArgo Workflows、AWS Batchにデプロイなどもできる
- リッチなパイプラインの可視化がKedro vizでできる
とりあえずやりたいことは一通りできそうな雰囲気を感じ取っていただけると思います笑
Kedroの大まかな構成要素
Kedroは、大まかには次の4つから構成されています。
- Node
- Pipeline
- DataCatalog
- Runner
Node
- 実行される処理の単位で、前処理や学習といった処理本体になるもの
- 入力、出力のデータセットと、それを処理するロジックを定義して、パイプラインに組み込む
Pipeline
- Nodeの依存関係や実行順序を管理するもの。
- decorator機能がkedroにはあり、これによって、パイプライン全体の処理に対して機能を付加することもできる
DataCatalog
- パイプラインで使用するデータを定義するカタログ
- データセット名、形式、ファイルパス、ロードやセーブ時のオプションなどを指定することが可能
Runner
- パイプラインを実行するもの。パラメーターを指定して実行することができ、例えば特定のパイプラインだけ実行するとかもできる。
- SequentialRunner、ParallelRunnerの二つのRunnerがある。
Install
普通にpipとかcondaでインストールできます。
# pipとかcondaでインストールできます $ pip install kedro $ conda install -c conda-forge kedro # installされたかの確認。kedroっていう文字が出てきたら成功 $ kedro info _ _ | | _____ __| |_ __ ___ | |/ / _ \/ _` | '__/ _ \ | < __/ (_| | | | (_) | |_|\_\___|\__,_|_| \___/ v0.16.6 kedro allows teams to create analytics projects. It is developed as part of the Kedro initiative at QuantumBlack. No plugins installed
新規のプロジェクトを作成する
kedroのデフォルトのテンプレートを使う場合は、以下のコマンドでやります。いくつか質問が出てきますが、それぞれ読んで答えればオッケー。
$ kedro new
テンプレートをオンにすると、以下のような感じでフォルダが構成されます。
$ tree . . ├── README.md ├── conf │ ├── README.md │ ├── base │ │ ├── catalog.yml │ │ ├── credentials.yml │ │ ├── logging.yml │ │ └── parameters.yml │ └── local ├── data │ ├── 01_raw │ │ └── iris.csv │ ├── 02_intermediate │ ├── 03_primary │ ├── 04_feature │ ├── 05_model_input │ ├── 06_models │ ├── 07_model_output │ └── 08_reporting ├── docs │ └── source │ ├── conf.py │ └── index.rst ├── kedro_cli.py ├── logs │ └── journals ├── notebooks ├── setup.cfg └── src ├── requirements.txt ├── sample │ ├── __init__.py │ ├── hooks.py │ ├── pipelines │ │ ├── __init__.py │ │ ├── data_engineering │ │ │ ├── README.md │ │ │ ├── __init__.py │ │ │ ├── nodes.py │ │ │ └── pipeline.py │ │ └── data_science │ │ ├── README.md │ │ ├── __init__.py │ │ ├── nodes.py │ │ └── pipeline.py │ └── run.py ├── setup.py └── tests ├── __init__.py ├── pipelines │ └── __init__.py └── test_run.py
プロジェクトをgitの管理下にするには、以下のような感じでやります。
$ git init $ git add ./ $ git commit -m "init" $ git branch -M main $ git remote add origin <hogehoge> $ git push origin main
フォルダ構成からわかると思いますが、どこに何を書くかが明確で、サンプルのコードを追いかけるだけで、大体何すればいいかわかります笑*3
とりあえず動かしてみる
kedro new
をしたときに、irisのデータセットでサンプルのモデルを動かすためのパイプラインやノード、データカタログが用意されているので、とりあえずこいつをローカルで動かしてみたいと思います。
$ cd <プロジェクトルート> # まずはプロジェクトの依存関係をインストール $ kedro install # 実行 $ kedro run
これだけです。簡単ですね。実行すると、logのフォルダにログが吐き出されるのがわかります。
次からは、処理を実際に追加していくのをどうするか見ていきます。
データソースを追加する(Data Catalogを追加する)
Nodeなどでデータを実際に使うために、データカタログにデータソースを追加します。conf/base/catalog.yml
というファイルに記述していきます。
csvとかであれば、以下のような感じで追加できます。他にも、xlsxやparquet, sqlTableなど様々なデータソースに対応できます。
companies: type: pandas.CSVDataSet filepath: data/01_raw/companies.csv reviews: type: pandas.CSVDataSet filepath: data/01_raw/reviews.csv
data
のディレクトリに、それぞれのデータの状態に合わせてフォルダにデータを入れます。ここら辺も人によってフォルダの分け方が別れることがほとんどですが、事前に定義がされているのでやりやすいです。
処理を追加する(Nodeを編集する)
nodeの処理は単純で、普通に関数を追加するだけです(完)
試しに、テンプレートで出てくるnodes.pyの処理を見て見ます。
from typing import Any, Dict import pandas as pd def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> Dict[str, Any]: """Node for splitting the classical Iris data set into training and test sets, each split into features and labels. The split ratio parameter is taken from conf/project/parameters.yml. The data and the parameters will be loaded and provided to your function automatically when the pipeline is executed and it is time to run this node. """ data.columns = [ "sepal_length", "sepal_width", "petal_length", "petal_width", "target", ] classes = sorted(data["target"].unique()) # One-hot encoding for the target variable data = pd.get_dummies(data, columns=["target"], prefix="", prefix_sep="") # Shuffle all the data data = data.sample(frac=1).reset_index(drop=True) # Split to training and testing data n = data.shape[0] n_test = int(n * example_test_data_ratio) training_data = data.iloc[n_test:, :].reset_index(drop=True) test_data = data.iloc[:n_test, :].reset_index(drop=True) # Split the data to features and labels train_data_x = training_data.loc[:, "sepal_length":"petal_width"] train_data_y = training_data[classes] test_data_x = test_data.loc[:, "sepal_length":"petal_width"] test_data_y = test_data[classes] # When returning many variables, it is a good practice to give them names: return dict( train_x=train_data_x, train_y=train_data_y, test_x=test_data_x, test_y=test_data_y, )
この後出てくるパイプラインとデータの入出力を合わせる必要がありますが、それ以外は普通の関数の処理であることがわかると思います。書く場所をある程度縛るだけで、ここら辺に特にルールがないのは助かりますね。
パイプラインを編集する
パイプラインの処理で編集するのは二箇所で、pipeline.py
とhooks.py
の二つです。
pipeline.py
まずpipeline.py
ですが、自分で実装したnodeの関数を、kedro.pipeline.node
を使って、パイプラインに組み込みます。組み込むときには、第一引数に関数、第二引数に入力で渡すデータ、第三引数に出力のデータを渡します。
from kedro.pipeline import Pipeline, node from .nodes import split_data def create_pipeline(**kwargs): return Pipeline( [ node( split_data, ["example_iris_data", "params:example_test_data_ratio"], dict( train_x="example_train_x", train_y="example_train_y", test_x="example_test_x", test_y="example_test_y", ), ) ] )
これだけです。
hooks.py
pipeline.py
で作ったパイプラインを実行できるようにします。また、パイプライン間に依存関係がある場合がほとんどだと思いますので、その処理も書きます。
from typing import Any, Dict, Iterable, Optional from kedro.config import ConfigLoader from kedro.framework.hooks import hook_impl from kedro.io import DataCatalog from kedro.pipeline import Pipeline from kedro.versioning import Journal from ab_recommender.pipelines import data_engineering as de from ab_recommender.pipelines import data_science as ds class ProjectHooks: @hook_impl def register_pipelines(self) -> Dict[str, Pipeline]: """Register the project's pipeline. Returns: A mapping from a pipeline name to a ``Pipeline`` object. """ data_engineering_pipeline = de.create_pipeline() data_science_pipeline = ds.create_pipeline() return { "de": data_engineering_pipeline, "ds": data_science_pipeline, "__default__": data_engineering_pipeline + data_science_pipeline, } @hook_impl def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader: return ConfigLoader(conf_paths) @hook_impl def register_catalog( self, catalog: Optional[Dict[str, Dict[str, Any]]], credentials: Dict[str, Dict[str, Any]], load_versions: Dict[str, str], save_version: str, journal: Journal, ) -> DataCatalog: return DataCatalog.from_config( catalog, credentials, load_versions, save_version, journal ) project_hooks = ProjectHooks()
これだけで、実行順序の依存関係を記述することができます。
まとめ
以上がざっくりとしたkedroの使い方です。まだ試せていないのですが、kedroのフレームに則って書いておけば、AWS Batchやkubeflowなどにデプロイすることができるようになるなど、他にも良さげな感じの物が多いので、使ってみたいと思います。
それでは!