フォトシンス エンジニアブログ

株式会社Photosynth のテックブログです

エンジニアのエゴを全面にだして会議室の在室確認モニターをつくってみた

ishturk - Qiita です。この記事はAdvent 14日目の記事です。

今回は久々にIoTっぽい話です。

会議室の在室モニターつくってみた

会議室に人がいるか・誰がいるっぽいのかわかるIoTデバイスをつくりました

なぜつくったのか

会議室に人がいるかどうか知りたい。オフィスで仕事していると週に数回感じます。プチストレス。

エンジニアだもの、ストレスを感じたら ついカッとなってつくっちゃう ものでしょう?

要件ぎめ

解決したいのはこんな困りごとです

会議室つかいたいけど、誰か中にいそう... 会議予約の時間だけど前の打ち合わせ延びてる?VIPだったら...

(そーっと様子を伺う)

いないんかーい!

弊社オフィスの会議室はとてもおしゃれなので、目線の高さがすりガラスになっていて中がみえないようになっています。 足下の高さは透けてるので、下から覗いて確認したりするんですけど、あまり気持ちの良い光景ではないですね...

room

会議室にはAkerunがついていて、常に施錠されています。

以下のようなことを実現しようと思います

  • 会議室に誰かいるのか、外からわかる → 人感センサが必要
  • 最後に会議室に入室したのが誰かわかる → AkerunのAPIで取得できる!

バイス選定・技術選定

人がいるかをセンシングするデバイス

まずはどうやって在室を検知するのか検討します。

  • お手軽導入(安価・調達しやすい・簡単に設置)
  • 検出精度そこそこ
  • プライバシーに配慮できる

という条件で調査してみます

赤外線センサー

  • 焦電センサーともよばれ、人感センサとして広く流通。人感センサ照明などで使われている
  • 人から発する赤外線(体温によるもの)を検知して、動きを判別する
  • 熱源の動きであれば、人以外でも反応してしまう

良さそうです。これを基準に他と比べて行こうと思います。

ミリ波レーダーセンサー

  • ミリ波(電波)を使って、物体の動きを検知する
  • 精度が高く、心拍などまで判別できる
  • 高コスト、設置環境にあわせた設計・工事が必要

お手軽ではないのでNG

超音波センサー

  • 超音波で物体との距離を測定できる。測距センサーや障害物検知デバイスとして広く流通。
  • 周囲の反響やノイズに影響されやすい

空間に人がいるか、を判断するには配置・判定ロジックが複雑になりそうなので、赤外線センサーに軍配

TOFセンサー

  • 赤外線やレーザーで物体との距離を測定できる。顔認証等で利用されているらしい
  • 高精度
  • 高価、インダストリアルユースで利用されるため入手性が良くない

カメラ

  • シンプル
  • ナレッジが豊富
  • 画像処理・演算でエッジ端末の性能が必要

人感検知用途ではオーバースペックなので、赤外線センサーに軍配

赤外線センサーに決まりました

構成

  • 室内にセンサーを配置
  • 在室状態は会議室外で確認したいので、会議室入口のモニターに表示
  • 会議室の入口は複数かもしれないので、モニターは複数箇所に設置できるように

ということを考えました。 以下のようになりました。 モニターは作る手間を惜しんでSBC+ディスプレイにしましたが、マイコン+電子ペーパーとかのほうが安価になるかもしれません。

outline

無線通信

採用する無線についても、なるべくシンプルにしたかったのと、知見があるBluetooth Low Energy(BLE)を採用しました。

BLE のあれこれ

ロール

BLEでは、振る舞いによってロールという定義があります。いわゆるマスター/スレーブのような関係性です。

  • Connection型

CentralとPeripheralの2つがあり、相互に接続したうえで通信します。 Centralが接続要求→Peripheralが受諾することで接続が確立されます。 接続後はGATTという規則に則ってデータのやり取りされます。

  • Broadcaster型

Connection型と違い、接続することなく、Broadcasterが一方的にデータを発信します。Observer側からデータを送ることはできません。

どちらの型も、advertise/scan の仕組みは共通で、peripheral から特定フォーマットのパケットを送信します。このデータは接続を要せず、どんなデバイスでも受信・読み込むことができます。

Connection型のほうが、セキュアに大容量のデータのやり取りが可能です。 一方でBroadcaster型は同時に多数のデバイスに情報を伝えることができ、接続処理の実装が不要です。

在室モニターでは

  • 情報量は小さいがリアルタイムに知りたい
  • 在室状況は複数のモニターで同時にアクセスしたい
  • セキュアな情報ではない

