中野智文のブログ

データ・マエショリストのメモ

Airflow と格闘中(14)

nakano-tomofumi.hatenablog.com

のつづき。

エラーで scheduler が落ちる。

次のエラーで落ちる。

Traceback (most recent call last):
  File "/XXX/python/venv/bin/airflow", line 28, in <module>
    args.func(args)
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/bin/cli.py", line 831, in scheduler
    job.run()
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
    self._execute_helper(processor_manager)
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1439, in _execute_helper
    (State.SCHEDULED,))
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/XXX/python/venv/lib/python2.7/site-packages/airflow/jobs.py", line 1027, in _execute_task_instances
    open_slots = pools[pool].open_slots(session=session)
KeyError: u'backfill'

普通に考えると、scheduler っていうのは絶対落ちてはNGなのだが、落ちる。

airflow の DAG は設定と実行を同じ pythonファイルで管理している。 scheduler は設定を知るために一旦 DAGファイル・フォルダのpython をすべて読み込む。 backfill では何を読むべきかを指定するのだが、scheduler は何を読むべきかは知らない。よってすべて読むわけである。 これはあくまで予想だが、その読み込んだ全てのファイルのうち、一つでも問題があると、scheduler ごと落ちる。

scheduler は各プロセスの死活監視などを行っているが、自身が死んだら何もならない。crond が死ぬようなものである。

ところで、上記のエラーは DAG ファイルのpool の設定が backfill と書いてあるところをコメントアウトしたら解決した。

webserver をデーモンで起動する

-D オプションをつければいいらしいが、ずっと stdin を待っている。すなわち、デーモンになってない。なぜ? 分からない

とりあえず、コマンドの最後に & をつけよう!

ユニーク制約のエラー

トリガーで試みるも 次のようなエラーが吐かれていた。

[2017-05-31 16:07:15,817] {jobs.py:354} DagFileProcessor3855 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
...
  File "/XXXX/python/venv/lib/python2.7/site-packages/airflow/models.py", line 4117, in verify_integrity
    session.commit()
...
IntegrityError: (pysqlite2.dbapi2.IntegrityError) UNIQUE constraint failed: task_instance.task_id, task_instance.dag_id, task_instance.execution_date [SQL: u'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('task_name', 'dag_name', '2017-05-22 00:00:00.000000', None, None, None, None, 0, u'', 'user_name', None, None, 'bash_queue', 10, None, None, None)]

リトライ中になり、

[2017-05-31 16:12:31,562] {models.py:1126} DagFileProcessor4099 INFO - Dependencies all met for <TaskInstance: dag_name.task_name 2017-05-22 00:00:00 [up_for_retry]>
[2017-05-31 16:12:31,563] {jobs.py:1618} DagFileProcessor4099 INFO - Creating / updating <TaskInstance: dag_name.task_name 2017-05-22 00:00:00 [scheduled]> in ORM

となった後は、 fail した模様。

sqlite> .sch task_instance
CREATE TABLE "task_instance" (
        task_id VARCHAR(250) NOT NULL,
        dag_id VARCHAR(250) NOT NULL,
        execution_date DATETIME NOT NULL,
        start_date DATETIME,
        end_date DATETIME,
        duration FLOAT,
        state VARCHAR(20),
        try_number INTEGER,
        hostname VARCHAR(1000),
        unixname VARCHAR(1000),
        job_id INTEGER,
        pool VARCHAR(50),
        queue VARCHAR(50),
        priority_weight INTEGER,
        operator VARCHAR(1000),
        queued_dttm DATETIME, pid INTEGER,
        PRIMARY KEY (task_id, dag_id, execution_date)
);
CREATE INDEX ti_state_lkp ON task_instance (dag_id, task_id, execution_date, state);
CREATE INDEX ti_pool ON task_instance (pool, state, priority_weight);
CREATE INDEX ti_dag_state ON task_instance (dag_id, state);
CREATE INDEX ti_state ON task_instance (state);

から、UNIQUE制約は、primary key のみといったところか。中身を見てみると、

sqlite> select * from task_instance where task_id = 'task_name' and dag_id = 'dag_name';
task_name|dag_name|2017-05-22 00:00:00.000000|2017-05-31 16:12:34.099778|2017-05-3116:12:44.141016|10.041238|failed|2|host_name|user_name|161||bash_queue|10|BashOperator|2017-05-31 16:07:19.593213|25148

ということらしい。キューに入ったのは、先程のユニーク制約のエラーの後??? よくわからない。

[2017-05-31 16:07:18,138] {jobs.py:343} DagFileProcessor3857 INFO - Started process (PID=24635) to work on /dag_name.py
[2017-05-31 16:07:18,139] {jobs.py:516} DagFileProcessor3857 INFO - dag_id: None
[2017-05-31 16:07:18,141] {jobs.py:1523} DagFileProcessor3857 INFO - Processing file /dag_name.py for tasks to queue

とはなっている。

つづきは、

nakano-tomofumi.hatenablog.com