読者です 読者をやめる 読者になる 読者になる

nakano-tomofumiのブログ

データマエショリストのメモ

luigi と格闘中

背景

luigi と airflow と make を比較している。

luigi と格闘中

big query のクエリを書き換えたので再実行

あれ? 再実行されない。

ズバリな記事が見つかる。

datapipelinearchitect.com

なるほど…。というか、removing all intermediate and final outputs って…。 downstream のすべての出力を調べて削除しないといけないわけだよ。それが面倒だから、こういったツールを使っているのに。 ちなみに、make で touch 作戦でチマチマ依存関係書いていく作戦だと、依存関係のあるターゲットは全て実行され更新される。

一つの解決策としては、 upstream の依存ファイルのハッシュ値みたいなものをパラメータとして持たせる方法が考えられる。 パラメータが変われば、別のタスクとして見なされるので再実行される(はず。luigji も詳しくないのでよくわからないが) downstream では upstream の全ての依存ファイルのハッシュ値をパラメータとして渡す必要があるが、各クラスの依存ファイルが何であるかを示すような interface を持ち、かつ upstreamの全ての依存ファイルのハッシュ値を取得しパラメータとして自動的に渡す関数を実装した task class を使うことにすれば解決するかもしれない。 誰かそういうの作っていないかな…(自分で作れよ)。いやもっとマシな解決策があるのかもしれないが…

もうちょっと調べると、complete を再実装した例が見つかった。

stackoverflow.com

こちらは make のように出力のタイムスタンプを見て完了かどうかを判断する。 ただし、big query とかのタイムスタンプに対応させるには、色々頑張らないといけないかもしれない。

luigi の RangeDaily

luigi の backfill コマンドとして、RangeDaily というのがあり、 start と stop で日付を指定する。

luigi --module xxx RangeDaily --of XXX   --start 2017-02-07 --stop 2017-05-17 --task-limit 100 --workers 5

こんな感じ。ところが、

DEBUG: Checking if RangeDaily(of=XXX, of_params={}, reverse=False, task_limit=100, now=None, param_name=None, start=2017-02-07, stop=2017-05-17, days_back=100, days_forward=0) is complete
DEBUG: Actually checking if range [2017-02-09, 2017-05-16] of XXX is complete

と出て、 2017-02-09 と、2017-05-16 に勝手に狭められている…。stopがその日を含まない、としても、start は2日もずれてるだろ。

以下に書いてあった。

http://luigi.readthedocs.io/en/stable/api/luigi.tools.range.html#luigi.tools.range.RangeDailyBase.days_back

days_back 今日から何日前まで遡れるか、を決めているらしい。無限に過去に遡ったりするのを防止するために。 デフォルトは100ということで、それにひかかっただけの模様。ちなみに、stop の方は、exclusive だそうで、その日を含まない、ということを意味する。 stop でその日を含まないって、誰得なんだろうか。backfill としてはその日(特に当日)を含むと色々まずいのかな…。

Wilson score interval を使う。

背景

以前、

blog.goo.ne.jp

にて、Wilson score interval with continuity correction の式のテンプレートを書いたが、本当は外側に条件分岐 if が必要だったり、判別式の中が負になることもあったりと、ちょっと注意して使う必要があった(おいおい)。ちなみに、Wald法とよばれる信頼区間を求める方法があるが、これは正しくないかもしれない信頼区間(こんなものを信頼区間と呼べるのか)を求める方法なので絶対に使わないように。

そこで今回は単なる(with continuity correction が付かない) wilson score interval のテンプレートを作成する。

Wilson score interval テンプレート

前回は ‘P’ という標本平均をもとにテンプレートを作ったが、結局、P を計算する式が必要で、置換後もっと複雑になるので、成功した回数をX、試行回数をN、標準正規分布の分位数を Zとすると、

UCB: (X+Z*Z/2+Z*sqrt(X*(N-X)/N+Z*Z/4))/(N+Z*Z)
LCB: (X+Z*Z/2-Z*sqrt(X*(N-X)/N+Z*Z/4))/(N+Z*Z)

なんとシンプルな。

参考

数式

https://en.wikipedia.org/wiki/Binomial_proportion_confidence_interval#Wilson_score_interval

文献

