St_Hakky’s blog

Data Science / Human Resources / Web Applicationについて書きます

【Airflow】KubernetesPodOperatorにdag_run.confを渡したい

こんにちは。

今日は、airflowと戯れていたら、なんかバグを踏んだか何かをしたので、それについて書きたいと思います。

やりたいこと

KubernetesPodOperatorにdag_run.confをenv_varsのパラメーター経由で渡して、そのenv_varsに、実行日を渡して、どの日の実行を行いたいかを環境変数経由でdockerコンテナの方に伝えたいことが今回やりたいことです。

ここで、実行日をUIからTrigger Dagをしたときに渡すために、 dag_run.conf経由で渡したいとします。デフォルトでは実行した時刻です。

ハマったこと

airflow2.0を今Cloud Composer経由で使っているのですが、airflowのバージョン1と変わったようで、うまくtemplateが動いてくれませんでした。具体的には、最初以下のようなコードを書いていました。

kubernetes_pod_operator.KubernetesPodOperator(
        task_id='hogehoge_task_id',
        name='hogehoge_name',
        cmds=['run'],
        env_vars={'ENV': env, 'EXECUTE_DATE': '{{ dag_run.conf["ymd"] if dag_run.conf["ymd"] else ds }}'},
        secrets=[secret_volume],
        namespace='default',
        startup_timeout_seconds=60 * 30,
        is_delete_operator_pod=True,
        image_pull_policy='Always',
        image='hogehoge')

templated(jinjaのテンプレートを読み取って処理してくれる)なパラメーターなはずなのに何故か、これが文字列として渡されていて、日付データじゃないよ、みたいなエラーが出てしまっていました。なんでだろうと思って調査してたのですが、意外にハマってしまったので、備忘録を残そうと思った次第です。

調査したこと

直接的な原因はわからなかったのですが、KubernetesPodOperatorのクラスを見ると、env_varsを変換している部分のコードが以下のようにあり、

https://github.com/apache/airflow/blob/c5e91418033c209089539ab29e6f8423f42fe1a2/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L236

その変換の関数が、以下にあることがわかりました。

https://github.com/apache/airflow/blob/c5e91418033c209089539ab29e6f8423f42fe1a2/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py#L92-L107

具体的には、以下のコードなのですが、ここを見ると、dictの時は k8s.V1EnvVarってやつにしていることがわかります。

def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]:
    """
    Converts a dictionary into a list of env_vars
    :param env_vars:
    :return:
    """
    if isinstance(env_vars, dict):
        res = []
        for k, v in env_vars.items():
            res.append(k8s.V1EnvVar(name=k, value=v))
        return res
    elif isinstance(env_vars, list):
        return env_vars
    else:
        raise AirflowException(f"Expected dict or list, got {type(env_vars)}")


dictで渡すと、jinjaのテンプレートがうまく変換されずに、そのままk8s.V1EnvVarの方に変換されてしまっている様子でした。なので、list形式に最終的にしてあげる必要があるのですが、そこでk8s.V1EnvVarを使うと、それを文字列とみなしてしまうためダメでした。

仕方ないので、V1EnvVarの定義を見て、それの形になるように自分で渡してあげると、うまくいきました。

最終的なコード

ということで、以下のような感じで渡すとうまくいきました。

kubernetes_pod_operator.KubernetesPodOperator(
        task_id='hogehoge_task_id',
        name='hogehoge_name',
        cmds=['run'],
        env_vars=[
            {'name': 'ENV', 'value':'hogehoge'},
            {'name': 'EXECUTE_DATE', 'value': '{{ dag_run.conf["ymd"] if dag_run.conf["ymd"] else ds }}'}
        ],
        secrets=[secret_volume],
        namespace='default',
        startup_timeout_seconds=60 * 30,
        is_delete_operator_pod=True,
        image_pull_policy='Always',
        image='hogehoge'
)


んー、謎。。。