中野智文のブログ

データ・マエショリストのメモ

luigi で resources を使ってリソース毎に同時実行数を制限する

背景

luigi は同時 workers オプションで同時 worker 数を制限できるが、CPU的には同時に16走っても問題ないが、DB的には一つしか接続したくない、という要望がある。

対応 resources を使う。

リソース毎の worker 数の設定と、それぞれのタスクがどれほどリソースを使うかをコードとして書く必要がある。

リソースごとの worker 数の設定

以下のリンクを読む。

Configuration — Luigi 2.7.5 documentation

ここで書いてない重要なことは2つある。

  • 例にある、hive や mysql は組み込み(ビルトイン)でもなんでもない。

例に書いてあるから、ここだけを設定すれば、hive での同時実行される worker 数が制限されると思ったら大間違い。 postgres など、書いていないリソースも同じ。これはあくまで例であって、実装しているタスクのコードに直接書かなくては効果がない。

  • リソース名は、自分で勝手に作っていい。

mysql のDBが2台あって、それぞれのリソース的には1接続づつしたい場合は、mysql1 と mysql2 として、

[resources]
mysql1=1
mysql2=1

と設定する。ただし先に述べたが、タスクのコードに書かないと全く意味はない。

タスクごとのリソースの設定

以下を読む。

http://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.Task.resources

静的に、

  resources = {'mysql1': 1}

と書いてもいいし、@property を使って、動的に設定してもいい。

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#avoiding-concurrent-writes-to-a-single-file

他にも設定したほうがいいこと

一つのクライアントでスケジューラーを使っている時はいいのだが、別のクライアントからスケジューラーにタスクが入って来てリソースの制限にぶつかると、 次のような状態になってペンディングタスクの実行をあきらめ止まってしまう。

   was not granted run permission by the scheduler

そんなことを防ぐためには、

[core]
worker_keep_alive=True

もしくは

[worker]
keep_alive=True

に設定しておく。

上記の二つが同じことはドキュメントでは確認できないが、下記のコードで確認できる。

https://github.com/spotify/luigi/blob/4af1d2236825375cdd064ce5d1c2a34a8b1414c0/luigi/worker.py#L357-L358

その他の参考

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#tasks-that-regularly-overwrite-the-same-data-source

s3上のgzip圧縮されたファイルの中身をイテレータで取得する

背景

s3上のgzip圧縮されたファイルの中身をイテレータで取得する方法がなかなか見つからない。

コード

import boto3
import tempfile
import gzip
import datetime as dt
from typing import Iterator


def s3_gz_cat(bucket: str, prefix='') -> Iterator[bytes]:
    s3 = boto3.resource('s3')
    s3_bucket = s3.Bucket(bucket)
    for s3_obj_sum in s3_bucket.objects.filter(Prefix=prefix):
        # s3_obj_sum.key がファイル名(path)なので、末尾が .gz 判定とかしたい場合はここで。                                                                                                                                                                   
        with tempfile.TemporaryFile() as temp:
            s3_obj = s3_obj_sum.Object()
            s3_obj.download_fileobj(temp)
            temp.seek(0)
            with gzip.GzipFile(fileobj=temp) as gz:
                for b in gz:
                    yield b


# 以下は使い方

if __name__ == '__main__':
    # my_bucket 内全て
    for b in s3_gz_cat(bucket='my_bucket'):
        print(b)

    # my_dir 内に限定
    for b in s3_gz_cat(bucket='my_bucket', prefix='my_dir'):
        print(b)

IOバッファでやる方法もあるみたいだが、バッファが溢れたときどうなるのか(詰まったりする?)心配なので、tempfile でやったほうがいいような気がする。

BigTable は東京リージョンでも使える!

背景

bigtable が東京リージョンでは使えないみたいな話が古い記事で見つかったりする。

GCPで東京リージョンにて使えるサービス&使えないサービス一覧|apps-gcp.com|G Suite(旧:Google Apps) やGoogle Cloud Platform サービスについて紹介します

本当は

使えるようになっている!!!

https://groups.google.com/forum/#!topic/google-cloud-bigtable-discuss/vxVAifl5jUM

