luigi の RangeDaily をコードで使う
背景
範囲日の繰り返し DateIntervalParameter
指定した範囲の日の処理を指定したい場合は、チュートリアルには次のようなコード例がある。
http://luigi.readthedocs.io/en/stable/example_top_artists.html#step-1b-running-this-in-hadoop
def requires(self): return [StreamsHdfs(date) for date in self.date_interval]
この date_interval
はこれ以降説明など出てこない。もっと細かい仕様を調べたい場合は、
DateIntervalParameter
のドキュメントを読む必要がある。
http://luigi.readthedocs.io/en/stable/api/luigi.parameter.html#luigi.parameter.DateIntervalParameter
何かちょっとドキュメントとして読みにくい。また DateIntervalParameter
はタスクのパラメータであるので、
本当にほしいのは、渡された別のパラメータから日付の範囲を生成する DateInterval
のような気がするのだが、
http://luigi.readthedocs.io/en/stable/api/luigi.date_interval.html#luigi.date_interval.DateInterval
ドキュメントは徐々に何も書かれていなくなってくる。
範囲日の繰り返し RangeDaily
ところで、luigi コマンド上で上記のような範囲指定を行いたい場合コードが書けないので、タスクのコードをDateParameter
から、DateIntervalParameter
に変えるとか、
それを呼び出す範囲日指定のラッパータスクを作成しなければならない。
そこでコマンドレベルでも特にコードを書かずに上記のような範囲の実行を行う一般的なラッパータスク RangeDaily
が用意されている。
Luigi Patterns — Luigi 2.6.2 documentation
luigi --module all_reports RangeDaily --of AllReportsV2 --start 2014-10-31 --stop 2014-12-25
本題
RangeDaily
はコードでも使えるよ、という話。RangeDaily
であればドキュメントは割とあるのと、広範囲を誤って指定しても実行されないような仕組みがあるので、
そういった意味で比較的安全に範囲日を指定できるというメリットがある。
コード例
import luigi from luigi.tools.range import RangeDaily from my_sub_task import MySubTask class MyTask(luigi.Task): date = luigi.DateParameter() start_date = luigi.DateParameter() opt1 = luigi.Parameter() opt2 = luigi.Parameter() def requires(self): return [RangeDaily(of=MySubTask, start=self.start_date, stop=self.date, task_limit=100, days_back=200, of_params={"opt1": self.opt1, "opt2": self.opt2})]
関数の output
や run
は必要。
task_limit
days_back
は安全のためのオプションで、それぞれデフォルトでは 50 と100 に設定されている。
詳しくは次で確認できる。
http://luigi.readthedocs.io/en/stable/api/luigi.tools.range.html#luigi.tools.range.RangeBase
http://luigi.readthedocs.io/en/stable/api/luigi.tools.range.html#luigi.tools.range.RangeDailyBase
of_params
は date
以外のパラメータを渡すための、dict。
注意しなくてはいけないのは、 date
型などを直接この op_params
の value に入れると、JSON 化できないというエラーとなる。
文字列に変換してから入れる必要があるが、文字列に変換すると今度は DateParameter
の方が日付型に変換してくれないというオチ。
(そんなときは無理してこの RagenDaily
を使う必要はない)
コマンドライン例
コマンドラインで呼び出す場合は次のようになる。
luigi --module my_task MyTask --start-date 2017-07-22 --date 2017-07-25 --opt1=aaa opt2=bbb --workers=5
--workers
のオプションはここに来る。(RangeDaily
のオプションではないので)。
また 今回のコードにする話に限ったことではないが、--stop
はその日を含まないので注意。
(おまけ)その日を含むコードを実行
require
にその日を追加する方法。
def requires(self): return [MySubTask(date=self.date, opt1=self.opt1, opt2=self.opt2), RangeDaily(of=MySubTask, start=self.start_date, stop=self.date, task_limit=100, days_back=200, of_params={"opt1": self.opt1, "opt2": self.opt2})]
stop
に一日後を指定する方法。
timedelta
を、 import する必要がある。
from datetime import timedelta # (←追加) ... (中略) ... def requires(self): stop_date = self.date + timedelta(days=1) return [RangeDaily(of=MySubTask, start=self.start_date, stop=stop_date, task_limit=100, days_back=200, of_params={"opt1": self.opt1, "opt2": self.opt2})]