中野智文のブログ

データ・マエショリストのメモ

airflow と格闘中(10)

nakano-tomofumi.hatenablog.com

のつづき。

いろいろあって、現在の mac 上でいろいろ検証するよりも、本番マシンに近い、仮想マシンで色々やったほうが良さそう、ということになった。

vm上で install しようとするも、numpyコンパイルらしきものが始まる

え?

次のような記事が見つかる…

kounoike.github.io

とりあえず、上記のようにしてもうまくいかない。(仮想環境の違いはあるだろう。)

そこで、numpy をインストール。

 pip install numpy

これはこのVMの環境ではうまく行った。 分析するわけでもないのに、numpy をインストールするハメになるとは…

ただし、まだ libxml 関連のエラーが出ている。これは python 関係ではないからつらいな。。。

libxml2-dev libxslt1-dev

apt-get update インストールしてやっと解決。

 pip install airflow

の出力の最後は、

Successfully installed lxml pandas psutil python-nvd3 tabulate thrift zope.deprecation Mako python-editor wtforms python-slugify Unidecode

だった。

つづきは、 nakano-tomofumi.hatenablog.com

airflow と格闘中(9)

nakano-tomofumi.hatenablog.com

のつづき。

いやーもう本当に終わりにしたい。

前回のデッドロックの原因は、基本的には DAG ファイルの方のバグだった。しかしDAGファイルは最低でも dry run しとけ、ということだろう。 しかしデッドロックが表示されるとは分かりにくい。

バグも治ったので、trigger_dag に再挑戦

[2017-05-09 17:52:00,076] {models.py:3414} DagFileProcessor0 INFO - Creating ORM DAG for xxxxxx
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_stats.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to
evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to ev
aluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)

これは一体…。

明日は別の環境で実験予定。仮想環境整っていないけど…

つづきは、

nakano-tomofumi.hatenablog.com

airflow と格闘中(8)

nakano-tomofumi.hatenablog.com

のつづき。

GWも明けたが、先週に引き続き、会社のノートPCを交換中。なので更新遅め。

backfill に戻したら、BackfillJob is deadlocked.

うーむ。。。 trigger にするために終了を確認するファイルを touch していたのだが、そこで発生しているらしい。該当のエラーログはこんな感じ。

[2017-05-08 15:46:39,338] {jobs.py:1935} WARNING - Deadlock discovered for tasks_to_run=[<TaskInstance: xxxxxx.yyyyyy 2017-05-06 00:00:00 [up_for_retry]>]

間違いが見つかった。これはもしかして、 trigger がうまく実行されなかった原因か…

つづきは、

nakano-tomofumi.hatenablog.com

airflow と格闘中(7)

nakano-tomofumi.hatenablog.com

のつづき。

結局 trigger_dag によって未だに成功はしていない。

trigger_dag はDAG(タスク)の終了までコマンドを待たない

さらに大事なことに気がついた。luigi の実行では、その実行コマンドがタスクが全て完了するまで待つことになるが、 airflow では、 trigger_dag はタスクが終了するまで待つものではない。よってmakefile の一部の処理を airflow で行うという使い方は triger_dag ではできない。 もしそのように部分的に使う場合には、backfill で行うか、airflowで処理する対象が処理の後半で最後まで完結している必要がある。

謎のエラー

いついつから heart beat がないよ、というエラーだが、

[2017-05-02 14:04:23,791] {models.py:328} DagFileProcessor8398 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:23.791645
[2017-05-02 14:04:25,917] {models.py:328} DagFileProcessor8399 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:25.916644
[2017-05-02 14:04:28,002] {models.py:328} DagFileProcessor8400 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:28.002362

heart beat がないという時刻も現在時刻に合わせ徐々に増えつつあるので、この処理自体、kill_zombies という関数でよばれているのだが、一向に ‘kill’ される気配もない。

つづきは、

nakano-tomofumi.hatenablog.com

Airflow と格闘中(6)

皆様に置きましてはGW中かもしれませんが、小学生を持つ親は、小学校が休みでない以上、普通に出勤となります。

nakano-tomofumi.hatenablog.com

のつづき。

scheduler が正解。