ということから、Broadcaster型 を採用します。

データ構造

advertise パケットのデータ構造です。 AD Type で規定されたデータを含めることができます。 AD TypeはBluetoothSIGで決められています。

図は Nordicのサイト から拝借

自由にデータを設定できるTypeとして Manufacture Specific Data があります(iBeaconもこのフィールドを使ってます)。 今回はこのTypeを使ってAdvertiseにデータを埋め込んでいきたいと思います。

AdvData は全体で31Byteまでの制約があるので、埋め込むデータもなるべく小さいサイズにします。bit arrayで以下のように決めました。

  • 直近1分以内に検知したら 0b00000001
  • 直近5分以内になら 0b00000010
  • 直近10分以内になら 0b00000100
  • 直近20分以内になら 0b00001000
  • 直近30分以内になら 0b00010000
  • 直近40分以内になら 0b00100000
  • 直近50分以内になら 0b01000000
  • 直近60分以内になら 0b10000000

これで、最後に検知した時間がおおよそわかります

実装

センサーの配線図です。とてもシンプル。 赤外線センサーは パナソニック製のPaPIRsを採用しました。

センサーデバイス制御のソースコードです。 Raspberry Pi PICO W を使用したので MicroPythonになりました。 ChatGPTに書かせました。

はじめはaiobleを使用したコードがサジェストされたのですが、どうもadvertiseを動的に変更することが考慮されてないようだったので、low-level Bluetoothを直接参照した実装に変えました。

import tkinter as tk
import asyncio
from datetime import datetime, timedelta
from ble_scanner import BLEScanner
import threading
from akerun_log import get_latest_user_history, get_user_access_details

# 組織IDとデバイスIDを指定
ORGANIZATION_ID = "O-xxxxxx-xxxxxx"  # 必要に応じて変更してください
AKERUN_DEVICE_ID = "Axxxxxxxxxx"  # 必要に応じて変更してください

class RoomStatusApp:
    def __init__(self, root):
        self.root = root
        self.root.title("部屋の状況モニター")
        self.root.geometry("1600x1200")
        self.root.configure(bg="#2c3e50")  # ダークグレーの背景

        # UIの初期化
        self.title_label = tk.Label(
            root, text="部屋の状況", font=("Arial", 40, "bold"), bg="#2c3e50", fg="#ecf0f1"
        )
        self.title_label.pack(pady=10)

        self.status_label = tk.Label(
            root, text="部屋にいる: 不明", font=("Arial", 32), bg="#2c3e50", fg="#bdc3c7"
        )
        self.status_label.pack(pady=10)

        self.time_label = tk.Label(
            root, text="最終検知: -- 分前", font=("Arial", 28), bg="#2c3e50", fg="#bdc3c7"
        )
        self.time_label.pack(pady=10)

        self.entry_label = tk.Label(
            root,
            text="RSSI: -- (時刻: --)",
            font=("Arial", 28),
            bg="#2c3e50",
            fg="#bdc3c7",
        )
        self.entry_label.pack(pady=10)

        # 最終入退室者のラベルを追加
        self.last_user_label = tk.Label(
            root,
            text="最終入退室者: --",
            font=("Arial", 28),
            bg="#2c3e50",
            fg="#ecf0f1"
        )
        self.last_user_label.pack(pady=10)

        # BLEスキャナーの初期化
        self.ble_scanner = BLEScanner(target_name="detector-00")
        self.running = True

        # BLEスキャンを非同期で開始
        self.thread = threading.Thread(target=self.run_async_loop, daemon=True)
        self.thread.start()

        # 定期的にUIを更新
        self.update_status()

    def run_async_loop(self):
        """バックグラウンドスレッドでBLEスキャンを実行"""
        asyncio.run(self.start_ble_scan())

    async def start_ble_scan(self):
        """BLEスキャナーを開始"""
        await self.ble_scanner.start_scan()
        while self.running:
            await asyncio.sleep(10)  # 10秒間隔でスキャン

    def update_status(self):
        """スキャン結果に基づいてUIを更新"""
        scan_result = self.ble_scanner.get_detection_time_and_rssi()
        if scan_result:
            detection_time = scan_result["detection_time"]
            rssi = scan_result["rssi"]

            print(scan_result)
            if detection_time > 0 and detection_time < 5:
                self.status_label.config(text="部屋にいる: はい", fg="#2ecc71")  # 緑色
            else:
                self.status_label.config(text="部屋にいる: いいえ", fg="#e74c3c")  # 赤色
            last_detected = datetime.now() - timedelta(minutes=detection_time)
            timestamp = last_detected.strftime("%H:%M")
            self.time_label.config(text=f"最終検知: {detection_time} 分前")
            self.entry_label.config(text=f"RSSI: {rssi} (時刻: {timestamp})")
        else:
            self.status_label.config(text="部屋にいる: 不明", fg="#bdc3c7")
            self.time_label.config(text="最終検知: -- 分前")
            self.entry_label.config(text="RSSI: -- (時刻: --)")

        # 最終入退室者の更新
        self.update_last_user_info()

        # 5秒後に再度更新
        self.root.after(5000, self.update_status)

    def update_last_user_info(self):
        """最終入退室者を更新"""
        try:
            # Akerun APIから最新のユーザー履歴を取得
            latest_history = get_latest_user_history(ORGANIZATION_ID, AKERUN_DEVICE_ID)
            user_access_details = get_user_access_details(latest_history)

            # ユーザー名とアクセス日時を表示
            self.last_user_label.config(
                text=f"最終入退室者: {user_access_details['user_name']} (時刻: {user_access_details['accessed_at']})"
            )
        except Exception as e:
            self.last_user_label.config(text="最終入退室者: エラー発生")
            print("Error getting user access details:", str(e))

    def stop_ble_scan(self):
        """スキャンの停止"""
        self.running = False
        self.ble_scanner.stop_scan()