Wilson の論文 http://www.barestatistics.nl/uploads/1/1/7/9/11797954/wilson_1927.pdf

airflow と格闘中 (11)

まだまだ

pysqlite2 がない

apt-get install sqlite3-dev
pip install pysqlite

click がない?

次のエラー

[2017-05-15 10:12:53,877] {__init__.py:57} INFO - Using executor SequentialExecutor
Traceback (most recent call last):
  ....
  File "/xxxxxxxxx/python/venv/lib/python2.7/site-packages/flask/cli.py", line 17, in <module>
    import click
pip install flask

を実行しても、インストール済みと表示される。

次を参考に、

python flask import error - Stack Overflow

sudo apt-get install python-flask

を実行。virtualenv を使っているからか、解決せず。 pip install flask のメッセージをもう一度確認してみる。

Requirement already satisfied (use --upgrade to upgrade): flask in /xxxxxxxxx/python/venv/lib/python2.7/site-packages

そういうこと?

pip install --upgrade flask

で無事インストール。私の python 環境構築力が問われる。

gdrive の import ができなくなった。

背景

もともと、

github.com

の問題に対応するため、パッチを作った(つもりだった)。

症状

別環境で試すことになったので、もう一度 build してみると、なんと動かなくなっているんだよね。 (すなわち issue の状態)

調査

バックアップ

とりあえず、現行のうまく動くバイナリ(?)をバックアップした。(これは大正解)

オリジナルのレポジトリの upstream merge してbuild

オリジナルのレポジトリを upstream merge して(これが余分だった) build してみてもNG。

Merge remote-tracking branch 'upstream/master' · tomofumi-nakano/gdrive@eef35d2 · GitHub

以前の環境で上記の merge を反映させたソースで build

以前の環境で最新のレポジトリに反映させて build を試みるも次のようなエラーが。

