中野智文のブログ

データ・マエショリストのメモ

luigi で resources を使ってリソース毎に同時実行数を制限する

背景

luigi は同時 workers オプションで同時 worker 数を制限できるが、CPU的には同時に16走っても問題ないが、DB的には一つしか接続したくない、という要望がある。

対応 resources を使う。

リソース毎の worker 数の設定と、それぞれのタスクがどれほどリソースを使うかをコードとして書く必要がある。

リソースごとの worker 数の設定

以下のリンクを読む。

Configuration — Luigi 2.7.5 documentation

ここで書いてない重要なことは2つある。

  • 例にある、hive や mysql は組み込み(ビルトイン)でもなんでもない。

例に書いてあるから、ここだけを設定すれば、hive での同時実行される worker 数が制限されると思ったら大間違い。 postgres など、書いていないリソースも同じ。これはあくまで例であって、実装しているタスクのコードに直接書かなくては効果がない。

  • リソース名は、自分で勝手に作っていい。

mysql のDBが2台あって、それぞれのリソース的には1接続づつしたい場合は、mysql1 と mysql2 として、

[resources]
mysql1=1
mysql2=1

と設定する。ただし先に述べたが、タスクのコードに書かないと全く意味はない。

タスクごとのリソースの設定

以下を読む。

http://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.Task.resources

静的に、

  resources = {'mysql1': 1}

と書いてもいいし、@property を使って、動的に設定してもいい。

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#avoiding-concurrent-writes-to-a-single-file

他にも設定したほうがいいこと

一つのクライアントでスケジューラーを使っている時はいいのだが、別のクライアントからスケジューラーにタスクが入って来てリソースの制限にぶつかると、 次のような状態になってペンディングタスクの実行をあきらめ止まってしまう。

   was not granted run permission by the scheduler

そんなことを防ぐためには、

[core]
worker_keep_alive=True

もしくは

[worker]
keep_alive=True

に設定しておく。

上記の二つが同じことはドキュメントでは確認できないが、下記のコードで確認できる。

https://github.com/spotify/luigi/blob/4af1d2236825375cdd064ce5d1c2a34a8b1414c0/luigi/worker.py#L357-L358

その他の参考

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#tasks-that-regularly-overwrite-the-same-data-source