# アプリケーション起動
if __name__ == "__main__":
    root = tk.Tk()
    app = RoomStatusApp(root)
    try:
        root.mainloop()
    finally:
        app.stop_ble_scan()

ble_scanner.py

import asyncio
from bluepy.btle import Scanner, DefaultDelegate
import time
import struct

class ScanDelegate(DefaultDelegate):
    def __init__(self, target_name="detect-00"):
        super().__init__()
        self.target_name = target_name
        self.scan_results = []

    def handleDiscovery(self, dev, isNewDev, isNewData):
        if isNewDev or isNewData:
            name = dev.getValueText(9)  # Complete Local Name

            if name == self.target_name:
                # Get the manufacturer specific data (advertisement data)
                manuf_data = dev.getValueText(255)
                if manuf_data is not None:
                    # Convert the string data to bytes
                    manuf_data_bytes = bytes.fromhex(manuf_data.replace(" ", ""))
                    # Skip the first two bytes (0th and 1st byte)
                    sensor_data = manuf_data_bytes[2:]
                    # Parse the sensor detection time (next bytes)
                    if len(sensor_data) > 0:  # Ensure there's enough data for detection time
                        detection_raw = int.from_bytes(sensor_data, byteorder='little')

                        if detection_raw == 0:
                            detection_time = 0
                        elif detection_raw == 0b1:
                            detection_time = 1
                        elif detection_raw <= 0b10:
                            detection_time = 5
                        elif detection_raw <= 0b100:
                            detection_time = 10
                        elif detection_raw <= 0b1000:
                            detection_time = 20
                        elif detection_raw <= 0b10000:
                            detection_time = 30
                        elif detection_raw <= 0b100000:
                            detection_time = 40
                        elif detection_raw <= 0b1000000:
                            detection_time = 50
                        elif detection_raw <= 0b10000000:
                            detection_time = 60
                        else:
                            detection_time = 0

                        # Append results with sensor detection status and raw data
                        self.scan_results.append({
                            'address': dev.addr,
                            'name': name,
                            'rssi': dev.rssi,
                            'sensor_status': detection_raw,
                            'detection_time': detection_time,
                            'raw_data': manuf_data_bytes
                        })


class BLEScanner:
    def __init__(self, target_name="detector-00"):
        self.target_name = target_name
        self.scanner = Scanner().withDelegate(ScanDelegate(self.target_name))
        self.delegate = ScanDelegate(self.target_name)
        self.scanner.delegate = self.delegate
        self.scanning = False
        self.cache = None  # キャッシュ用の変数

    async def _scan_background(self):
        """バックグラウンドでスキャンを5秒ごとに実行し、結果をキャッシュします。"""
        while self.scanning:
            self.delegate.scan_results.clear()
            # Perform scan asynchronously for 5 seconds
            await asyncio.to_thread(self.scanner.scan, 5.0)
            # キャッシュの更新
            self.cache = self.delegate.scan_results[-1] if self.delegate.scan_results else None
            print("Scan complete, cache updated.")
            await asyncio.sleep(5)

    async def start_scan(self):
        """バックグラウンドでスキャンを開始します。"""
        if not self.scanning:
            print("Starting BLE scan...")
            self.scanning = True
            # スキャンをバックグラウンドタスクとして実行
            asyncio.create_task(self._scan_background())

    def stop_scan(self):
        """スキャンを停止します。"""
        self.scanning = False
        print("Stopping scanner.")

    def get_detection_time_and_rssi(self):
        """キャッシュされた結果から、最新の検出時間とRSSIを返します。"""
        if self.cache:
            return {
                "detection_time": self.cache['detection_time'],
                "rssi": self.cache['rssi']
            }
        else:
            return None

