応募作品の仕組み解説「MQTTでのPublishとSubscribe」みんなのラズパイコンテスト2017

みんなのラズパイコンテスト2017応募作品の技術的な仕組みの解説です!

しゃべるラズパイとMQTTで館内放送を実現したものの「MQTT実装部分」について詳しく触れます。

システムの全体像はコチラの記事でどうぞ!

 

実行環境

パブリッシャー

放送用のメッセージを送るプログラムで利用。
ラズパイ・テスト用Mac・WEBサーバーを利用しています。
mosquitto_pub や Pythonのpaho-mqttを使って実装しているので、これらが使える環境であれば何でもOKです。

サブスクライバー

放送用のメッセージを受け取って音声に変換して再生(放送)するプログラムで利用。
ラズパイにスピーカーを接続して利用しています。
mosquitto_sub と Pythonのpaho-mqttを使って実装しています。
日本語音声合成には「Open JTalk」を使っています。

ブローカー

MQTTの通信を仲介するプログラム。パブリッククラウド上に設置しています。
HEROKUのアドオン「CloudMQTT」を使用。

他にも様々なブローカーサービスがありますが、詳しくは↓コチラの記事を参照してください。

ブローカーの設定

HEROKUのアカウントで、CloudMQTT(無料版のCute Cat)のアドオンを追加した新しいプロジェクトを作成します。

アドオンが追加されたら、「CloudMQTT」をクリックして「CloudMQTT Console」を開きます。

Instance info」に接続するサーバーの情報が載っているので確認します。
もともとあるUserを使っても良いのですが、念のため新しく専用のUserを作成します。

Users and ACL」というタブを開いて、「Manage Users」にユーザー名とパスワードを入力して「Save」ボタンを押します。

 

ユーザーが作成できたらさらに下の方にスクロールして、「ACLs」の所で「New Rule」を作成します。

先ほど作ったユーザーを選択して、「Topic」に放送用メッセージの配信に使用するトピックを入力します。

Read Access? と Write Access? にチェックを入れて「Save」ボタンを押して完了です。

パブリッシャー(メッセージ送信)

放送用のメッセージを送るプログラム。
mosquitto_pubやpaho-mqttを使って送信します。

mosquitto_pubコマンドで送る場合は次のようになります。

mosquitto_pub \
-h ${HOST} \
-p ${PORT} \
-u ${USER} \
-P ${PASS} \
-t ${TOPIC} \
-r \
-m ${MESG}

ここで、

HOST='CloudMQTTのサーバー'
PORT=ポート
USER='ユーザー'
PASS='パスワード'
TOPIC='トピック'
MESG='メッセージ(詳細は下記参照)'

送信するメッセージの詳細

送信するメッセージにはキーバリュー形式で3つの情報を含めます。

  • area … 放送範囲を指定(all: 全ての端末、grp: 特定のグループに属している端末、dev: 特定の端末)
  • id … グループまたは端末のIDを指定(001〜999)
  • message … 放送したいメッセージ(日本語テキスト)

グループや端末のIDを指定することで、特定の範囲に放送することができます。

  • 例)特定の端末に送る場合
    '{"area":"dev","id":"001","message":"佐藤さん、応接室にお越し下さい。"}'
  • 例)特定のグループに送る場合
    '{"area":"grp","id":"002","message":"鈴木さん、お客様がいらっしゃいました。"}'
  • 例)全端末に送る場合(idは無視されます)
    '{"area":"all","id":"003","message":"みなさん、お昼休みの時間になりました。"}'

なお、放送するメッセージを送った後は、以下のコマンドを送ってサーバー側に保持されたメッセージをクリアしておきます。
(これをしないと新しいサブスクライバーがアクセスした際に前回のメッセージが届いてしまいます。)

mosquitto_pub \
-h ${HOST} \
-p ${PORT} \
-u ${USER} \
-P ${PASS} \
-t ${TOPIC} \
-r \
-n

上記をまとめたシェルスクリプトの例です。
変数は自分の環境にあわせて適宜修正して下さい。

iok-pub.sh

#!/bin/sh

