Airflow と格闘中(6)
皆様に置きましてはGW中かもしれませんが、小学生を持つ親は、小学校が休みでない以上、普通に出勤となります。
nakano-tomofumi.hatenablog.com
のつづき。
scheduler
が正解。
というかドキュメントは最後まで読もう。scheduler
を起動しておいて、 trigger_dag
で実行。 これが makefile に近い。
次の記事なども読みノウハウ吸収。
scheduler
の立ち上げ方
scheduler をオプションなしで起動すると、そのハートビートのためか、意味なく大量のログが画面に出力する。バックグランドで実行されているわけでもなさそうだし、適切なオプション例を調べてみる。
によると、-D
オプションでデーモンとして起動させ、標準出力・エラーともログディレクトリに出力するのが普通らしい。
自分は、scheduler
の log ディレクトリである、$(AIRFLOW_HOME)/logs/scheduler/
配下に標準出力・エラーを出力することにした。
trigger_dag
による実行
- 初めての実行(既に実行されていない)の場合:
INFO - Created <XxxXxx xxxxxxx @ 2017-04-27 00:00:00: manual__2017-04-27T00:00:00, externally triggered: True>
というメッセージが表示され、内部で実行される?
- 既に実行した場合:
** おそらく backfill
で実行した場合
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed:
というユニーク制約のエラーが出力される。また、次のようなファイル名のログファイルがある。
`$(AIRFLOW_HOME)/logs/xxxx/yyyyyy/2017-04-27T00:00:00
** trigger_dag
で実行した場合:
airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx
とでる。
clear
が上手くいかない。
clear
により、成功・失敗の state は消されるものだと信じたい。ところが
airflow.exceptions.AirflowException: Run id manual__2017-04-27T00:00:00 already exists for dag id xxxxx
とエラー再び表示される。何を消したのだ?
よろしい。ならば resetdb
だ
エラーログ
OperationalError: (sqlite3.OperationalError) no such table: dag
scheduler
は停止してから実行すべきだったか…。
scheduler
の止め方。
一時停止コマンドはあっても、停止用のコマンドはない。
調べると、
Airflowを使ってワークフロー管理を行う — | サイオスOSS | サイオステクノロジー
にて、次のようなコマンドが示されていた。
pgrep -f '/usr/local/bin/airflow scheduler' | xargs kill -s SIGKILL
再び、起動。
ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
といういつものエラーが表示されるが起動できた模様。
max_threads
を 1 にしろ
次のようなエラーが出る。実行に失敗しているのか…
{jobs.py:534} DagFileProcessor61 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
デフォルトで 1 に設定して欲しい。(デフォルトは2)
ついでだから、max_active_runs_per_dag
も 1 にしてしまおう。
cat $(AIRFLOW_HOME)/airflow.cfg \ | sed -e 's/^max_threads = 2$$/max_threads = 1/g' \ | sed -e 's/^max_active_runs_per_dag = 16$$/max_active_runs_per_dag = 1/g' \ > $(AIRFLOW_HOME)/airflow.cfg.tmp
まだうっとおしいエラーが
こんなエラーが出続けて、ログが見にくい。
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance. 'strategies for improved performance.' % expr)
実行確認が取れない
running 中になっているが、実行されていない模様…