akerun_log.py

import os
import requests
from dotenv import load_dotenv

# .envファイルを読み込む
load_dotenv()

# アクセストークンを取得
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
if not ACCESS_TOKEN:
    raise ValueError("ACCESS_TOKEN is not set in the .env file")

# Akerun APIのエンドポイント
API_BASE_URL = "https://api.akerun.com/v3"

def get_latest_user_history(organization_id, akerun_device_id):
    """
    特定のAkerunデバイスの最後のユーザーアクセス履歴を取得
    :param organization_id: 組織ID
    :param akerun_device_id: AkerunデバイスID
    :return: 最後のユーザーアクセス履歴 (userがNULLのものは無視)
    """
    url = f"{API_BASE_URL}/organizations/{organization_id}/accesses"
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
    }
    params = {
        "akerun_ids[]": [akerun_device_id],
        "limit": 100  # 必要に応じて取得件数を調整
    }

    response = requests.get(url, headers=headers, params=params)

    if response.status_code != 200:
        raise Exception(f"Failed to fetch data: {response.status_code}, {response.text}")

    data = response.json()
    # アクセス履歴データを取得
    accesses = data.get("accesses", [])

    # userがNULLではないアクセスをフィルタリング
    valid_accesses = [access for access in accesses if access.get("user") is not None]

    # 最後のアクセスを取得
    return valid_accesses[0] if valid_accesses else None

def get_user_access_details(access_data):
    """
    ユーザーアクセス履歴の詳細(ユーザー名とアクセス日時)を取得
    :param access_data: アクセス履歴データ
    :return: ユーザー名とアクセス日時
    """
    if access_data:
        user_name = access_data['user'].get('name', 'Unknown User')
        accessed_at = access_data.get('accessed_at', 'Unknown Time')
        return {"user_name": user_name, "accessed_at": accessed_at}
    return {"user_name": "No valid access", "accessed_at": "No valid time"}

モニター側のソースコードです。こちらもChatGPTに書かせました。RaspberryPi上で簡易なGUIを実現で指定したところ、tkinterがサジェストされました。

AkerunのAPI仕様は(developersサイト)https://developers.akerun.com/#introductionで公開されています。今回は履歴一覧のAPIを利用しています。

import tkinter as tk
import asyncio
from datetime import datetime, timedelta
from ble_scanner import BLEScanner
import threading
from akerun_log import get_latest_user_history, get_user_access_details

# 組織IDとデバイスIDを指定
ORGANIZATION_ID = "O-111111-111111"  # 必要に応じて変更してください
AKERUN_DEVICE_ID = "A12345678"  # 必要に応じて変更してください