## MQTT broker 
HOST='hogehoge.net'
PORT=xxxxx
USER='user'
PASS='password'
TOPIC='topic'

PATHMQ='/path/to/command'

## Publish message
${PATHMQ}/mosquitto_pub -h ${HOST} -p ${PORT} -u ${USER} -P ${PASS} -t ${TOPIC} -r -m $1

## Clear retained message
${PATHMQ}/mosquitto_pub -h ${HOST} -p ${PORT} -u ${USER} -P ${PASS} -t ${TOPIC} -r -n

実行する場合は次のように引数にarea、id、messeageを含むテキストを与えます。

$ sh iok-pub.sh '{"area":"dev","id":"001","message":"佐藤さん、応接室にお越し下さい。"}'

コントロールパネル

上記の3つの情報を指定して送信するWEBインターフェースです。PHPなどでフォームから取得した値を用いて送信コマンドを実行します。

コマンドの実装によりますが、メッセージからは特殊な文字(半角スペース、各種記号類等)を除外しておきます。

サブスクライバー(メッセージ受信&放送)

テスト用に、受信したメッセージを確認する場合はmosquitto_subコマンドが便利です。

mosquitto_sub \
-h ${HOST} \
-p ${PORT} \
-u ${USER} \
-P ${PASS} \
-t ${TOPIC} \

※パブリッシャーと同じ設定です。
これをラズパイのターミナルで実行しておくとメッセージ待ち状態に入ります。

※実際のプログラムはpaho-mqttを使ったpythonで実行しており、メッセージが届いたら、area、id、messageを確認し、該当する端末であればメッセージを音声に変換して再生します。

メッセージテキストはOpen JTalk(各種設定をまとめてjsayコマンドとして/usr/local/bin/に設置)を使って音声化しています。

以下に、paho-mqttを使ったpythonのプログラム(必要最小限の動作が可能なもの)を示します。

※実際に使用する際はTLSを使うなどセキュリティに気をつける必要があります。

#!/bin/env python3
# coding: utf-8
import paho.mqtt.client as mqtt
import subprocess

### strを辞書型に変換する
import ast

### MQTT brokerへの接続情報
port = xxxxx
host = “hostname"
username = “user_name"
password = “password"
topic = “topic/xxx"

### IoK group and deviceID
iokDevID = '001'
iokGrpID = '002'

def speak(strings):
    print(strings)
    cmd = "/usr/local/bin/jsay " + strings
    ret  =  subprocess.check_output( cmd, shell=True )

def on_connect(client, userdata, flags, respons_code):
    """broker接続時のcallback関数
    """
    print('status {}'.format(respons_code))
    client.subscribe(topic)

def on_message(client, userdata, msg):
    """メッセージ受信時のcallback関数
    """
    payl = msg.payload.decode('utf-8')
    print(payl)
    if len(payl) > 0:
        dic = ast.literal_eval(str(payl))

        ## 対象データかどうかをチェックする
        speak_flag = False

        if 'message' in dic:
            strings =  dic['message']
            if 'area' in dic:
                if dic['area'] =='all':
                    speak_flag = True
                elif 'id' in dic:
                    if dic['area'] == 'grp':
                        if dic['id'] == iokGrpID:
                            speak_flag = True
                    elif dic['area'] == 'dev':
                        if dic['id'] == iokDevID:
                            speak_flag = True
        if speak_flag :
            speak(strings)

if __name__ == '__main__':
    ### インスタンス作成時にprotocol v3.1.1を指定
    client = mqtt.Client(protocol=mqtt.MQTTv311)

    ### パスワード認証
    client.username_pw_set(username, password)

    ### callback function
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(host, port=port, keepalive=60)
    client.loop_forever()

まとめ

MQTTブローカー、パブリッシャー、サブスクライバー、及びメッセージ部分の仕組みを紹介しました。mosquitto_pub、mosquitto_subコマンドを使って簡単に通信ができました!

スペシャルサンクス

Pythonで待ち受けプログラムを書く際に、大変参考になりました!
Mosquitto と paho-mqtt をつかってPythonで MQTTをさわる