airflow と格闘中(8)
nakano-tomofumi.hatenablog.com
のつづき。
GWも明けたが、先週に引き続き、会社のノートPCを交換中。なので更新遅め。
backfill
に戻したら、BackfillJob is deadlocked
.
うーむ。。。
trigger
にするために終了を確認するファイルを touch していたのだが、そこで発生しているらしい。該当のエラーログはこんな感じ。
[2017-05-08 15:46:39,338] {jobs.py:1935} WARNING - Deadlock discovered for tasks_to_run=[<TaskInstance: xxxxxx.yyyyyy 2017-05-06 00:00:00 [up_for_retry]>]
間違いが見つかった。これはもしかして、 trigger
がうまく実行されなかった原因か…
つづきは、
airflow と格闘中(7)
nakano-tomofumi.hatenablog.com
のつづき。
結局 trigger_dag
によって未だに成功はしていない。
trigger_dag
はDAG(タスク)の終了までコマンドを待たない
さらに大事なことに気がついた。luigi の実行では、その実行コマンドがタスクが全て完了するまで待つことになるが、
airflow では、 trigger_dag
はタスクが終了するまで待つものではない。よってmakefile の一部の処理を airflow で行うという使い方は triger_dag
ではできない。
もしそのように部分的に使う場合には、backfill
で行うか、airflowで処理する対象が処理の後半で最後まで完結している必要がある。
謎のエラー
いついつから heart beat がないよ、というエラーだが、
[2017-05-02 14:04:23,791] {models.py:328} DagFileProcessor8398 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:23.791645 [2017-05-02 14:04:25,917] {models.py:328} DagFileProcessor8399 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:25.916644 [2017-05-02 14:04:28,002] {models.py:328} DagFileProcessor8400 INFO - Failing jobs without heartbeat after 2017-05-02 13:59:28.002362
heart beat がないという時刻も現在時刻に合わせ徐々に増えつつあるので、この処理自体、kill_zombies
という関数でよばれているのだが、一向に ‘kill’ される気配もない。
つづきは、
Airflow と格闘中(6)
皆様に置きましてはGW中かもしれませんが、小学生を持つ親は、小学校が休みでない以上、普通に出勤となります。
nakano-tomofumi.hatenablog.com
のつづき。
scheduler
が正解。
というかドキュメントは最後まで読もう。scheduler
を起動しておいて、 trigger_dag
で実行。 これが makefile に近い。
次の記事なども読みノウハウ吸収。
scheduler
の立ち上げ方
scheduler をオプションなしで起動すると、そのハートビートのためか、意味なく大量のログが画面に出力する。バックグランドで実行されているわけでもなさそうだし、適切なオプション例を調べてみる。
によると、-D
オプションでデーモンとして起動させ、標準出力・エラーともログディレクトリに出力するのが普通らしい。
自分は、scheduler
の log ディレクトリである、$(AIRFLOW_HOME)/logs/scheduler/
配下に標準出力・エラーを出力することにした。
trigger_dag
による実行
- 初めての実行(既に実行されていない)の場合:
INFO - Created <XxxXxx xxxxxxx @ 2017-04-27 00:00:00: manual__2017-04-27T00:00:00, externally triggered: True>
というメッセージが表示され、内部で実行される?
- 既に実行した場合:
** おそらく backfill
で実行した場合
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed:
というユニーク制約のエラーが出力される。また、次のようなファイル名のログファイルがある。
`$(AIRFLOW_HOME)/logs/xxxx/yyyyyy/2017-04-27T00:00:00
** trigger_dag
で実行した場合:
airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx
とでる。
clear
が上手くいかない。
clear
により、成功・失敗の state は消されるものだと信じたい。ところが
airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx
とエラー再び表示される。何を消したのだ?
よろしい。ならば resetdb
だ
エラーログ
OperationalError: (sqlite3.OperationalError) no such table: dag
scheduler
は停止してから実行すべきだったか…。
scheduler
の止め方。
一時停止コマンドはあっても、停止用のコマンドはない。
調べると、
Airflowを使ってワークフロー管理を行う — | サイオスOSS | サイオステクノロジー
にて、次のようなコマンドが示されていた。
pgrep -f '/usr/local/bin/airflow scheduler' | xargs kill -s SIGKILL
再び、起動。
ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
といういつものエラーが表示されるが起動できた模様。
max_threads
を 1 にしろ
次のようなエラーが出る。実行に失敗しているのか…
{jobs.py:534} DagFileProcessor61 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
デフォルトで 1 に設定して欲しい。(デフォルトは2)
ついでだから、max_active_runs_per_dag
も 1 にしてしまおう。
cat $(AIRFLOW_HOME)/airflow.cfg \ | sed -e 's/^max_threads = 2$$/max_threads = 1/g' \ | sed -e 's/^max_active_runs_per_dag = 16$$/max_active_runs_per_dag = 1/g' \ > $(AIRFLOW_HOME)/airflow.cfg.tmp
まだうっとおしいエラーが
こんなエラーが出続けて、ログが見にくい。
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance. 'strategies for improved performance.' % expr)
実行確認が取れない
running 中になっているが、実行されていない模様…
Airflow と格闘中(5)
nakano-tomofumi.hatenablog.com
のつづき。
今日で終わりにしたい。
airflow のコマンドを調べる
render
タスクの実行内容を表示する。BashOperator
なら test -dr
とほぼ同等。
trigger_dags
DAG の実行のトリガーを引く。求めていた機能の可能性。 -e
オプションで日付指定できるが、既に ‘backfill’ で実行した DAG に対しては次のようなエラーとなる。
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.execution_date
こういうSQLのエラーじゃなくて、既に実行したよ、的なメッセージを表示してほしいのだが。 まだ実行していない日付を指定すると、
[2017-04-28 11:34:54,967] {__init__.py:57} INFO - Using executor SequentialExecutor [2017-04-28 11:34:55,326] {models.py:167} INFO - Filling up the DagBag from /work_dir/airflow/dags [2017-04-28 11:34:55,618] {cli.py:185} INFO - Created <DagRun DAG_ID @ 2017-04-23 00:00:00: manual__2017-04-23T00:00:00, externally triggered: True>
となるが、実行はされていない模様。誰が実行するのだろう…。あ、スケジューラを起動しないといけないのか。
Airflow と格闘中(4)
nakano-tomofumi.hatenablog.com
のつづき。
多重実行は同時実行も完了タスクの実行もされちゃう
backfill
であるからか、普通に多重実行はされてしまう。多重実行と言っても、同時に実行と、既に完了したタスクの実行の二種類があると思うが、両方共実行されてしまう。
このままだと、タスクが途中でエラーで止まった。問題点は修正したいので、再開したい、といういうような使い方はできないみたいだし、タスクの依存関係で前処理のタスクが現在実行中で、それが終わってから実行、みたいなインテリジェントな実行も出来ないということなのか…。 run
では既に実行し成功したタスクは処理しなかったのだが…
AirFlow と格闘中(3)
nakano-tomofumi.hatenablog.com
のつづき。
run で確認し、backfill で全体確認する
Makefile から呼び出す想定だと(スケジューラであるはずの AirFlow をそのような形で呼び出すこと自体おかしいという話もあるが、まずは移行から始まるので…)、コマンドは、 run
か backfill
になるはず。以前にも述べたが、run
自体には依存タスクの実行は行わない(依存タスクの状態をみて実行するかどうかは決める)ので、普通に考えると、backfill
になるだろう。ただし、backfill
は日付の範囲指定を行うので、開始日と終了日を同じに日に指定するという、AirFlow としては想定外の使い方になるだろう(想定外の使い方というのは、想定外のエラーを引き起こしたりするので注意が必要)
また、backfill
では特にどのタスクをトリガーとして実行するか、みたいな指定はできない。全て実行されると思ったほうがいい(このあたりも特にドキュメントにはないが、使わないタスクなど DAG ファイルには書いてない前提なのだろう)。
複数コマンドを連続的にかつ失敗したらその行で終了したい
BashOperator
の説明には、次のようにある。
bash_command (string) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed
これを見ると複数のコマンドが書けるらしい。単に複数の行で書いてあるだけのかもしれない。
Makefile では、複数のコマンド行と、行中にセミコロンで複数のコマンドを結合した場合(Makefileのしようというより、shの仕様かもしれないが)では結果が異なる。セミコロンで結合した場合には最後の exit 値がその行の exit 値となる。途中で失敗しても最後で成功したらその行は成功したことになる。
BashOperator
ではどうなるのだろう。まだ複数行の検証をしていないので、検証をしてみたい。
複数行なら、python で複数のタスクに分割した方が、いいような気もするが…。明日検証する(このブログはメモなんでまとめてから出すことはしない予定。あしからず)。
AirFlow と格闘中(2)
nakano-tomofumi.hatenablog.com
のつづき。
upstream と downstream の違い
upstream と downstream の違いが分からないわけではないが、両側から設定できるのは何の意味があるのか。 特に書いてないから、気になるは気になる…。(おそらくどっちでもいい)
BashOperator のカレントディレクトリが、一時ディレクトリ
なのだが、airflow の実行ディレクトリというのはないらしい。 とりあえず、DAG ファイルの中で、
import os ... cwd = os.getcwd() ... task1 = BashOperator( task_id='task1', bash_command="ls {{params.cwd}}", params={'cwd': cwd}, dag=dag)
みたいな感じで、対応。(上記はairflowが実行されたディレクトリを ls
するだけというもの)
つづきは、