AI

Rabbitmq+Python Pikaのcallbackで長時間processするときのframework

Web server & client system において、Webブラウザーからサーバーにファイルをアップロードし、Rabbitmq でキューイングされたファイルを Python Pika 経由で次々に読みだして処理し、完了したものから結果をダウンロードできる、というようなよくあるシステムでの話しです。このあたりご存じない方は、このあたりを見てみるとよりイメージが湧くかも知れません:

いきさつ

すでに動作するプログラムがあったのですが、どうやら突貫で作らないといけなかったようで、長大な1ファイルに実装の全てが書かれていました。特に後述する Callback 内でやるべき処理がパイプライン的にいくつもあり、その各処理に対して Pika の connection を渡して heartbeat を定期的に発動し connection が close しないように維持するような作りになっていました。

少し前に書いた記事(ffmpeg-pythonでgevent monkey patchを使わずにshow progressしてみた)もこの各処理のひとつで、gevent monkey patch が周囲に悪影響を及ぼしていたようで、import するタイミングにも大変苦労された痕跡がありました。それで以前の記事のように gevent を使わないで ffmpeg の progress を取得し、かつその progress 値を progress server (progress bar を表示するためのサーバー) に渡しながら heartbeat を打つような構成にしたのでした。(この記事では最終的には打たなくてよい実装になります)

Rabbitmqの仕様

Rabbitmqでは、callback内での処理側が途中で居なくなってしまったことを検出するために、定期的に heartbeat を打つことが必要です。個人的な実験では3分間程度打たないと connection が close しました。暗黙の常識としては、1秒から10秒程度に1回 heartbeat を打つことが多いように見受けられます。

また、callback の処理時間ですが、基本的にはどんなに長くてもよいことになっていますが、Rabbitmq のドキュメント によると、

“If a consumer does not ack its delivery for more than the timeout value (30 minutes by default), its channel will be closed with a PRECONDITION_FAILED channel exception. The error will be logged by the node that the consumer was connected to. The timeout value is configurable in [rabbitmq.conf] (in milliseconds): …”


と書かれており、30分以上処理が続く場合はこの値を数時間に設定する必要があります。

「Callback はできる限り短い処理で抜けてあげる」と教わってきた人間にはなかなか受け入れがたいものがありましたが、callback thread がどこの資源で動いているかくらいは意識しましょう。

そして callback での処理が完了したら delivery_tag を投げます。これによって当該ファイルがキューから抜けて、次のファイルに処理が移ります。

場当たり的な実装からの気付き

まず気付いたのは、各処理モジュールに connection を渡したあとの投げ方が各モジュール任せになっていて heartbeat の打ち方がまちまちになっていたこと。

次にはまったのは、メイン処理(動画を frame by frame で画像処理する)は30フレームに1回(1秒に1回) heartbeat を打つ実装になっていましたが、一部の処理が特殊な映像のときにスラッシングを起こしていたため、heartbeat を打つ間隔が時に恐ろしく長くなってしまい、connection closed が発生したこと。

他にも細かい問題がいくつも発生したため、これからご紹介するようなフレームワークを考えてみました。

フレームワーク

そんなたいそうなものではありませんが、

  • Callback が呼ばれその後の処理のための準備が整ったら、処理スレッドへ開始イベントを送る
  • Callback スレッドは、定期的に heartbeat を打ちながら、完了イベントを待つ
  • 処理スレッドは全ての処理が完了したら、完了イベントを callback スレッドへ送る
  • Callback スレッドは delivery_tag を打ち、終了する

という流れです。実装としてはこんな感じになりました(パラメータは別途定義した config file から読み込む想定です):

import pika
import json
import threading
import functools
from web_config import config, pconfig

DEFAULT_HOST = config['default_host']
DEFAULT_PORT = config['default_port']
QUEUE_NAME = config['queue_name']
USER = config['user']
KEY = config['key']

class WebComm(object):
    def __init__(self):
        self.start_event = threading.Event()
        self.end_event = threading.Event()

        self.thread = threading.Thread(target=self.run)
        self.thread.setDaemon(True)
        self.thread.start()

    def run(self):
        while True:
            credentials = pika.PlainCredentials(USER, KEY)
            parameters = pika.ConnectionParameters(host=DEFAULT_HOST, port=DEFAULT_PORT, credentials=credentials)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.channel.queue_declare(queue=QUEUE_NAME)

            self.jds = None

            cb = functools.partial(self.callback, conn=self.connection)
            self.channel.basic_consume(queue=QUEUE_NAME, on_message_callback=cb, auto_ack=False)
            print('Waiting for task dispatch...')

            try:
                self.channel.start_consuming()
            except Exception as e:
                print("Exit consuming loop by exception:", e)
                self.channel.stop_consuming()
            self.connection.close()

    def callback(self, ch, method, properties, body, conn):
        print("Callback start. Received %r" % body.decode())
        self.jds = json.loads(body.decode())

        # bodyのparseなどこの後の処理のための準備
        # …

        self.start_event.set()

        while not self.end_event.wait(10):
            self.conn_msg()

            # 定期的にやりたい処理
            # (ファイルの存在確認など)

        self.end_event.clear()

        self.ack_msg()
        print("Callback end.")

    def conn_msg(self):
        self.conn.process_data_events()

    def ack_msg(self):
        self.ch.basic_ack(self.method.delivery_tag)

if __name__ == '__main__':

    web_comm = WebComm()

    while True:
        web_comm.start_event.wait()
        print("Task dispatched.")
        web_comm.start_event.clear()

        # 処理

        print("Task end.")
        web_comm.end_event.set()

この作りにしておけば、10秒おきに heartbeat が打たれる上に、main 側から conn_msg や ack_msg を呼びたくなった時にも対処できます。元のプログラムがものすごい構成だったので、個人的にはかなりスッキリでした。

さいごに

Rabbitmq+Python Pika で callback を使用する上で、比較的シンプルなフレームワークを作ってみたので、そのご紹介でした。まあまあスッキリ書けたのではないかと思います。

あと、処理が長くなる場合はくれぐれも Rabbitmq の timeout 設定をお忘れなく。でも本当にメイン処理が死んでしまったときのためにも必要なものなので、あまり大きな値にすると別の問題を引き起こす可能性もあるので、ほどほどな値(数時間程度)にしないとですね。


   
関連記事
  • np.concatenate()とnp.random.shuffle()で多次元データのAugmentation

    コメントを残す

    *

    CAPTCHA