中野智文のブログ

データ・マエショリストのメモ

aws s3 のファイルを圧縮しながらストリーミングでgcsに置く

背景

早い話が、gsutil の -z オプションの挙動(静的ホスティングじゃないとき)がよくわからなかったから。

cp - Copy files and objects  |  Cloud Storage Documentation  |  Google Cloud

コマンド

aws s3 cp s3://your-s3-bucket/your-file.txt - | gzip -c | gsutil cp - gs://your-gs-bucket/your-file.txt.gz

ちなみにこういう場合、通信料金的には s3 が置いてあるリージョンのEC2の(マイクロ)インスタンスがおおすすめ。

注意

aws のプロファイルが一つじゃなく、.boto ファイルとかを頑張れ、-z オプションの挙動に納得しているなら gsutil だけでいけそう。 もっというと、複数同時に走らせたりとかも可能みたいなので、できる方はそちらで。

google cloud storage の権限は defacl でバケット単位で設定する

背景

google cloud storage を使って共有しようと思ったが、設定はどうやるのか?

方法

  • gsutil defacl コマンドで(バケット単位で)設定する。
  • 既存のファイルについては、gsutil acl コマンドの -r オプションで追加する

例(user@example.comさんにread 権限を与えるケース):

gsutil defacl ch -u user@example.com:R gs://your-bucket
gsutil acl ch -u user@example.com:R -r gs://your-bucket

どうしてか

単なる ACL の設定だと、ファイル単位でしか設定できない。 よって新規のファイルが現われたときに、その都度設定する必要がある。

新規ファイルのデフォルトのACLを設定をするのが、gsutil の defacl コマンド。 Webのコンソール画面からは、デフォルトのACLの設定ができない(確認さえできない。糞)。

詳しくは以下のページが参考になる。

iga-ninja.hatenablog.com

その他

可能ならば、ユーザ単位で権限を設定するのでなくグループ単位で設定したい。

ec2 でシンプルな mail クライアント mailx

背景

ec2 の cron を実行したとき、標準出力に何か出ると、メールで送られてくる仕組みになっている。 多くの tips では、ハードディスクがいっぱいになるから、これを停止するものが多いが、 停止する前にエラーなどが出ていないか、確認したいものである。

ところが、デフォルトではメールクライアントは入っていない。 単なる cron の結果を読むだけなのだが、何が良いのだろうか。 スプールを直接読み書きするという方法もあると思うが…

とりあえず、 mailx

シンプルなものはよくわからないが、mailx というものがあるのでこれを入れてみる。

$ sudo yum install mailx
読み込んだプラグイン:priorities, update-motd, upgrade-helper
依存性の解決をしています
--> トランザクションの確認を実行しています。
---> パッケージ mailx.x86_64 0:12.4-8.8.amzn1 を インストール
--> 依存性解決を終了しました。

依存性を解決しました

============================================================================================
 Package           アーキテクチャー   バージョン                  リポジトリー           容量
============================================================================================
インストール中:
 mailx             x86_64             12.4-8.8.amzn1              amzn-main            254 k

トランザクションの要約
============================================================================================
インストール  1 パッケージ

総ダウンロード容量: 254 k
インストール容量: 451 k
Is this ok [y/d/N]: 

消費される容量は 451k である。

早速 cron の実行結果が何なのか見てみる

....
make: pipenv: Command not found
...

なんと、pipenv が見つからずに失敗していた。

メールのヘッダーの下の方に何か書いてある。

X-Cron-Env: <LANG=en_US.UTF-8>
X-Cron-Env: <SHELL=/bin/sh>
X-Cron-Env: <HOME=/home/ec2-user>
X-Cron-Env: <PATH=/usr/bin:/bin>
X-Cron-Env: <LOGNAME=ec2-user>
X-Cron-Env: <USER=ec2-user>
Status: RO

環境変数がずらりと並んでいる。 おや? PATHが、/usr/bin:/bin だけしか設定されていない。これが原因だ。

…というわけで、イケてる mail クライアントだと、ヘッダーの部分は隠してしまうかもしれないが、 mailx だと cron のデバッグにちょうどいいのである。ただし、操作方法が分かりにくいのが難点である。

gcloud 系コマンド(gsutil bqなど)で、 which no python in ... というエラー

背景

gcloud 系のコマンドを実行すると、

which: no python2 in 自分のパス

というエラーが出る。

ただエラーが出るからと言って実行できないわけではなく、 python2.7 とか、python2 とかはパス上にあるからか、結局それを見つけに行って実行される。 ちなみに、たんなる python は python2.7 である。

