ishturk - Qiita です。この記事はAdvent 14日目の記事です。
今回は久々にIoTっぽい話です。
会議室の在室モニターつくってみた
会議室に人がいるか・誰がいるっぽいのかわかるIoTデバイス をつくりました
なぜつくったのか
会議室に人がいるかどうか知りたい。オフィスで仕事していると週に数回感じます。プチストレス。
エンジニアだもの、ストレスを感じたら ついカッとなって つくっちゃう ものでしょう?
要件ぎめ
解決したいのはこんな困りごとです
会議室つかいたいけど、誰か中にいそう...
会議予約の時間だけど前の打ち合わせ延びてる?VIPだったら...
(そーっと様子を伺う)
いないんかーい!
弊社オフィスの会議室はとてもおしゃれなので、目線の高さがすりガラスになっていて中がみえないようになっています。
足下の高さは透けてるので、下から覗いて確認したりするんですけど、あまり気持ちの良い光景ではないですね...
room
会議室にはAkerunがついていて、常に施錠されています。
以下のようなことを実現しようと思います
会議室に誰かいるのか、外からわかる → 人感センサが必要
最後に会議室に入室したのが誰かわかる → AkerunのAPI で取得できる!
デバイス 選定・技術選定
人がいるかをセンシングするデバイス
まずはどうやって在室を検知するのか検討します。
お手軽導入(安価・調達しやすい・簡単に設置)
検出精度そこそこ
プライバシーに配慮できる
という条件で調査してみます
赤外線センサー
焦電センサーともよばれ、人感センサとして広く流通。人感センサ照明などで使われている
人から発する赤外線(体温によるもの)を検知して、動きを判別する
熱源の動きであれば、人以外でも反応してしまう
良さそうです。これを基準に他と比べて行こうと思います。
ミリ波レーダーセンサー
ミリ波(電波)を使って、物体の動きを検知する
精度が高く、心拍などまで判別できる
高コスト、設置環境にあわせた設計・工事が必要
お手軽ではないのでNG
超音波センサー
超音波で物体との距離を測定できる。測距センサーや障害物検知デバイス として広く流通。
周囲の反響やノイズに影響されやすい
空間に人がいるか、を判断するには配置・判定ロジックが複雑になりそうなので、赤外線センサーに軍配
TOFセンサー
赤外線やレーザーで物体との距離を測定できる。顔認証等で利用されているらしい
高精度
高価、インダストリアルユースで利用されるため入手性が良くない
カメラ
シンプル
ナレッジが豊富
画像処理・演算でエッジ端末の性能が必要
人感検知用途ではオーバースペックなので、赤外線センサーに軍配
赤外線センサーに決まりました
構成
室内にセンサーを配置
在室状態は会議室外で確認したいので、会議室入口のモニターに表示
会議室の入口は複数かもしれないので、モニターは複数箇所に設置できるように
ということを考えました。 以下のようになりました。
モニターは作る手間を惜しんでSBC+ディスプレイにしましたが、マイコン +電子ペーパー とかのほうが安価になるかもしれません。
outline
無線通信
採用する無線についても、なるべくシンプルにしたかったのと、知見があるBluetooth Low Energy(BLE)を採用しました。
BLE のあれこれ
ロール
BLEでは、振る舞いによってロールという定義があります。いわゆるマスター/スレーブのような関係性です。
CentralとPeripheralの2つがあり、相互に接続したうえで通信します。
Centralが接続要求→Peripheralが受諾することで接続が確立されます。
接続後はGATT という規則に則ってデータのやり取りされます。
Connection型と違い、接続することなく、Broadcasterが一方的にデータを発信します。Observer側からデータを送ることはできません。
どちらの型も、advertise/scan の仕組みは共通で、peripheral から特定フォーマットのパケットを送信します。このデータは接続を要せず、どんなデバイス でも受信・読み込むことができます。
Connection型のほうが、セキュアに大容量のデータのやり取りが可能です。
一方でBroadcaster型は同時に多数のデバイス に情報を伝えることができ、接続処理の実装が不要です。
在室モニターでは
情報量は小さいがリアルタイムに知りたい
在室状況は複数のモニターで同時にアクセスしたい
セキュアな情報ではない
ということから、Broadcaster型 を採用します。
データ構造
advertise パケットのデータ構造です。 AD Type で規定されたデータを含めることができます。 AD TypeはBluetoothSIGで決められています。
図は Nordicのサイト から拝借
自由にデータを設定できるTypeとして Manufacture Specific Data があります(iBeaconもこのフィールドを使ってます)。 今回はこのTypeを使ってAdvertiseにデータを埋め込んでいきたいと思います。
AdvData は全体で31Byteまでの制約があるので、埋め込むデータもなるべく小さいサイズにします。bit arrayで以下のように決めました。
直近1分以内に検知したら 0b00000001
直近5分以内になら 0b00000010
直近10分以内になら 0b00000100
直近20分以内になら 0b00001000
直近30分以内になら 0b00010000
直近40分以内になら 0b00100000
直近50分以内になら 0b01000000
直近60分以内になら 0b10000000
これで、最後に検知した時間がおおよそわかります
実装
センサーの配線図です。とてもシンプル。
赤外線センサーは パナソニック 製のPaPIRs を採用しました。
センサーデバイス 制御のソースコード です。 Raspberry Pi PICO W を使用したので MicroPythonになりました。
ChatGPTに書かせました。
はじめはaioble を使用したコードがサジェストされたのですが、どうもadvertiseを動的に変更することが考慮されてないようだったので、low-level Bluetooth を直接参照した実装に変えました。
import tkinter as tk
import asyncio
from datetime import datetime, timedelta
from ble_scanner import BLEScanner
import threading
from akerun_log import get_latest_user_history, get_user_access_details
ORGANIZATION_ID = "O-xxxxxx-xxxxxx"
AKERUN_DEVICE_ID = "Axxxxxxxxxx"
class RoomStatusApp :
def __init__ (self, root):
self.root = root
self.root.title("部屋の状況モニター" )
self.root.geometry("1600x1200" )
self.root.configure(bg="#2c3e50" )
self.title_label = tk.Label(
root, text="部屋の状況" , font=("Arial" , 40 , "bold" ), bg="#2c3e50" , fg="#ecf0f1"
)
self.title_label.pack(pady=10 )
self.status_label = tk.Label(
root, text="部屋にいる: 不明" , font=("Arial" , 32 ), bg="#2c3e50" , fg="#bdc3c7"
)
self.status_label.pack(pady=10 )
self.time_label = tk.Label(
root, text="最終検知: -- 分前" , font=("Arial" , 28 ), bg="#2c3e50" , fg="#bdc3c7"
)
self.time_label.pack(pady=10 )
self.entry_label = tk.Label(
root,
text="RSSI: -- (時刻: --)" ,
font=("Arial" , 28 ),
bg="#2c3e50" ,
fg="#bdc3c7" ,
)
self.entry_label.pack(pady=10 )
self.last_user_label = tk.Label(
root,
text="最終入退室者: --" ,
font=("Arial" , 28 ),
bg="#2c3e50" ,
fg="#ecf0f1"
)
self.last_user_label.pack(pady=10 )
self.ble_scanner = BLEScanner(target_name="detector-00" )
self.running = True
self.thread = threading.Thread(target=self.run_async_loop, daemon=True )
self.thread.start()
self.update_status()
def run_async_loop (self):
"""バックグラウンドスレッドでBLEスキャンを実行"""
asyncio.run(self.start_ble_scan())
async def start_ble_scan (self):
"""BLEスキャナーを開始"""
await self.ble_scanner.start_scan()
while self.running:
await asyncio.sleep(10 )
def update_status (self):
"""スキャン結果に基づいてUIを更新"""
scan_result = self.ble_scanner.get_detection_time_and_rssi()
if scan_result:
detection_time = scan_result["detection_time" ]
rssi = scan_result["rssi" ]
print (scan_result)
if detection_time > 0 and detection_time < 5 :
self.status_label.config(text="部屋にいる: はい" , fg="#2ecc71" )
else :
self.status_label.config(text="部屋にいる: いいえ" , fg="#e74c3c" )
last_detected = datetime.now() - timedelta(minutes=detection_time)
timestamp = last_detected.strftime("%H:%M" )
self.time_label.config(text=f"最終検知: {detection_time} 分前" )
self.entry_label.config(text=f"RSSI: {rssi} (時刻: {timestamp})" )
else :
self.status_label.config(text="部屋にいる: 不明" , fg="#bdc3c7" )
self.time_label.config(text="最終検知: -- 分前" )
self.entry_label.config(text="RSSI: -- (時刻: --)" )
self.update_last_user_info()
self.root.after(5000 , self.update_status)
def update_last_user_info (self):
"""最終入退室者を更新"""
try :
latest_history = get_latest_user_history(ORGANIZATION_ID, AKERUN_DEVICE_ID)
user_access_details = get_user_access_details(latest_history)
self.last_user_label.config(
text=f"最終入退室者: {user_access_details['user_name']} (時刻: {user_access_details['accessed_at']})"
)
except Exception as e:
self.last_user_label.config(text="最終入退室者: エラー発生" )
print ("Error getting user access details:" , str (e))
def stop_ble_scan (self):
"""スキャンの停止"""
self.running = False
self.ble_scanner.stop_scan()
if __name__ == "__main__" :
root = tk.Tk()
app = RoomStatusApp(root)
try :
root.mainloop()
finally :
app.stop_ble_scan()
ble_scanner.py
import asyncio
from bluepy.btle import Scanner, DefaultDelegate
import time
import struct
class ScanDelegate (DefaultDelegate):
def __init__ (self, target_name="detect-00" ):
super ().__init__()
self.target_name = target_name
self.scan_results = []
def handleDiscovery (self, dev, isNewDev, isNewData):
if isNewDev or isNewData:
name = dev.getValueText(9 )
if name == self.target_name:
manuf_data = dev.getValueText(255 )
if manuf_data is not None :
manuf_data_bytes = bytes .fromhex(manuf_data.replace(" " , "" ))
sensor_data = manuf_data_bytes[2 :]
if len (sensor_data) > 0 :
detection_raw = int .from_bytes(sensor_data, byteorder='little' )
if detection_raw == 0 :
detection_time = 0
elif detection_raw == 0b1 :
detection_time = 1
elif detection_raw <= 0b10 :
detection_time = 5
elif detection_raw <= 0b100 :
detection_time = 10
elif detection_raw <= 0b1000 :
detection_time = 20
elif detection_raw <= 0b10000 :
detection_time = 30
elif detection_raw <= 0b100000 :
detection_time = 40
elif detection_raw <= 0b1000000 :
detection_time = 50
elif detection_raw <= 0b10000000 :
detection_time = 60
else :
detection_time = 0
self.scan_results.append({
'address' : dev.addr,
'name' : name,
'rssi' : dev.rssi,
'sensor_status' : detection_raw,
'detection_time' : detection_time,
'raw_data' : manuf_data_bytes
})
class BLEScanner :
def __init__ (self, target_name="detector-00" ):
self.target_name = target_name
self.scanner = Scanner().withDelegate(ScanDelegate(self.target_name))
self.delegate = ScanDelegate(self.target_name)
self.scanner.delegate = self.delegate
self.scanning = False
self.cache = None
async def _scan_background (self):
"""バックグラウンドでスキャンを5秒ごとに実行し、結果をキャッシュします。"""
while self.scanning:
self.delegate.scan_results.clear()
await asyncio.to_thread(self.scanner.scan, 5.0 )
self.cache = self.delegate.scan_results[-1 ] if self.delegate.scan_results else None
print ("Scan complete, cache updated." )
await asyncio.sleep(5 )
async def start_scan (self):
"""バックグラウンドでスキャンを開始します。"""
if not self.scanning:
print ("Starting BLE scan..." )
self.scanning = True
asyncio.create_task(self._scan_background())
def stop_scan (self):
"""スキャンを停止します。"""
self.scanning = False
print ("Stopping scanner." )
def get_detection_time_and_rssi (self):
"""キャッシュされた結果から、最新の検出時間とRSSIを返します。"""
if self.cache:
return {
"detection_time" : self.cache['detection_time' ],
"rssi" : self.cache['rssi' ]
}
else :
return None
akerun_log.py
import os
import requests
from dotenv import load_dotenv
load_dotenv()
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN" )
if not ACCESS_TOKEN:
raise ValueError ("ACCESS_TOKEN is not set in the .env file" )
API_BASE_URL = "https://api.akerun.com/v3"
def get_latest_user_history (organization_id, akerun_device_id):
"""
特定のAkerunデバイスの最後のユーザーアクセス履歴を取得
:param organization_id: 組織ID
:param akerun_device_id: AkerunデバイスID
:return: 最後のユーザーアクセス履歴 (userがNULLのものは無視)
"""
url = f"{API_BASE_URL}/organizations/{organization_id}/accesses"
headers = {
"Authorization" : f"Bearer {ACCESS_TOKEN}" ,
}
params = {
"akerun_ids[]" : [akerun_device_id],
"limit" : 100
}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200 :
raise Exception (f"Failed to fetch data: {response.status_code}, {response.text}" )
data = response.json()
accesses = data.get("accesses" , [])
valid_accesses = [access for access in accesses if access.get("user" ) is not None ]
return valid_accesses[0 ] if valid_accesses else None
def get_user_access_details (access_data):
"""
ユーザーアクセス履歴の詳細(ユーザー名とアクセス日時)を取得
:param access_data: アクセス履歴データ
:return: ユーザー名とアクセス日時
"""
if access_data:
user_name = access_data['user' ].get('name' , 'Unknown User' )
accessed_at = access_data.get('accessed_at' , 'Unknown Time' )
return {"user_name" : user_name, "accessed_at" : accessed_at}
return {"user_name" : "No valid access" , "accessed_at" : "No valid time" }
モニター側のソースコード です。こちらもChatGPTに書かせました。RaspberryPi上で簡易なGUI を実現で指定したところ、tkinter がサジェストされました。
AkerunのAPI 仕様は(developersサイト)https://developers.akerun.com/#introduction で公開されています。今回は履歴一覧のAPI を利用しています。
import tkinter as tk
import asyncio
from datetime import datetime, timedelta
from ble_scanner import BLEScanner
import threading
from akerun_log import get_latest_user_history, get_user_access_details
ORGANIZATION_ID = "O-111111-111111"
AKERUN_DEVICE_ID = "A12345678"
class RoomStatusApp :
def __init__ (self, root):
self.root = root
self.root.title("部屋の状況モニター" )
self.root.geometry("1600x1200" )
self.root.configure(bg="#2c3e50" )
self.title_label = tk.Label(
root, text="部屋の状況" , font=("Arial" , 40 , "bold" ), bg="#2c3e50" , fg="#ecf0f1"
)
self.title_label.pack(pady=10 )
self.status_label = tk.Label(
root, text="部屋にいる: 不明" , font=("Arial" , 32 ), bg="#2c3e50" , fg="#bdc3c7"
)
self.status_label.pack(pady=10 )
self.time_label = tk.Label(
root, text="最終検知: -- 分前" , font=("Arial" , 28 ), bg="#2c3e50" , fg="#bdc3c7"
)
self.time_label.pack(pady=10 )
self.entry_label = tk.Label(
root,
text="RSSI: -- (時刻: --)" ,
font=("Arial" , 28 ),
bg="#2c3e50" ,
fg="#bdc3c7" ,
)
self.entry_label.pack(pady=10 )
self.last_user_label = tk.Label(
root,
text="最終入退室者: --" ,
font=("Arial" , 28 ),
bg="#2c3e50" ,
fg="#ecf0f1"
)
self.last_user_label.pack(pady=10 )
self.ble_scanner = BLEScanner(target_name="detector-00" )
self.running = True
self.thread = threading.Thread(target=self.run_async_loop, daemon=True )
self.thread.start()
self.update_status()
def run_async_loop (self):
"""バックグラウンドスレッドでBLEスキャンを実行"""
asyncio.run(self.start_ble_scan())
async def start_ble_scan (self):
"""BLEスキャナーを開始"""
await self.ble_scanner.start_scan()
while self.running:
await asyncio.sleep(10 )
def update_status (self):
"""スキャン結果に基づいてUIを更新"""
scan_result = self.ble_scanner.get_detection_time_and_rssi()
if scan_result:
detection_time = scan_result["detection_time" ]
rssi = scan_result["rssi" ]
print (scan_result)
if detection_time > 0 and detection_time < 5 :
self.status_label.config(text="部屋にいる: はい" , fg="#2ecc71" )
else :
self.status_label.config(text="部屋にいる: いいえ" , fg="#e74c3c" )
last_detected = datetime.now() - timedelta(minutes=detection_time)
timestamp = last_detected.strftime("%H:%M" )
self.time_label.config(text=f"最終検知: {detection_time} 分前" )
self.entry_label.config(text=f"RSSI: {rssi} (時刻: {timestamp})" )
else :
self.status_label.config(text="部屋にいる: 不明" , fg="#bdc3c7" )
self.time_label.config(text="最終検知: -- 分前" )
self.entry_label.config(text="RSSI: -- (時刻: --)" )
self.update_last_user_info()
self.root.after(5000 , self.update_status)
def update_last_user_info (self):
"""最終入退室者を更新"""
try :
latest_history = get_latest_user_history(ORGANIZATION_ID, AKERUN_DEVICE_ID)
user_access_details = get_user_access_details(latest_history)
self.last_user_label.config(
text=f"最終入退室者: {user_access_details['user_name']} (時刻: {user_access_details['accessed_at']})"
)
except Exception as e:
self.last_user_label.config(text="最終入退室者: エラー発生" )
print ("Error getting user access details:" , str (e))
def stop_ble_scan (self):
"""スキャンの停止"""
self.running = False
self.ble_scanner.stop_scan()
if __name__ == "__main__" :
root = tk.Tk()
app = RoomStatusApp(root)
try :
root.mainloop()
finally :
app.stop_ble_scan()
トライアル
会議室のモニターに設置してみました。
こいつ...動くぞ...!
実際に導入するなら...
トライアルの結果、期待通りの振る舞いをすることがわかりました。
今回はついカッとなって サクッとつくりましたが、実際に導入するとなると、課題があります
ブレッドボードではなく、基板実装(PCBA)にして筐体に格納しないと、物理接触 ・衝撃・静電気等で停止・故障する
モニターに公開API の認証情報・入退室履歴が残るので、セキュアな管理・設置ができるように
長期安定動作するか検証
また、そもそも会議室が時間通り空くように、予約時間終了前に利用者に知らせるようなソリューションもほしいよね? という声もありました。それはそう!
おわり
今回のプロジェクトは、以下の裏の目的がありました。
製品開発ではこんなことできないので、趣味 プロトタイピングとして実施しました。
API ・ライブラリ仕様が公開されているものは、そのリファレンスをChatGPTに渡すと内容を解釈して実装してくれました。便利。
エンジニアはサクッと動くものをつくるのは得意です。ですが、好きなようにつくると好み・スキル・経験に基づいて作ってしまうことも多々。
ソリューションとして開発するなら、要求の洗い出し・要件定義・技術選定を丁寧にすすめることがとても大事。
株式会社フォトシンスでは、一緒にプロダクトを成長させる様々なレイヤのエンジニアを募集しています。
photosynth.co.jp
Akerunにご興味のある方はこちらから
akerun.com