Airflow と格闘中(14)
nakano-tomofumi.hatenablog.com
のつづき。
エラーで scheduler が落ちる。
次のエラーで落ちる。
Traceback (most recent call last): File "/XXX/python/venv/bin/airflow", line 28, in <module> args.func(args) File "/XXX/python/venv/lib/python2.7/site-packages/airflow/bin/cli.py", line 831, in scheduler job.run() File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run self._execute() File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute self._execute_helper(processor_manager) File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1439, in _execute_helper (State.SCHEDULED,)) File "/XXX/python/venv/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper result = func(*args, **kwargs) File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1027, in _execute_task_instances open_slots = pools[pool].open_slots(session=session) KeyError: u'backfill'
普通に考えると、scheduler っていうのは絶対落ちてはNGなのだが、落ちる。
airflow の DAG は設定と実行を同じ pythonファイルで管理している。 scheduler は設定を知るために一旦 DAGファイル・フォルダのpython をすべて読み込む。 backfill では何を読むべきかを指定するのだが、scheduler は何を読むべきかは知らない。よってすべて読むわけである。 これはあくまで予想だが、その読み込んだ全てのファイルのうち、一つでも問題があると、scheduler ごと落ちる。
scheduler は各プロセスの死活監視などを行っているが、自身が死んだら何もならない。crond が死ぬようなものである。
ところで、上記のエラーは DAG ファイルのpool の設定が backfill と書いてあるところをコメントアウトしたら解決した。
webserver をデーモンで起動する
-D
オプションをつければいいらしいが、ずっと stdin を待っている。すなわち、デーモンになってない。なぜ? 分からない
とりあえず、コマンドの最後に &
をつけよう!
ユニーク制約のエラー
トリガーで試みるも 次のようなエラーが吐かれていた。
[2017-05-31 16:07:15,817] {jobs.py:354} DagFileProcessor3855 ERROR - Got an exception! Propagating... Traceback (most recent call last): ... File "/XXXX/python/venv/lib/python2.7/site-packages/airflow/models.py", line 4117, in verify_integrity session.commit() ... IntegrityError: (pysqlite2.dbapi2.IntegrityError) UNIQUE constraint failed: task_instance.task_id, task_instance.dag_id, task_instance.execution_date [SQL: u'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('task_name', 'dag_name', '2017-05-22 00:00:00.000000', None, None, None, None, 0, u'', 'user_name', None, None, 'bash_queue', 10, None, None, None)]
リトライ中になり、
[2017-05-31 16:12:31,562] {models.py:1126} DagFileProcessor4099 INFO - Dependencies all met for <TaskInstance: dag_name.task_name 2017-05-22 00:00:00 [up_for_retry]> [2017-05-31 16:12:31,563] {jobs.py:1618} DagFileProcessor4099 INFO - Creating / updating <TaskInstance: dag_name.task_name 2017-05-22 00:00:00 [scheduled]> in ORM
となった後は、 fail した模様。
sqlite> .sch task_instance CREATE TABLE "task_instance" ( task_id VARCHAR(250) NOT NULL, dag_id VARCHAR(250) NOT NULL, execution_date DATETIME NOT NULL, start_date DATETIME, end_date DATETIME, duration FLOAT, state VARCHAR(20), try_number INTEGER, hostname VARCHAR(1000), unixname VARCHAR(1000), job_id INTEGER, pool VARCHAR(50), queue VARCHAR(50), priority_weight INTEGER, operator VARCHAR(1000), queued_dttm DATETIME, pid INTEGER, PRIMARY KEY (task_id, dag_id, execution_date) ); CREATE INDEX ti_state_lkp ON task_instance (dag_id, task_id, execution_date, state); CREATE INDEX ti_pool ON task_instance (pool, state, priority_weight); CREATE INDEX ti_dag_state ON task_instance (dag_id, state); CREATE INDEX ti_state ON task_instance (state);
から、UNIQUE制約は、primary key のみといったところか。中身を見てみると、
sqlite> select * from task_instance where task_id = 'task_name' and dag_id = 'dag_name'; task_name|dag_name|2017-05-22 00:00:00.000000|2017-05-31 16:12:34.099778|2017-05-3116:12:44.141016|10.041238|failed|2|host_name|user_name|161||bash_queue|10|BashOperator|2017-05-31 16:07:19.593213|25148
ということらしい。キューに入ったのは、先程のユニーク制約のエラーの後??? よくわからない。
[2017-05-31 16:07:18,138] {jobs.py:343} DagFileProcessor3857 INFO - Started process (PID=24635) to work on /dag_name.py [2017-05-31 16:07:18,139] {jobs.py:516} DagFileProcessor3857 INFO - dag_id: None [2017-05-31 16:07:18,141] {jobs.py:1523} DagFileProcessor3857 INFO - Processing file /dag_name.py for tasks to queue
とはなっている。
つづきは、