というかドキュメントは最後まで読もう。scheduler を起動しておいて、 trigger_dag で実行。 これが makefile に近い。

次の記事なども読みノウハウ吸収。

hafizbadrie.com

scheduler の立ち上げ方

scheduler をオプションなしで起動すると、そのハートビートのためか、意味なく大量のログが画面に出力する。バックグランドで実行されているわけでもなさそうだし、適切なオプション例を調べてみる。

qiita.com

によると、-D オプションでデーモンとして起動させ、標準出力・エラーともログディレクトリに出力するのが普通らしい。 自分は、scheduler の log ディレクトリである、$(AIRFLOW_HOME)/logs/scheduler/ 配下に標準出力・エラーを出力することにした。

trigger_dag による実行

  • 初めての実行(既に実行されていない)の場合:
INFO - Created <XxxXxx xxxxxxx @ 2017-04-27 00:00:00: manual__2017-04-27T00:00:00, externally triggered: True>

というメッセージが表示され、内部で実行される?

  • 既に実行した場合:

** おそらく backfill で実行した場合

sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed:

というユニーク制約のエラーが出力される。また、次のようなファイル名のログファイルがある。

`$(AIRFLOW_HOME)/logs/xxxx/yyyyyy/2017-04-27T00:00:00

** trigger_dag で実行した場合:

airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx

とでる。

clear が上手くいかない。

clear により、成功・失敗の state は消されるものだと信じたい。ところが

airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx

とエラー再び表示される。何を消したのだ?

よろしい。ならば resetdb

エラーログ

OperationalError: (sqlite3.OperationalError) no such table: dag

scheduler は停止してから実行すべきだったか…。

scheduler の止め方。

一時停止コマンドはあっても、停止用のコマンドはない。

調べると、

Airflowを使ってワークフロー管理を行う — | サイオスOSS | サイオステクノロジー

にて、次のようなコマンドが示されていた。

pgrep -f '/usr/local/bin/airflow scheduler' | xargs kill -s SIGKILL

再び、起動。

ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1

といういつものエラーが表示されるが起動できた模様。

max_threads を 1 にしろ

次のようなエラーが出る。実行に失敗しているのか…

 {jobs.py:534} DagFileProcessor61 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1

デフォルトで 1 に設定して欲しい。(デフォルトは2) ついでだから、max_active_runs_per_dag も 1 にしてしまおう。

        cat $(AIRFLOW_HOME)/airflow.cfg \
        | sed -e 's/^max_threads = 2$$/max_threads = 1/g' \
        | sed -e 's/^max_active_runs_per_dag = 16$$/max_active_runs_per_dag = 1/g' \
        > $(AIRFLOW_HOME)/airflow.cfg.tmp

まだうっとおしいエラーが

こんなエラーが出続けて、ログが見にくい。

/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)

実行確認が取れない

running 中になっているが、実行されていない模様…

つづきは、 nakano-tomofumi.hatenablog.com

Airflow と格闘中(5)

nakano-tomofumi.hatenablog.com

のつづき。

今日で終わりにしたい。

airflow のコマンドを調べる

  • render

タスクの実行内容を表示する。BashOperator なら test -dr とほぼ同等。

  • trigger_dags

DAG の実行のトリガーを引く。求めていた機能の可能性-e オプションで日付指定できるが、既に ‘backfill’ で実行した DAG に対しては次のようなエラーとなる。

sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.execution_date

こういうSQLのエラーじゃなくて、既に実行したよ、的なメッセージを表示してほしいのだが。 まだ実行していない日付を指定すると、

[2017-04-28 11:34:54,967] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-04-28 11:34:55,326] {models.py:167} INFO - Filling up the DagBag from /work_dir/airflow/dags
[2017-04-28 11:34:55,618] {cli.py:185} INFO - Created <DagRun DAG_ID @ 2017-04-23 00:00:00: manual__2017-04-23T00:00:00, externally triggered: True>

となるが、実行はされていない模様。誰が実行するのだろう…。あ、スケジューラを起動しないといけないのか。

Scheduling & Triggers — Airflow Documentation

つづきは、 nakano-tomofumi.hatenablog.com