AWS IoTのBasic ingestのテストと仕様を確認

AWS IoTのBasic ingestのテストと仕様を確認:


はじめに

AWS IoT CoreにBasic Ingestの機能がリリースされていました。

データ送信だけをしたいユースケースで使うと、Message brokerに接続しないでルールエンジンを発火できるようです。

メリットは飛ばした、メッセージブローカーのコストが掛からないということで、データ送信のみを考えるユースケースで便利かと思います。


使い方

topicの先頭に "$aws/rules/{実行したいrule名}を追加するだけです。

ルールエンジンの記述は、 $aws/rules/{rule名} を省略できます。

つまり、 data/{device_name}でデータを送信していた場合、

  • 送信側のtopic
$aws/rules/{rule_name}/data/{device_name} 
  • ルールエンジンのSQL句の書き方
data/# 
でよく、Device側でおくるtopic名のupdateができれば、AWS IoT側の設定変更不要です。

Basic Ingestトピックの最初の3つのレベル($ aws / rules / rule-name)は、トピックの8セグメント長制限または256文字合計制限の方にカウントされません。

また存在しないルール名をtopicに指定すると、RuleNotFoundのmetricが増える。

publicな開発ドキュメントはこちら


テスト


Lambdaの設置

Basic Ingestが来たら、受けたメッセージを打ち返すLambdaを用意します。打ち返すtopicは"basic_ingest/got"とします。

roleはLambdabasicとAWS IoTへpublishできるroleが必要です。

just_publish.py
from __future__ import print_function 
import json 
import boto3 
 
iot = boto3.client('iot-data') 
 
def lambda_handler(event, context): 
  try: 
    iot.publish( 
        topic = "basic_ingest/got", 
        qos = 0, 
        payload = json.dumps(event) 
    ) 
  except Exception as e: 
    print("exception") 
    print("e.message") 
 
  return 
 


AWS IoT Ruleエンジン

以下のように設定しました。


スクリーンショット 2018-11-09 20.54.28.png



テストクライアント

classで隠蔽したPython Device SDKは省略しています。詳細はこちらを参照してください。

やっていることは単純に $aws/rules/basic_ingest(上で設定したルール名)/data/引数で指定したデバイス名

に無限ループでデータをpublishしています。

また、Shadowと同期していますが今回はあまり関係ないので無視してください。

test_client.py
import sys 
import time 
import logging 
import argparse 
import IoT_common as IoT 
import dependency as dc 
 
 
BASIC_INGEST = "$aws/rules/basic_ingest/data/" 
#------------------------------------------------------------------------------- 
def init_log(): 
    logger = logging.getLogger() 
    handler = logging.StreamHandler(sys.stdout) 
    logger.addHandler(handler) 
    logger.setLevel(logging.WARN) 
    logging.basicConfig() 
 
    return logger 
#------------------------------------------------------------------------------- 
def device_main(): 
    try: 
        logger = init_log() 
        logger.info("start") 
        parser = argparse.ArgumentParser() 
        init_info = dc.arg_check(parser) 
        device_name = init_info['device_name'] 
        iot = IoT.IoT_common(device_name, 
                             init_info['endpoint'], 
                             init_info['certs']) 
        iot.create_shadow() 
        status_str = dc.set_status(iot.WAIT_TIME) 
        iot.updateShadow(status_str) 
        iot.create_mqtt() 
        topic = BASIC_INGEST + device_name 
        while True: 
            payload = dc.create_data(device_name) 
            logger.debug("payload:{}".format(payload)) 
            iot.PublishMsg(topic, payload) 
            time.sleep(iot.WAIT_TIME) 
    except Exception as e: 
        logger.error("Error main()") 
        logger.error("error msg:\n{}".format(e.message)) 
 
if __name__ == '__main__': 
    device_main() 
 
でこちらのクライアントを起動します。


テスト確認

AWS IoTのテスト画面から # をsubscribeしてみます。

Basic ingestはルールエンジンに直接つながっているので、subscribeには表示されず、Lambdaが打ち返したメッセージのみが表示されています。


スクリーンショット 2018-11-09 21.04.40.png


ということで、メッセージブローカーを通らないことでテストクライアントのメッセージはsubscriberで受信できずに、しかし、ルールエンジンは発火していることがわかりました。


まとめ

ということで、deviceが使うtopicの変更のみでこの機能が使えることがわかりました。注意点としてはpolicyの運用を厳格にやっている方は、Resource管理でこのtopicを許すことをお忘れなく、といったあたりになります。

また、メッセージpublishをsubscriberにメッセージを届けたい場合は、本機能ではなく従来のtopicを使ったpub/subのメッセージブローカー経由で使う必要があります。


免責

本投稿は、個人の意見で、所属する企業や団体は関係ありません。

また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2020-12-01 09:41:49 RSSフィード2020-12-01 09:00 分まとめ(69件)