下のページが日本語で表示される場合、Webページのフッターの言語をEnglishにすると真実が見える!

cloud.google.com

日本語でみると、まだサポートされていないことになっている!!!(2018/04/17現在)

Googleさん、何とかしてください!!!

numpy array を aws s3 に格納する。

背景

jupyter notebook を使っていて、データをローカルのファイルシステムに保存するようにしていると、 環境が変わったときにそのデータまで一緒に持ってこないといけないが、これは jupyter の利便性を失う一つの要因となる。 例えば github でjupyter notebook を管理する場合、ローカルにあるデータのファイルまで github 上に格納しなければならない。 google Colaboratory を使う場合、クラウド上で実行されるので、そのローカルにはデータは存在しない。

長々と書いてしまったが、要は、クラウド上に保存し、クラウドからローカルに落とせばよいのである。

あまり単純な例がない、numpy array を aws s3 に格納する例を書いておきたい。

numpy array を aws s3 に save し、load する例

import os
import boto3

os.environ["AWS_ACCESS_KEY_ID"] = "AAAAAAAAAAAAAAA"
os.environ["AWS_SECRET_ACCESS_KEY"] = "XXXXXXXXXXXXXXXXXXXXXXX"

s3 = boto3.resource('s3')
# 書き込み

import numpy as np
import tempfile

a = np.array([1,2,3,4,5,6])

with tempfile.TemporaryFile() as temp:
  np.save(temp, a)
  temp.seek(0)
  res = s3.Object(bucket_name="your-bucket", key="aaa.npy").upload_fileobj(temp)
# 読み込み

import tempfile

with tempfile.TemporaryFile() as temp:
  res = s3.Object(bucket_name="your-bucket", key="aaa.npy").download_fileobj(temp)
  temp.seek(0)
  aa = np.load(temp)

temp.seek(0)temp ファイルオブジェクトに書き込まれたデータを先頭から読み直すために、rewind している。 Webをみると、boto3 でなく boto を使った方法などいろいろ紹介されているが boto を使った方法はうまく動かなかった。 ちなみに、上記は、google colaboratory を使って確認した。

まとまりが悪いが、これにて。

ImportError: No module named '_tkinter' が出た場合

背景

毎回見る次のエラー

ImportError: No module named '_tkinter'

の対応だが、OSや python のバージョンにより対応が異なるので、メモする。

解決法

ubuntu (on Windows) の python3 の場合

sudo apt install python3-tk

Windows 用 github ツールにて、the repository does not seem to exist anymore

背景

Windowsgithub ツールで、次のようなエラーが出た。

the repository does not seem to exist anymore you may not have access or it may have been deleted or renamed

もちろん、そのようなリポジトリは実際存在する。

なぜか?

原因と解決

自分の場合は、github の権限の設定忘れだった。

チームの以下の設定を、

Repository permissions

Choose default permissions for user roles.
Organization members

を ”Wrie” 以上に設定する。

Windows10 に WSL を入れようとして、The term 'Enable-WindowsOptionalFeature' is not recognized …

背景

Windows 10 に WSL を入れようとして、

PS C:\Program Files\PowerShell\6.0.2> Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows-Subsystem-Linux
Enable-WindowsOptionalFeature : The term 'Enable-WindowsOptionalFeature' is not recognized as the name of a cmdlet, function, script file, or operable program.
Check the spelling of the name, or if a path was included, verify that the path is correct and try again.
At line:1 char:1
+ Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows- ...
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ CategoryInfo          : ObjectNotFound: (Enable-WindowsOptionalFeature:String) [], CommandNotFoundException
+ FullyQualifiedErrorId : CommandNotFoundException

みたいなエラーが出たあなた。

www.atmarkit.co.jp

や、

Windows Subsystem for Linux - Wikipedia

の通りに正しくやったのに。納得がいかない。

解決法

Windowsの機能の有効化または無効化」の中から、「Windows Subsystem for Linux」にチェックを入れて、再起動。

で、「Windowsの機能の有効化または無効化」は「コントロールパネル」にあるらしいのだが、その「コントロールパネル」が見つからなかったりするのはお約束。