handlers_drive.go:12:2: cannot find package "github.com/prasmussen/gdrive/auth" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/auth (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/auth (from $GOPATH)
gdrive.go:7:2: cannot find package "github.com/prasmussen/gdrive/cli" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/cli (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/cli (from $GOPATH)
compare.go:5:2: cannot find package "github.com/prasmussen/gdrive/drive" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/drive (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/drive (from $GOPATH)

うまくいかなったので、

go get github.com/tomofumi-nakano/gdrive
go get golang.org/x/net/context golang.org/x/oauth2 golang.org/x/oauth2/google google.golang.org/api/sheets/v4

ということをやってしまった。すると、build は通ったが、issue の症状となってしまう。

fix時の状態に戻して build してみる。

fix時の状態に戻して build してみる。それでも issue の症状になってしまう。

ところが、バックアップしたバイナリではうまく行った。

  • gdrive のソースが悪いわけではない。となると外部ソース関係(ライブラリなど)が怪しい。
  • 外部ソースは現行のバイナリの中に含まれている。

ということがわかる。

gdb でやってみる。

このコマンドは基本的には、googleapi と通信しているだけなので、その通信内容の違いを見れば、何が違うのかが分かる。 違いが判明すれば、どの部分に問題があるのかが分かるだろう。 当初は wireshark なども検討した。ところが、https で通信していれば、中身を見ることはできないことに気がついた。

というわけで、gdb で見ることにした。

懸念点としては、デバッグ用の文字列が埋め込まれているかどうかであるが…

まず gdb で run すると次のようなありがたいメッセージが

During startup program terminated with signal ?, Unknown signal.

c - Unknown ending signal when using debugger gdb - Stack Overflow

はい、すみません、インストール後、次のようなありがたいメッセージを頂いておりました。

gdb requires special privileges to access Mach ports.
You will need to codesign the binary. For instructions, see:

  https://sourceware.org/gdb/wiki/BuildingOnDarwin

On 10.12 (Sierra) or later with SIP, you need to run this:

  echo "set startup-with-shell off" >> ~/.gdbinit

一つ目は sudo しただけだったけど、二つ目は無視していました。

sudo gdb gdrive.ok
...
(gdb) run import  --mime text/csv file.csv
...
warning: unhandled dyld version (15)
Imported xxxx with mime type: 'application/vnd.google-apps.spreadsheet'
...

おおうまく行った。やはり ok バージョンは問題ない。

次にプログラムが埋め込まれているかどうか。

(gdb) list
warning: Source file is more recent than executable.
11      const Version = "2.1.0"
12
13      const DefaultMaxFiles = 30
14      const DefaultMaxChanges = 100
....

おーっ、埋め込まれている模様!!!

warning: Source file is more recent than executable.

は多少気になるがとりあえずOK。

(gdb) list drive/upload.go:178
warning: Source file is more recent than executable.
173             dstFile.Parents = args.Parents
174
175             // Chunk size option
176             chunkSize := googleapi.ChunkSize(int(args.ChunkSize))
177
178             // Wrap file in progress reader
179             progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
180
181             // Wrap reader in timeout reader
182             reader, ctx := getTimeoutReaderContext(progressReader, args.Timeout)

おやおや?

https://github.com/tomofumi-nakano/gdrive/commit/4540c5c5398d072613cf8c5efba5f0d12dbc8947#diff-e6d188e26b532dc776d44b31e5db6b5aR178

ここで修正されているはずの内容が修正されていませんね…。どういうことなのでしょうか…

strings で各バイナリの違いを見てみる

strings ~/bin/gdrive.ok> ok.txt
strings ~/bin/gdrive.ng> ng.txt
vim -d ok.txt ng.txt

多すぎて判明せず。とりあえず、別のバイナリと思って良さそう。

もう一度原因を見てみる

google-api-go-client/drive-gen.go at 650535c7d6201e8304c92f38c922a9a3a36c6877 · google/google-api-go-client · GitHub

// Media specifies the media to upload in one or more chunks. The chunk
// size may be controlled by supplying a MediaOption generated by
// googleapi.ChunkSize. The chunk size defaults to
// googleapi.DefaultUploadChunkSize.The Content-Type header used in the
// upload request will be determined by sniffing the contents of r,
// unless a MediaOption generated by googleapi.ContentType is
// supplied.
// At most one of Media and ResumableMedia may be set.
func (c *FilesCreateCall) Media(r io.Reader, options ...googleapi.MediaOption) *FilesCreateCall {
    opts := googleapi.ProcessMediaOptions(options)
    chunkSize := opts.ChunkSize
    if !opts.ForceEmptyContentType {
        r, c.mediaType_ = gensupport.DetermineContentType(r, opts.ContentType)
    }
    c.media_, c.mediaBuffer_ = gensupport.PrepareUpload(r, chunkSize)
    return c
}

ここで、DetermineContentType をするために、 opts.ContentTypeを渡している。 これで、変換をするわけだが、それが、

https://github.com/google/google-api-go-client/blob/a69f0f19d246419bb931b0ac8f4f8d3f3e6d4feb/gensupport/media.go#L72-L99

// DetermineContentType determines the content type of the supplied reader.
// If the content type is already known, it can be specified via ctype.
// Otherwise, the content of media will be sniffed to determine the content type.
// If media implements googleapi.ContentTyper (deprecated), this will be used
// instead of sniffing the content.
// After calling DetectContentType the caller must not perform further reads on
// media, but rather read from the Reader that is returned.
func DetermineContentType(media io.Reader, ctype string) (io.Reader, string) {
    // Note: callers could avoid calling DetectContentType if ctype != "",
    // but doing the check inside this function reduces the amount of
    // generated code.
    if ctype != "" {
        return media, ctype
    }

    // For backwards compatability, allow clients to set content
    // type by providing a ContentTyper for media.
    if typer, ok := media.(googleapi.ContentTyper); ok {
        return media, typer.ContentType()
    }

    sniffer := newContentSniffer(media)
    if ctype, ok := sniffer.ContentType(); ok {
        return sniffer, ctype
    }
    // If content type could not be sniffed, reads from sniffer will eventually fail with an error.
    return sniffer, ""
}

により、単に、c.mediaType_opts.ContentType が入ることになる。

とりあえず、デバッグしてみる

$ sudo gdb gdrive.ok
...
(gdb) break upload.go:187
...
(gdb) run import --mime text/csv file.csv
...
Thread 3 hit Breakpoint 1, github.com/prasmussen/gdrive/drive.(*Drive).uploadFile (self=0xc42002e1e8, args=..., ~r1=0x0, ~r2=0, ~r3=...) at /Users/t-nakano/src/github.com/prasmussen/gdrive/drive/upload.go:187
...
(gdb) p args.Mime
$2 = 0xc4203485d0 "application/vnd.google-apps.spreadsheet"

上記は正常な方。

そして、次は、修正されている方…。

(gdb) list upload.go:187
182             reader, ctx := getTimeoutReaderContext(progressReader, args.Timeout)
183
184             fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
185             started := time.Now()
186
187             f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Context(ctx).Media(reader, chunkSize).Do()
188             if err != nil {
189                     if isTimeoutError(err) {
190                             return nil, 0, fmt.Errorf("Failed to upload file: timeout, no data was transferred for %v", args.Timeout)
191                     }

修正されていない!

いったいどうなっているのか…。なぜ修正されていない…。

もう一度はじめのエラーを思い出してみる。

go get とかで、色々取得していたが…。

そして upload.go を見つける。

見つかった。とりあえず、github.com/prasmussen/を消してbuild してみる。

go build -gcflags "-N -l"
handlers_drive.go:12:2: cannot find package "github.com/prasmussen/gdrive/auth" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/auth (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/auth (from $GOPATH)
gdrive.go:7:2: cannot find package "github.com/prasmussen/gdrive/cli" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/cli (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/cli (from $GOPATH)
compare.go:5:2: cannot find package "github.com/prasmussen/gdrive/drive" in any of:
        /usr/local/Cellar/go/1.8.1/libexec/src/github.com/prasmussen/gdrive/drive (from $GOROOT)
        /Users/t-nakano/src/github.com/prasmussen/gdrive/drive (from $GOPATH)
make: *** [gdrive] Error 1

きた、これだ。handlers_drive.go:12: に何が書いてあるんだっけ。

     12         "github.com/prasmussen/gdrive/auth"
     13         "github.com/prasmussen/gdrive/cli"
     14         "github.com/prasmussen/gdrive/drive"

ぐふっ。

そして golang の import のショボさについて知ることになる…

  • 相対パスによる local package はNG(理由はよくわらないが)
  • github 上の push してから go get により、使うのが良いとされる。
  • github で fork した場合(今回の自分)、対象なるパッケージ名も github.com/<user_name> となるはず。
  • そこを修正していなかった!

あらためて、import 先を修正。すると、

(gdb) list upload.go:187
182             progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
183
184             // Wrap reader in timeout reader
185             reader, ctx := getTimeoutReaderContext(progressReader, args.Timeout)
186
187             fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
188             started := time.Now()
189
190             f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Context(ctx).Media(reader, chunkSize, contentType).Do()
191             if err != nil {

おー!。思った通りに反映されている!

うーん、去年の11月にPR送ったのに、こういった話があってか、うまくできんといわれて、PRが採用されないんだよね…

airflow と格闘中(10)

vm上で install しようとするも、numpyコンパイルらしきものが始まる

え?

次のような記事が見つかる…

kounoike.github.io

とりあえず、上記のようにしてもうまくいかない。(仮想環境の違いはあるだろう。)

そこで、numpy をインストール。

 pip install numpy

これはこのVMの環境ではうまく行った。 分析するわけでもないのに、numpy をインストールするハメになるとは…

ただし、まだ libxml 関連のエラーが出ている。これは python 関係ではないからつらいな。。。

libxml2-dev libxslt1-dev

apt-get update インストールしてやっと解決。

 pip install airflow

の出力の最後は、

Successfully installed lxml pandas psutil python-nvd3 tabulate thrift zope.deprecation Mako python-editor wtforms python-slugify Unidecode

だった。

vagrant の Synced Folders で gest 側は強制的にディレクトリが作成される

これを抑制するオプションはなさそう。

www.vagrantup.com

airflow と格闘中(9)

いやーもう本当に終わりにしたい。

バグも治ったので、trigger_dag に再挑戦

[2017-05-09 17:52:00,076] {models.py:3414} DagFileProcessor0 INFO - Creating ORM DAG for xxxxxx
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_stats.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to
evaluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to ev
aluate.  Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)

これは一体…。

明日は別の環境で実験予定。仮想環境整っていないけど…