中野智文のブログ

データ・マエショリストのメモ

luigi の RangeDaily をコードで使う

背景

範囲日の繰り返し DateIntervalParameter

指定した範囲の日の処理を指定したい場合は、チュートリアルには次のようなコード例がある。

http://luigi.readthedocs.io/en/stable/example_top_artists.html#step-1b-running-this-in-hadoop

    def requires(self):
        return [StreamsHdfs(date) for date in self.date_interval]

この date_interval はこれ以降説明など出てこない。もっと細かい仕様を調べたい場合は、 DateIntervalParameter のドキュメントを読む必要がある。

http://luigi.readthedocs.io/en/stable/api/luigi.parameter.html#luigi.parameter.DateIntervalParameter

何かちょっとドキュメントとして読みにくい。また DateIntervalParameter はタスクのパラメータであるので、 本当にほしいのは、渡された別のパラメータから日付の範囲を生成する DateInterval のような気がするのだが、

http://luigi.readthedocs.io/en/stable/api/luigi.date_interval.html#luigi.date_interval.DateInterval

ドキュメントは徐々に何も書かれていなくなってくる。

範囲日の繰り返し RangeDaily

ところで、luigi コマンド上で上記のような範囲指定を行いたい場合コードが書けないので、タスクのコードをDateParameterから、DateIntervalParameterに変えるとか、 それを呼び出す範囲日指定のラッパータスクを作成しなければならない。 そこでコマンドレベルでも特にコードを書かずに上記のような範囲の実行を行う一般的なラッパータスク RangeDaily が用意されている。

Luigi Patterns — Luigi 2.6.2 documentation

luigi --module all_reports RangeDaily --of AllReportsV2 --start 2014-10-31 --stop 2014-12-25

本題

RangeDaily はコードでも使えるよ、という話。RangeDaily であればドキュメントは割とあるのと、広範囲を誤って指定しても実行されないような仕組みがあるので、 そういった意味で比較的安全に範囲日を指定できるというメリットがある。

コード例

import luigi
from luigi.tools.range import RangeDaily
from my_sub_task import MySubTask

class MyTask(luigi.Task):
    date = luigi.DateParameter()
    start_date = luigi.DateParameter()
    opt1 = luigi.Parameter()
    opt2 = luigi.Parameter()

    def requires(self):
        return [RangeDaily(of=MySubTask,
                           start=self.start_date,
                           stop=self.date,
                           task_limit=100,
                           days_back=200,
                           of_params={"opt1": self.opt1,
                                      "opt2": self.opt2})]

関数の outputrun は必要。

task_limit days_back は安全のためのオプションで、それぞれデフォルトでは 50 と100 に設定されている。 詳しくは次で確認できる。

http://luigi.readthedocs.io/en/stable/api/luigi.tools.range.html#luigi.tools.range.RangeBase

http://luigi.readthedocs.io/en/stable/api/luigi.tools.range.html#luigi.tools.range.RangeDailyBase

of_paramsdate 以外のパラメータを渡すための、dict。 注意しなくてはいけないのは、 date型などを直接この op_params の value に入れると、JSON 化できないというエラーとなる。 文字列に変換してから入れる必要があるが、文字列に変換すると今度は DateParameter の方が日付型に変換してくれないというオチ。 (そんなときは無理してこの RagenDaily を使う必要はない)

コマンドライン

コマンドラインで呼び出す場合は次のようになる。

luigi --module my_task MyTask --start-date 2017-07-22  --date 2017-07-25 --opt1=aaa opt2=bbb --workers=5

--workers のオプションはここに来る。(RangeDailyのオプションではないので)。 また 今回のコードにする話に限ったことではないが、--stop はその日を含まないので注意。

(おまけ)その日を含むコードを実行

require にその日を追加する方法。

    def requires(self):
        return [MySubTask(date=self.date,
                          opt1=self.opt1,
                          opt2=self.opt2),
                RangeDaily(of=MySubTask,
                           start=self.start_date,
                           stop=self.date,
                           task_limit=100,
                           days_back=200,
                           of_params={"opt1": self.opt1,
                                      "opt2": self.opt2})]

stop に一日後を指定する方法。

timedelta を、 import する必要がある。

from datetime import timedelta # (←追加)
...
(中略)
...

    def requires(self):
        stop_date = self.date + timedelta(days=1)   
        return [RangeDaily(of=MySubTask,
                           start=self.start_date,
                           stop=stop_date,
                           task_limit=100,
                           days_back=200,
                           of_params={"opt1": self.opt1,
                                      "opt2": self.opt2})]