こんにちは。
最近、Kedroと言う機械学習向けのパイプライン構築用のツールを使ってみたので、それについてまとめます。
Kedroとは?
概要
Kedro は QuantumBlack というデータ分析企業 が公開している、プロダクションレディなデータ分析用ワークフロー構築ツールです。結構いろんなパイプラインツールがありますし、全部を触ったことがあるわけではないですが、今のところ*1 Kedroはすごくすごく良い感じです。
ツールの紹介動画は以下*2 。
VIDEO youtu.be
公式周りのサイトのURLは以下の通り。
特徴
めちゃめちゃたくさんの特徴がありますが、以下あたりはすごく便利です。
Airflowなどと同じく、Pythonで全てのワークフローを書くことができる
DAG形式でパイプラインを定義でき、 Sequentialな実行とParallelな実行の切り替えや、パイプラインの途中から実行するなどできる
yamlで定義することができるデータカタログの機能があり、csv, pickle, feather, parquet, DB上のテーブル など様々なデータ形式に対応することができる
データセットや学習モデルをバージョン管理し、指定のバージョンでいつでも実行できるよう再現性を担保する
Cookiecutter によるテンプレートを利用することで、複数人での作業を管理できる
モデルのパラメーターをyamlで管理することができる
Jupyter Notebook, Jupyter Lab とのインテグレーション
本番環境への移行がしやすく、複数の環境(AWS, GCPのようなマネージドサービスなど)にデプロイすることができる。具体的にはPythonのパッケージとしてであったり、kubeflowやArgo Workflows、AWS Batchにデプロイなどもできる
リッチなパイプラインの可視化がKedro vizでできる
とりあえずやりたいことは一通りできそうな雰囲気を感じ取っていただけると思います笑
Kedroの大まかな構成要素
Kedroは、大まかには次の4つから構成されています。
Node
Pipeline
DataCatalog
Runner
Node
実行される処理の単位で、前処理や学習といった処理本体になるもの
入力、出力のデータセットと、それを処理するロジックを定義して、パイプラインに組み込む
Pipeline
Nodeの依存関係や実行順序を管理するもの。
decorator機能がkedroにはあり、これによって、パイプライン全体の処理に対して機能を付加することもできる
DataCatalog
パイプラインで使用するデータを定義するカタログ
データセット名、形式、ファイルパス、ロードやセーブ時のオプションなどを指定することが可能
Runner
パイプラインを実行するもの。パラメーターを指定して実行することができ、例えば特定のパイプラインだけ実行するとかもできる。
SequentialRunner、ParallelRunnerの二つのRunnerがある。
Install
普通にpipとかcondaでインストールできます。
# pipとかcondaでインストールできます
$ pip install kedro
$ conda install -c conda-forge kedro
# installされたかの確認。kedroっていう文字が出てきたら成功
$ kedro info
_ _
| | _____ __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
| < __/ (_| | | | (_) |
|_|\_\___|\__,_|_| \___/
v0.16.6
kedro allows teams to create analytics
projects. It is developed as part of
the Kedro initiative at QuantumBlack.
No plugins installed
新規のプロジェクトを作成する
kedroのデフォルトのテンプレートを使う場合は、以下のコマンドでやります。いくつか質問が出てきますが、それぞれ読んで答えればオッケー。
$ kedro new テンプレートをオンにすると、以下のような感じでフォルダが構成されます。
$ tree .
.
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── credentials.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
├── data
│ ├── 01_raw
│ │ └── iris.csv
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── kedro_cli.py
├── logs
│ └── journals
├── notebooks
├── setup.cfg
└── src
├── requirements.txt
├── sample
│ ├── __init__.py
│ ├── hooks.py
│ ├── pipelines
│ │ ├── __init__.py
│ │ ├── data_engineering
│ │ │ ├── README.md
│ │ │ ├── __init__.py
│ │ │ ├── nodes.py
│ │ │ └── pipeline.py
│ │ └── data_science
│ │ ├── README.md
│ │ ├── __init__.py
│ │ ├── nodes.py
│ │ └── pipeline.py
│ └── run.py
├── setup.py
└── tests
├── __init__.py
├── pipelines
│ └── __init__.py
└── test_run.py
プロジェクトをgitの管理下にするには、以下のような感じでやります。
$ git init
$ git add ./
$ git commit -m "init"
$ git branch -M main
$ git remote add origin <hogehoge>
$ git push origin main フォルダ構成からわかると思いますが、どこに何を書くかが明確で、サンプルのコードを追いかけるだけで、大体何すればいいかわかります笑*3
とりあえず動かしてみる
kedro new
をしたときに、irisのデータセットでサンプルのモデルを動かすためのパイプラインやノード、データカタログが用意されているので、とりあえずこいつをローカルで動かしてみたいと思います。
$ cd <プロジェクトルート>
# まずはプロジェクトの依存関係をインストール
$ kedro install
# 実行
$ kedro run これだけです。簡単ですね。実行すると、logのフォルダにログが吐き出されるのがわかります。
次からは、処理を実際に追加していくのをどうするか見ていきます。
データソースを追加する(Data Catalogを追加する)
Nodeなどでデータを実際に使うために、データカタログにデータソースを追加します。conf/base/catalog.yml
というファイルに記述していきます。
csvとかであれば、以下のような感じで追加できます。他にも、xlsxやparquet, sqlTableなど様々なデータソースに対応できます。
companies :
type : pandas.CSVDataSet
filepath : data/01_raw/companies.csv
reviews :
type : pandas.CSVDataSet
filepath : data/01_raw/reviews.csv
data
のディレクトリに、それぞれのデータの状態に合わせてフォルダにデータを入れます。ここら辺も人によってフォルダの分け方が別れることがほとんどですが、事前に定義がされているのでやりやすいです。
処理を追加する(Nodeを編集する)
nodeの処理は単純で、普通に関数を追加するだけです(完)
試しに、テンプレートで出てくるnodes.pyの処理を見て見ます。
from typing import Any, Dict
import pandas as pd
def split_data (data: pd.DataFrame, example_test_data_ratio: float ) -> Dict[str , Any]:
"""Node for splitting the classical Iris data set into training and test
sets, each split into features and labels.
The split ratio parameter is taken from conf/project/parameters.yml.
The data and the parameters will be loaded and provided to your function
automatically when the pipeline is executed and it is time to run this node.
"""
data.columns = [
"sepal_length" ,
"sepal_width" ,
"petal_length" ,
"petal_width" ,
"target" ,
]
classes = sorted (data["target" ].unique())
data = pd.get_dummies(data, columns=["target" ], prefix="" , prefix_sep="" )
data = data.sample(frac=1 ).reset_index(drop=True )
n = data.shape[0 ]
n_test = int (n * example_test_data_ratio)
training_data = data.iloc[n_test:, :].reset_index(drop=True )
test_data = data.iloc[:n_test, :].reset_index(drop=True )
train_data_x = training_data.loc[:, "sepal_length" :"petal_width" ]
train_data_y = training_data[classes]
test_data_x = test_data.loc[:, "sepal_length" :"petal_width" ]
test_data_y = test_data[classes]
return dict (
train_x=train_data_x,
train_y=train_data_y,
test_x=test_data_x,
test_y=test_data_y,
)
この後出てくるパイプラインとデータの入出力を合わせる必要がありますが、それ以外は普通の関数の処理であることがわかると思います。書く場所をある程度縛るだけで、ここら辺に特にルールがないのは助かりますね。
パイプラインを編集する
パイプラインの処理で編集するのは二箇所で、pipeline.py
とhooks.py
の二つです。
pipeline.py
まずpipeline.py
ですが、自分で実装したnodeの関数を、kedro.pipeline.node
を使って、パイプラインに組み込みます。組み込むときには、第一引数に関数、第二引数に入力で渡すデータ、第三引数に出力のデータを渡します。
from kedro.pipeline import Pipeline, node
from .nodes import split_data
def create_pipeline (**kwargs):
return Pipeline(
[
node(
split_data,
["example_iris_data" , "params:example_test_data_ratio" ],
dict (
train_x="example_train_x" ,
train_y="example_train_y" ,
test_x="example_test_x" ,
test_y="example_test_y" ,
),
)
]
)
これだけです。
hooks.py
pipeline.py
で作ったパイプラインを実行できるようにします。また、パイプライン間に依存関係がある場合がほとんどだと思いますので、その処理も書きます。
from typing import Any, Dict, Iterable, Optional
from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal
from ab_recommender.pipelines import data_engineering as de
from ab_recommender.pipelines import data_science as ds
class ProjectHooks :
@ hook_impl
def register_pipelines (self) -> Dict[str , Pipeline]:
"""Register the project's pipeline.
Returns:
A mapping from a pipeline name to a ``Pipeline`` object.
"""
data_engineering_pipeline = de.create_pipeline()
data_science_pipeline = ds.create_pipeline()
return {
"de" : data_engineering_pipeline,
"ds" : data_science_pipeline,
"__default__" : data_engineering_pipeline + data_science_pipeline,
}
@ hook_impl
def register_config_loader (self, conf_paths: Iterable[str ]) -> ConfigLoader:
return ConfigLoader(conf_paths)
@ hook_impl
def register_catalog (
self,
catalog: Optional[Dict[str , Dict[str , Any]]],
credentials: Dict[str , Dict[str , Any]],
load_versions: Dict[str , str ],
save_version: str ,
journal: Journal,
) -> DataCatalog:
return DataCatalog.from_config(
catalog, credentials, load_versions, save_version, journal
)
project_hooks = ProjectHooks()
これだけで、実行順序の依存関係を記述することができます。
まとめ
以上がざっくりとしたkedroの使い方です。まだ試せていないのですが、kedroのフレームに則って書いておけば、AWS Batchやkubeflowなどにデプロイすることができるようになるなど、他にも良さげな感じの物が多いので、使ってみたいと思います。
それでは!