St_Hakky’s blog

Data Science / Human Resources / Web Applicationについて書きます

【Python】Apache Beamを使ってデータパイプラインを実装してみる

こんにちは。

先日、Apache Beamに関する概要の記事を書きました。

www.st-hakky-blog.com

今回は、Apache Beam SDKのうち、Pythonを使って、実際にデータパイプラインを実装してみようと思います*1

f:id:St_Hakky:20200429173622p:plain

今回のお題

今回は、Apache Beamの公式のドキュメントをベースにやっていこうと思います

Runnerとしては、今回はとりあえず動かして、プログラミングの仕方の雰囲気をつかむことを主眼にするため、ローカル環境を使っていきます。

Apache Beam SDKのインストール

普通にpipでインストールすることができます。RunnerとしてDataflowも使用する場合は、それも合わせて追加します*2

$ pip install apache-beam
$ pip install apache-beam[gcp]

ちなみに、私はzshを使っているのですが、gcpの方をインストールしようとしたら、以下のようなエラーが出ました

zsh: no matches found: apache-beam[gcp]

以下のような感じで対応することができました。

$ pip install 'apache-beam[gcp]'

Getting Started

まずは、ドキュメントにあった簡単なサンプルを動かしてみたいと思います。

全体感

コードは以下のような感じになっています。

import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

それぞれ解説していきます。

Create Pipeline

パイプラインをインスタンス化します。本当は、Optionなどを指定する感じなのですが、それは一旦ここでは省いています。

with beam.Pipeline() as pipeline:

パイプライン自体は、「|」で繋いで、PTransformをそれぞれのPCollectionsに対して適用していきます。形式としては、以下のような形で、指定することができます。

[Output PCollection] = [Input PCollection] | [Transform]

複数個のPTransformsを適用するには、「|」で複数個繋げてあげれば大丈夫です。

I/O Transforms

実際のInputとなるファイルと出力先をI/O Transformsを使って指定しています。これにより、データのInputからデータを受け取り、処理結果を出力することができます。

・・・略・・・

      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)

・・・略・・・

      | 'Write results' >> beam.io.WriteToText(outputs_prefix)

・・・略・・・

ここで使われている以外にも、用意されているI/O Transoformsはたくさんあり、以下のページで見ることができます。

Transforms

以下の部分が、Transformの箇所になります。Apache Beamのメソッドを使って、処理をPCollectionに対して行うことができます。

・・・略・・・
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
・・・略・・・


Map, CombinePerKeyなどは、Apache Beamで用意されているメソッドで、CombinePerKeyは、キーごとにGroupbyして、それごとに引数で適用した値を生成します。このように、各Transformで処理をいくつか行なった後に、最終的な出力を得ます。

これについての詳しい解説は、Apache BeamのProgramming GuideのTransformの処理をみてもらえればと思います*3

実行する

今回のPythonコードは、 main.pyとして保存し、このプログラムと同じファイルをinputとしました。

実行前

実行前のフォルダ構成は以下の通り。

$ tree                          
.
├── data
│   └── sample.txt
├── main.py
└── outputs
実行後

新しいファイルが生成されています。

$ tree          
.
├── data
│   └── sample.txt
├── main.py
└── outputs
    └── part-00000-of-00001

中身を見ると、ちゃんと文字の出現回数がカウントされています。

('import', 2)
('apache', 1)
('beam', 9)
('as', 2)
('re', 2)
('inputs', 2)
('pattern', 2)
("'data", 1)
("'", 2)
・・・略・・・

それでは

*1:本当は、Scalaベースのやつを使うのでそれの方がいいのですが、Scalaまだ慣れず笑

*2:他のRunnerも同じく指定する

*3:手抜きじゃないよ