その探しに行く時間(数秒)と上記のエラーメッセージがうっとうしい。

とりあえずの解決策

とりあえずシンボリックリンク

sudo ln -s /usr/bin/python2.7 /usr/bin/python2

これまでやったこと

  • gcloud config の確認
  • alias

luigi で resources を使ってリソース毎に同時実行数を制限する

背景

luigi は同時 workers オプションで同時 worker 数を制限できるが、CPU的には同時に16走っても問題ないが、DB的には一つしか接続したくない、という要望がある。

対応 resources を使う。

リソース毎の worker 数の設定と、それぞれのタスクがどれほどリソースを使うかをコードとして書く必要がある。

リソースごとの worker 数の設定

以下のリンクを読む。

Configuration — Luigi 2.7.5 documentation

ここで書いてない重要なことは2つある。

  • 例にある、hive や mysql は組み込み(ビルトイン)でもなんでもない。

例に書いてあるから、ここだけを設定すれば、hive での同時実行される worker 数が制限されると思ったら大間違い。 postgres など、書いていないリソースも同じ。これはあくまで例であって、実装しているタスクのコードに直接書かなくては効果がない。

  • リソース名は、自分で勝手に作っていい。

mysql のDBが2台あって、それぞれのリソース的には1接続づつしたい場合は、mysql1 と mysql2 として、

[resources]
mysql1=1
mysql2=1

と設定する。ただし先に述べたが、タスクのコードに書かないと全く意味はない。

タスクごとのリソースの設定

以下を読む。

http://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.Task.resources

静的に、

  resources = {'mysql1': 1}

と書いてもいいし、@property を使って、動的に設定してもいい。

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#avoiding-concurrent-writes-to-a-single-file

他にも設定したほうがいいこと

一つのクライアントでスケジューラーを使っている時はいいのだが、別のクライアントからスケジューラーにタスクが入って来てリソースの制限にぶつかると、 次のような状態になってペンディングタスクの実行をあきらめ止まってしまう。

   was not granted run permission by the scheduler

そんなことを防ぐためには、

[core]
worker_keep_alive=True

もしくは

[worker]
keep_alive=True

に設定しておく。

上記の二つが同じことはドキュメントでは確認できないが、下記のコードで確認できる。

https://github.com/spotify/luigi/blob/4af1d2236825375cdd064ce5d1c2a34a8b1414c0/luigi/worker.py#L357-L358

その他の参考

http://luigi.readthedocs.io/en/stable/luigi_patterns.html#tasks-that-regularly-overwrite-the-same-data-source

s3上のgzip圧縮されたファイルの中身をイテレータで取得する

背景

s3上のgzip圧縮されたファイルの中身をイテレータで取得する方法がなかなか見つからない。

コード

import boto3
import tempfile
import gzip
import datetime as dt
from typing import Iterator


def s3_gz_cat(bucket: str, prefix='') -> Iterator[bytes]:
    s3 = boto3.resource('s3')
    s3_bucket = s3.Bucket(bucket)
    for s3_obj_sum in s3_bucket.objects.filter(Prefix=prefix):
        # s3_obj_sum.key がファイル名(path)なので、末尾が .gz 判定とかしたい場合はここで。                                                                                                                                                                   
        with tempfile.TemporaryFile() as temp:
            s3_obj = s3_obj_sum.Object()
            s3_obj.download_fileobj(temp)
            temp.seek(0)
            with gzip.GzipFile(fileobj=temp) as gz:
                for b in gz:
                    yield b


# 以下は使い方

if __name__ == '__main__':
    # my_bucket 内全て
    for b in s3_gz_cat(bucket='my_bucket'):
        print(b)

    # my_dir 内に限定
    for b in s3_gz_cat(bucket='my_bucket', prefix='my_dir'):
        print(b)

IOバッファでやる方法もあるみたいだが、バッファが溢れたときどうなるのか(詰まったりする?)心配なので、tempfile でやったほうがいいような気がする。

BigTable は東京リージョンでも使える!

背景

bigtable が東京リージョンでは使えないみたいな話が古い記事で見つかったりする。

GCPで東京リージョンにて使えるサービス&使えないサービス一覧 | apps-gcp.com

本当は

使えるようになっている!!!

https://groups.google.com/forum/#!topic/google-cloud-bigtable-discuss/vxVAifl5jUM

下のページが日本語で表示される場合、Webページのフッターの言語をEnglishにすると真実が見える!

cloud.google.com

日本語でみると、まだサポートされていないことになっている!!!(2018/04/17)

Googleさん、何とかしてください!!!

修正されていました!!! (2018/06/21現在)