class RoomStatusApp:
    def __init__(self, root):
        self.root = root
        self.root.title("部屋の状況モニター")
        self.root.geometry("1600x1200")
        self.root.configure(bg="#2c3e50")  # ダークグレーの背景

        # UIの初期化
        self.title_label = tk.Label(
            root, text="部屋の状況", font=("Arial", 40, "bold"), bg="#2c3e50", fg="#ecf0f1"
        )
        self.title_label.pack(pady=10)

        self.status_label = tk.Label(
            root, text="部屋にいる: 不明", font=("Arial", 32), bg="#2c3e50", fg="#bdc3c7"
        )
        self.status_label.pack(pady=10)

        self.time_label = tk.Label(
            root, text="最終検知: -- 分前", font=("Arial", 28), bg="#2c3e50", fg="#bdc3c7"
        )
        self.time_label.pack(pady=10)

        self.entry_label = tk.Label(
            root,
            text="RSSI: -- (時刻: --)",
            font=("Arial", 28),
            bg="#2c3e50",
            fg="#bdc3c7",
        )
        self.entry_label.pack(pady=10)

        # 最終入退室者のラベルを追加
        self.last_user_label = tk.Label(
            root,
            text="最終入退室者: --",
            font=("Arial", 28),
            bg="#2c3e50",
            fg="#ecf0f1"
        )
        self.last_user_label.pack(pady=10)

        # BLEスキャナーの初期化
        self.ble_scanner = BLEScanner(target_name="detector-00")
        self.running = True

        # BLEスキャンを非同期で開始
        self.thread = threading.Thread(target=self.run_async_loop, daemon=True)
        self.thread.start()

        # 定期的にUIを更新
        self.update_status()

    def run_async_loop(self):
        """バックグラウンドスレッドでBLEスキャンを実行"""
        asyncio.run(self.start_ble_scan())

    async def start_ble_scan(self):
        """BLEスキャナーを開始"""
        await self.ble_scanner.start_scan()
        while self.running:
            await asyncio.sleep(10)  # 10秒間隔でスキャン

    def update_status(self):
        """スキャン結果に基づいてUIを更新"""
        scan_result = self.ble_scanner.get_detection_time_and_rssi()
        if scan_result:
            detection_time = scan_result["detection_time"]
            rssi = scan_result["rssi"]

            print(scan_result)
            if detection_time > 0 and detection_time < 5:
                self.status_label.config(text="部屋にいる: はい", fg="#2ecc71")  # 緑色
            else:
                self.status_label.config(text="部屋にいる: いいえ", fg="#e74c3c")  # 赤色
            last_detected = datetime.now() - timedelta(minutes=detection_time)
            timestamp = last_detected.strftime("%H:%M")
            self.time_label.config(text=f"最終検知: {detection_time} 分前")
            self.entry_label.config(text=f"RSSI: {rssi} (時刻: {timestamp})")
        else:
            self.status_label.config(text="部屋にいる: 不明", fg="#bdc3c7")
            self.time_label.config(text="最終検知: -- 分前")
            self.entry_label.config(text="RSSI: -- (時刻: --)")

        # 最終入退室者の更新
        self.update_last_user_info()

        # 5秒後に再度更新
        self.root.after(5000, self.update_status)

    def update_last_user_info(self):
        """最終入退室者を更新"""
        try:
            # Akerun APIから最新のユーザー履歴を取得
            latest_history = get_latest_user_history(ORGANIZATION_ID, AKERUN_DEVICE_ID)
            user_access_details = get_user_access_details(latest_history)

            # ユーザー名とアクセス日時を表示
            self.last_user_label.config(
                text=f"最終入退室者: {user_access_details['user_name']} (時刻: {user_access_details['accessed_at']})"
            )
        except Exception as e:
            self.last_user_label.config(text="最終入退室者: エラー発生")
            print("Error getting user access details:", str(e))

    def stop_ble_scan(self):
        """スキャンの停止"""
        self.running = False
        self.ble_scanner.stop_scan()

# アプリケーション起動
if __name__ == "__main__":
    root = tk.Tk()
    app = RoomStatusApp(root)
    try:
        root.mainloop()
    finally:
        app.stop_ble_scan()

トライアル

会議室のモニターに設置してみました。

こいつ...動くぞ...!

実際に導入するなら...

トライアルの結果、期待通りの振る舞いをすることがわかりました。 今回はついカッとなってサクッとつくりましたが、実際に導入するとなると、課題があります

  • ブレッドボードではなく、基板実装(PCBA)にして筐体に格納しないと、物理接触・衝撃・静電気等で停止・故障する
  • モニターに公開APIの認証情報・入退室履歴が残るので、セキュアな管理・設置ができるように
  • 長期安定動作するか検証

また、そもそも会議室が時間通り空くように、予約時間終了前に利用者に知らせるようなソリューションもほしいよね? という声もありました。それはそう!

おわり

今回のプロジェクトは、以下の裏の目的がありました。

  • RasperryPi PICO WBluetoothを試したかった
  • 自分で実装しないでChatGPTに書かせて、ハードウェアを動かすコードを書けるのか試したかった

製品開発ではこんなことできないので、趣味プロトタイピングとして実施しました。 API・ライブラリ仕様が公開されているものは、そのリファレンスをChatGPTに渡すと内容を解釈して実装してくれました。便利。

エンジニアはサクッと動くものをつくるのは得意です。ですが、好きなようにつくると好み・スキル・経験に基づいて作ってしまうことも多々。 ソリューションとして開発するなら、要求の洗い出し・要件定義・技術選定を丁寧にすすめることがとても大事。


株式会社フォトシンスでは、一緒にプロダクトを成長させる様々なレイヤのエンジニアを募集しています。 photosynth.co.jp Akerunにご興味のある方はこちらから akerun.com