こんにちは。
今日は、airflowと戯れていたら、なんかバグを踏んだか何かをしたので、それについて書きたいと思います。
やりたいこと
KubernetesPodOperatorにdag_run.confをenv_varsのパラメーター経由で渡して、そのenv_varsに、実行日を渡して、どの日の実行を行いたいかを環境変数経由でdockerコンテナの方に伝えたいことが今回やりたいことです。
ここで、実行日をUIからTrigger Dagをしたときに渡すために、 dag_run.conf
経由で渡したいとします。デフォルトでは実行した時刻です。
ハマったこと
airflow2.0を今Cloud Composer経由で使っているのですが、airflowのバージョン1と変わったようで、うまくtemplateが動いてくれませんでした。具体的には、最初以下のようなコードを書いていました。
kubernetes_pod_operator.KubernetesPodOperator( task_id='hogehoge_task_id', name='hogehoge_name', cmds=['run'], env_vars={'ENV': env, 'EXECUTE_DATE': '{{ dag_run.conf["ymd"] if dag_run.conf["ymd"] else ds }}'}, secrets=[secret_volume], namespace='default', startup_timeout_seconds=60 * 30, is_delete_operator_pod=True, image_pull_policy='Always', image='hogehoge')
templated(jinjaのテンプレートを読み取って処理してくれる)なパラメーターなはずなのに何故か、これが文字列として渡されていて、日付データじゃないよ、みたいなエラーが出てしまっていました。なんでだろうと思って調査してたのですが、意外にハマってしまったので、備忘録を残そうと思った次第です。
調査したこと
直接的な原因はわからなかったのですが、KubernetesPodOperatorのクラスを見ると、env_varsを変換している部分のコードが以下のようにあり、
その変換の関数が、以下にあることがわかりました。
具体的には、以下のコードなのですが、ここを見ると、dictの時は k8s.V1EnvVar
ってやつにしていることがわかります。
def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]: """ Converts a dictionary into a list of env_vars :param env_vars: :return: """ if isinstance(env_vars, dict): res = [] for k, v in env_vars.items(): res.append(k8s.V1EnvVar(name=k, value=v)) return res elif isinstance(env_vars, list): return env_vars else: raise AirflowException(f"Expected dict or list, got {type(env_vars)}")
dictで渡すと、jinjaのテンプレートがうまく変換されずに、そのままk8s.V1EnvVar
の方に変換されてしまっている様子でした。なので、list形式に最終的にしてあげる必要があるのですが、そこでk8s.V1EnvVar
を使うと、それを文字列とみなしてしまうためダメでした。
仕方ないので、V1EnvVarの定義を見て、それの形になるように自分で渡してあげると、うまくいきました。
最終的なコード
ということで、以下のような感じで渡すとうまくいきました。
kubernetes_pod_operator.KubernetesPodOperator( task_id='hogehoge_task_id', name='hogehoge_name', cmds=['run'], env_vars=[ {'name': 'ENV', 'value':'hogehoge'}, {'name': 'EXECUTE_DATE', 'value': '{{ dag_run.conf["ymd"] if dag_run.conf["ymd"] else ds }}'} ], secrets=[secret_volume], namespace='default', startup_timeout_seconds=60 * 30, is_delete_operator_pod=True, image_pull_policy='Always', image='hogehoge' )
んー、謎。。。