ソース構造 - Python バックエンド

Contents

全体構造

SlowDash は,主に 4 つの Python レイヤーで構成されています.

Client / CLI / CGI
    |
    v
Slowlette
    - ASGI/WSGI アダプタ
    - URL ルーティング
    - リクエスト引数のバインディング
    - レスポンスのマージ
    |
    v
SlowDash サーバーコンポーネント
    - プロジェクト/設定の処理
    - データソース API
    - ユーザー/タスクモジュール API
    - エクスポート API
    - ユーザー HTML/コンテンツ API
    - リアルタイム/現在値データの補助機能
    |
    v
プラグインとライブラリ
    - app/plugin のデータソースとエクスポータ
    - slowpy のデータオブジェクト,制御ノード,ストア,クライアントヘルパー

中心となるサーバーオブジェクトは,app/server/slowdash.pyApp です.Appslowlette.App を継承し,Project を生成して実行環境を整えたうえで,SlowDash の各コンポーネントを Slowlette のルーターに組み込みます.

主要ディレクトリ

app/server

このディレクトリには,SlowDash のウェブアプリケーションと組み込みの API コンポーネントが含まれます.

主なファイル:

app/plugin

このディレクトリには,PluginComponent によって読み込まれるプラグインモジュールが含まれます.

データソースプラグイン:

エクスポートプラグイン:

プラグインのファイル名とクラス名は命名規約で決まります.たとえばデータソースの種別が SQLite の場合は,次のように対応します.

app/plugin/datasource_SQLite.py
DataSource_SQLite

lib/slowlette

Slowlette は,SlowDash が使用する小さなウェブフレームワークです.

主なファイル:

lib/slowpy

SlowPy は,データ型,制御の抽象化,ストレージへの書き込み機能,クライアントヘルパー,作図ヘルパーを提供します.

主な領域:

公開されているトップレベルの slowpy パッケージは,HistogramGraphTrendTreeTableTimeSeriesSlowFetchslowdashifyslowplot など,よく使われるデータオブジェクトやヘルパーをエクスポートします.

アプリケーションの起動フロー

コマンドラインまたはサーバーの起動

主要なエントリーポイントは app/server/slowdash.py です.

通常の起動シーケンスは次のとおりです.

  1. コマンドラインオプションを解析する.
  2. App(project_dir, project_file, is_cgi, is_command, is_async) を生成する.
  3. AppProject を生成する.
  4. Project が SlowDash のシステムディレクトリとプロジェクトディレクトリを探す.
  5. ProjectSlowdashProject.yaml を読み込む.設定によっては,環境変数から初期設定を生成する.
  6. プロジェクトディレクトリが存在する場合,App はプロセスの作業ディレクトリをプロジェクトディレクトリに移す.
  7. App は,システムのプラグインディレクトリ,プロジェクトディレクトリ,プロジェクトの config ディレクトリを sys.path に追加する.
  8. App がすべての組み込みコンポーネントを,自身の Slowlette ルーターに組み込む.
  9. 選択されたモードに応じて,ASGI,WSGI,CGI,またはコマンドラインの内部リクエストとして実行される.

コンポーネントの組み込み順序

slowdash.py は,コンポーネントを次の順序で組み込みます.

ConsoleComponent
MeshComponent
UserModuleComponent
TaskModuleComponent
ConfigComponent
DataSourceComponent
UserHtmlComponent
ExportComponent
MiscApiComponent
SlowMQComponent

この順序は重要です.Slowlette は,一致した複数のハンドラからレスポンスを集めてマージします.そのため,先に組み込まれたコンポーネントがマージ用のラッパーを担い,後続のコンポーネントがマージ対象のコンテンツを提供できます.

コード中で示されている順序の意図は次のとおりです.

Slowlette のルーティングとレスポンスモデル

リクエストの流れ

ASGI の場合:

ASGI server
    |
    v
slowlette.server.dispatch_asgi()
    |
    +-- lifespan.startup/shutdown -> router.dispatch_event()
    |
    +-- websocket -> router.websocket()
    |
    +-- http -> read body -> Request -> router.dispatch()

WSGI の場合:

WSGI server
    |
    v
slowlette.server.dispatch_wsgi()
    |
    v
Request -> asyncio.run(router.dispatch()) -> WSGI response

Slowlette は,受信した URL を Request に変換します.

デコレータと引数のバインディング

ハンドラは,次のようなデコレータで宣言します.

@slowlette.get('/api/channels')
@slowlette.post('/api/control')
@slowlette.websocket('/ws/slowmq')
@slowlette.on_event('startup')

router.pyPathRule は,デコレートされた関数のシグネチャを調べ,次のものをバインドします.

ルーターはサブアプリを組み込めます.各コンポーネント自体が slowlette.App であるため,コンポーネントごとに独自のルートを追加できます.

レスポンスのマージ

Slowlette のディスパッチは,最初に一致したハンドラで止まりません.コンポーネントツリーをたどってすべての一致するレスポンスを集め,下から上へとマージします.

Response.merge_response() のデフォルトのマージ動作は次のとおりです.

SlowDash は,集約型のエンドポイントのためにこの仕組みを利用しています.

一部のコンポーネントは,merge_response() をオーバーライドした独自の Response サブクラスを返します.たとえば現在値データのキャッシュコンポーネントは,データソースの応答が生成された後に現在値を追加します.

プロジェクト設定のフロー

sd_project.pyProject は,プロジェクト設定の探索と読み込みを担当します.

設定の取得元は次のとおりです.

  1. 明示的に指定された --project-dir または --project-file
  2. SLOWDASH_PROJECT
  3. 親ディレクトリをたどっての SlowdashProject.yaml の探索.
  4. SLOWDASH_INIT_DATASOURCE_URL による,環境変数ベースの初期データソース.

プロジェクトファイルは slowdash_project 辞書を含む必要があります.読み込み時には,Substitution が次の形式を含む文字列の置換を処理します.

${VARIABLE}
${VARIABLE-default}
${VARIABLE:-default-like-empty-is-null}
$(COMMAND)
$$

読み込み後の処理は次のとおりです.

ConfigComponent は,公開用のプロジェクトメタデータを /api/config から提供します.ただし,生のプロジェクト設定は秘密情報を含む可能性があるため,そのまま公開しません.

組み込みサーバーコンポーネント

ComponentPluginComponent

Component は,サーバーコンポーネントの基底クラスです.次のものを提供します.

PluginComponent は,プロジェクト設定からコンポーネントのプラグインを構築します.

  1. project.config[component_type],またはその複数形を読み込む.
  2. 単一のノードをリストに正規化する.
  3. プラグインのファイル名とクラス名を解決する.
  4. app/plugin からプラグインモジュールを読み込む.
  5. プラグインクラスをインスタンス化する.
  6. 各プラグインを Slowlette のサブアプリとして組み込む.

アプリが非同期でない場合は,利用可能であれば _NoAsync 版のプラグインファイルが優先されます.

ConfigComponent

主な責務は次のとおりです.

/api/config/contentlist/api/config/content/{filename} のエンドポイントは,UI コンポーネントがダッシュボード,プロット,cruise,その他のユーザーコンテンツを見つけるために使われます.

DataSourceComponentDataSource

DataSourceComponent は,データソース用のプラグインベースのコンポーネントです.

DataSource プラグインは,次のルートを提供します.

GET /api/channels
GET /api/data/{channels}
GET /api/blob/{channel}
startup
shutdown

基底の DataSource クラスは,同期・非同期のどちらのプラグイン実装にも対応します.

initialize()       -> aio_initialize()
finalize()         -> aio_finalize()
get_channels()     -> aio_get_channels()
get_timeseries()   -> aio_get_timeseries()
get_object()       -> aio_get_object()
get_blob()         -> aio_get_blob()

データ取得の流れは次のとおりです.

GET /api/data/{channels}
    |
    v
parse length/to/resample/reducer/filler/envelope/prior_data
    |
    v
aio_get_timeseries(...)
aio_get_object(...)
    |
    v
merge time-series and object results into one dict

DataSource.resample() ヘルパーは,時系列データをビンに揃え,lastmeanmedianminmaxcountsem などのリデューサに対応します.

ExportComponent

ExportComponent は,プロジェクト設定からエクスポートプラグインを読み込みます.

また,次のデフォルトのエクスポート機能を必ず追加します.

実際のエクスポート用ルートは,エクスポートプラグイン側が提供します.

UserModuleComponent

sd_usermodule.py は,SlowDash のプロセス内 Python 拡張機構を提供します.

ユーザーモジュールはプロジェクト設定から読み込まれ,UserModuleThread 内で実行されます.モジュールは,次のライフサイクルコールバックを定義できます.

_setup(app, params) or _setup(app) or _setup()
_initialize(params) or _initialize()
_run()
_loop()
_finalize()

ユーザーモジュールは,定義されている関数に応じて,API ハンドラ,コンテンツ,HTML,レイアウト,チャンネル/データのフック,制御コマンドも提供できます.

ユーザーモジュールのスレッドは,通常は自身のイベントループを使います._run()_loop() が非同期に対応している場合に限り,設定によってメインのイベントループを使えます.

TaskModuleComponent

sd_taskmodule.py は,現行のプロセス内タスクモジュールシステムです.

ユーザーモジュールの機構を拡張し,タスクコマンドの解析,コマンドの実行,ControlSystem との統合を追加します.

主なルートは次のとおりです.

GET  /api/control/task
POST /api/control
POST /api/control/task/{taskname}
GET  /api/channels
GET  /api/data/{channels}
POST /api/consume/current_data

タスクコマンドの流れは次のとおりです.

POST /api/control
    |
    v
TaskModuleComponent.execute_command()
    |
    v
each TaskModule.process_command()
    |
    v
parse command name, arguments, await/reentrant flags
    |
    v
match namespace prefix/suffix
    |
    v
call task function immediately or in TaskFunctionThread

エクスポートされた制御ノードは現在値チャンネルとして公開され,/api/data/{channels} から読み取れます.受信した現在値データのメッセージは,/api/consume/current_data を通じてエクスポート済み変数の設定にも使えます.

UserHtmlComponent

sd_userhtml.py は,ユーザー提供の HTML と関連コンテンツを配信します.また,ユーザー URL を内部の設定/コンテンツ API へリダイレクトまたはマッピングします.

これにより,プロジェクト固有の UI ページを,コアサーバーを変更することなく,プロジェクトの設定/コンテンツ領域に置けます.

MeshComponent

sd_mesh.py は,/api/consume/current_data を通じて受け取った現在値データのキャッシュを保持します.

主な役割は次のとおりです.

このコンポーネントはデータソースより前に組み込まれます.そのため独自のレスポンスが,後続のデータソースの応答とキャッシュデータをマージできます.

SlowMQComponent

sd_slowmq.py は,組み込みの websocket pub/sub サービスを提供します.

主なルートは次のとおりです.

WEBSOCKET /ws/slowmq

接続中の各クライアントは,次のものを持ちます.

メッセージはヘッダを含みます.ヘッダの action によって,そのメッセージが publish,subscribe,unsubscribe のいずれの操作かが決まります.

トピックパターンはドット区切りで,次のものに対応します.

その他のコンポーネント

その他のサーバーコンポーネントは次のとおりです.

プラグインアーキテクチャ

プラグインは,app/plugin 以下にある通常の Python モジュールです.ファイル名とクラス名によって動的に読み込まれます.

データソースの例:

slowdash_project:
  data_source:
    type: SQLite
    parameters:
      ...

これは次のように解決されます.

datasource_SQLite.py
DataSource_SQLite

エクスポートの例:

slowdash_project:
  export:
    type: CSV

これは次のように解決されます.

export_CSV.py
Export_CSV

また PluginComponent は,デフォルトでネストした parameters 辞書をルートのパラメータ辞書にマージします.これにより,プラグインのコンストラクタは平坦化されたパラメータを参照できます.

データ取得の通信フロー

最も一般的な読み取り経路は次のとおりです.

Browser or client
    |
    v
GET /api/channels
GET /api/data/{channels}?length=...&to=...
    |
    v
Slowlette ASGI/WSGI dispatch
    |
    v
SlowDash component tree
    |
    +-- MeshComponent cache merge response
    +-- UserModuleComponent hooks
    +-- TaskModuleComponent current exports
    +-- DataSourceComponent plugins
    |
    v
Slowlette response merge
    |
    v
JSON response

/api/data/{channels} では,DataSource プラグインが SlowDash のデータモデル形式でデータを返します.キャッシュコンポーネントは,次の条件を満たす場合に現在値を追加できます.

書き込み・配信・現在値データのフロー

現在値データの更新は,次の経路で SlowDash に入ります.

POST /api/emit/{topic}
POST /api/consume/current_data
internal app.request_emit(topic, message, sender=...)

典型的な流れは次のとおりです.

producer
    |
    v
/api/emit/current_data
    |
    v
app.request('/consume/current_data', data)
    |
    +-- MeshComponent.cache_current_data()
    +-- TaskModuleComponent.set_variable()
    |
    v
websocket forwarding to attached clients

sender パラメータは,タスク自身が配信した値が同じタスク変数のパスに反射して戻ってくるのを防ぐために使われます.

制御フロー

制御コマンドは /api/control を使います.

現行のプロセス内タスクの流れは次のとおりです.

POST /api/control
    |
    v
TaskModuleComponent.execute_command()
    |
    v
TaskModule.process_task_command()
    |
    +-- parse "await", "reentrant", "async", "parallel" prefixes
    +-- parse function arguments
    +-- match task namespace
    +-- bind parameters using Python function signature
    |
    v
execute function synchronously, await it, or run it in a command thread

ユーザーモジュールも,定義したフックに応じてコマンド処理に参加できます.

SlowPy ライブラリの役割

SlowPy は,サーバー側のコンポーネントとユーザーコードの両方から使われます.

データオブジェクトモデル

SlowPy は,SlowDash 互換のデータに変換できる Python オブジェクトを提供します.

これらのオブジェクトは,タスク/ユーザーコード,ストレージへの書き込み,現在値データを配信する API で使われます.

データオブジェクトの生成とデータ追加

データオブジェクトは,トップレベルの slowpy パッケージから生成し,通常はタスクのループ内で逐次データを追加していきます.

import slowpy as slp

hist = slp.Histogram(100, 0, 10)         # [0, 10] を 100 ビンに分割
graph = slp.Graph(['channel', 'value'])

while not ControlSystem.is_stop_requested():
    value = device.read(...)
    hist.fill(value)
    graph.fill(channel, value)

各オブジェクトは to_json() を実装しており,これにより同じオブジェクトを現在値データとして配信したり(ControlSystem.stream() / aio_publish() を通じて),データストアに書き込んだりできます.slp.RateTrend のようなトレンド用のヘルパーは,移動窓に値を蓄積し,保存に適した TimeSeries を生成します.

rate_trend = slp.RateTrend(length=300, tick=10)
rate_trend.fill(time.time())
datastore.append(rate_trend.time_series('rate'))

制御ノード

slowpy/control/node.pyControlNode は,読み書き可能な制御エンドポイントの基底となる抽象です.

主なメソッドは次のとおりです.

set(value)
get()
aio_set(value)
aio_get()
has_data()
aio_has_data()
sleep()
aio_sleep()
wait()
aio_wait()
readonly()
writeonly()

非同期メソッドは,デフォルトでは同期メソッドに委譲します._is_thread_safe が設定されている場合,同期版の get()set() の呼び出しを asyncio.to_thread() 経由で実行できます.

ノードチェーンの構築と get()/set() の利用

SlowPy は,外部システムやデバイスをすべて単一の制御ツリーに対応づけます.各ノードは set()get() を持ち,名詞的な名前のメソッドは子ノードを返します.これらのアクセサを連ねたチェーンが,特定のエンドポイントへの論理的な経路を表します.

起点となるのは,通常は ControlSystem のインスタンス(または共有インスタンスの control_system)です.

from slowpy.control import ControlSystem
ctrl = ControlSystem()

# ノードチェーンを構築する: Ethernet 接続 -> SCPI -> 特定のコマンド
device = ctrl.ethernet(host='192.168.1.43', port=5025)
Vout = device.scpi(append_opc=True).command('VOLT')
V    = device.scpi().command('MEAS:VOLT:DC')

チェーン中の各呼び出しが,1 つの枝を追加します.

末端のノードを構築したら,set() で書き込み,get() で読み取ります.

Vout.set(10)        # SCPI "VOLT 10;*OPC?" を送る
value = V.get()     # SCPI "MEAS:VOLT:DC?" を送り,応答を返す

ControlNode には,次のショートカットも定義されています.

非同期版は await node.aio_set(value)value = await node.aio_get() で,後述の同期/非同期モデルを通じて,すべてのノードで利用できます.

枝はプラグインによって追加され,ルートの ControlSystem に限られません.プラグインは任意のノードに読み込めます.たとえば,Ethernet ノードに読み込まれたプロトコルプラグインは,Ethernet ノードの set()(送信)と get()(受信)を再利用するサブ枝を作ります.組み込みノード(Ethernet/SCPI/Telnet,HTTP,Shell,Slowdash,Redis など)の一覧と,それぞれの set() / get() の意味については,Controls Script を参照してください.

slowpy/control/control_*.py 以下の制御モジュールは,デバイス,ネットワーク,メッセージ,シェル,HTTP,データストア,各種プロトコルとの具体的な連携を提供します.

同期版・非同期版の制御モジュール

ほとんどの連携は,control_X.pycontrol_AsyncX.py という同期版・非同期版のモジュールの組として提供されます.

control_HTTP.py      / control_AsyncHTTP.py
control_Redis.py     / control_AsyncRedis.py
control_RabbitMQ.py  / control_AsyncRabbitMQ.py
control_MQTT.py      / control_AsyncMQTT.py
control_Modbus.py    / control_AsyncModbus.py
control_Slowdash.py  / control_AsyncSlowdash.py
control_Dripline.py  / control_AsyncDripline.py

一方の形式しか存在しない連携もあります.

両者の違いは,入出力(I/O)メソッドの実装方法だけです.

どちらも ControlNode を継承するため,ノードツリーのモデル,子ノードのアクセサ,readonly() / writeonly() のラッパー,ヘルパーメソッド(sleep()wait(),およびそれらの aio_* 版)を共通して持ちます.変わるのは末端の I/O 実装だけです.

ノードツリーへの登録

各ノードクラスは,ファクトリ関数を返すクラスメソッド _node_creator_method() を定義します.ControlNode.add_node() はこの関数を親ノードクラスのメソッドとして注入し,その関数名が子ノードを生成するためのアクセサになります.同期版と非同期版では,通常は異なるアクセサ名を公開します.たとえば次のとおりです.

HttpNode       -> node.http(url)
AsyncHttpNode  -> node.async_http(url)

ControlNode.import_control_module(name) は,現在の作業ディレクトリまたは slowpy/control ディレクトリから control_<name>.py を読み込み,_node_creator_method() を定義しているクラスを走査して,それらのアクセサをノードクラスに登録します.ControlSystem.__init__() は,次のデフォルトのモジュール群を読み込みます.

Ethernet
HTTP
AsyncHTTP
Shell
DataStore

それ以外のモジュールは,タスクコードやユーザーコードから必要に応じて読み込みます.たとえば ControlSystem().import_control_module('Redis') のようにします.

非同期フォールバックとの関係

基底の ControlNode は,同期版の set() / get() に委譲するデフォルトの aio_set() / aio_get() をあらかじめ提供しています(直接呼び出すか,_is_thread_safe が設定されている場合は asyncio.to_thread() 経由で呼び出します).このため,同期版のみのモジュールでも,非同期コードから利用できます.専用の control_Async*.py モジュールは,長時間維持するネットワーク接続やメッセージブローカーとの接続など,真にノンブロッキングな I/O が重要になる場合のために用意されています.これは,前述したサーバー側の DataSource の同期/非同期の二重メソッドや,_NoAsync データソースプラグインと同じ考え方です.

データストア

SlowPy のデータストアは,書き込み側のストレージヘルパーを提供します.

store/factory.py は,URL を実装にマッピングします.

postgresql:// -> DataStore_PostgreSQL
mysql://      -> DataStore_MySQL
sqlite://     -> DataStore_SQLite
influxdb2://  -> DataStore_InfluxDB2
redis://      -> DataStore_Redis
csv:///       -> DataStore_CSV
dump:///      -> DataStore_TextDump

DataStore は,次の操作に対応します.

append(values, tag=None, timestamp=None)
update(values, tag=None, timestamp=None)
close()

値には,スカラー,フィールドの辞書,データ要素,TimeSeries を使えます.

データの書き込み

ストアは,クラスから直接生成するか,store/factory.pycreate_datastore_from_url() を使って URL から生成します.

from slowpy.store import DataStore_PostgreSQL

datastore = DataStore_PostgreSQL(
    'postgresql://postgres:postgres@localhost:5432/SlowTestData',
    table='SlowData'
)

while True:
    datastore.append(value, tag='voltmeter')      # チャンネルタグを付けた単一の値
    datastore.append({'ch00': v0, 'ch01': v1})    # チャンネル/値のペアの辞書

append() は新しい時系列レコードを追加し,update() は直前の値を上書きして最新の値だけを残します.この違いは,ヒストグラムのようなデータ要素オブジェクトを扱う際に意味を持ちます.

datastore.append(hist, tag='spectrum')   # ヒストグラムの時系列(時刻ごとに 1 つ)
datastore.update(hist, tag='spectrum')   # 最新のヒストグラムだけを残す

SQL 系のストアでは,デフォルトで UNIX タイムスタンプを用いた「ロングフォーマット」が使われます.異なるテーブル構成が必要な場合は,ユーザー定義の TableFormat でスキーマや INSERT 文を上書きできます.

SlowDash にとって重要な Slowlette の内部仕様

SlowDash のいくつかの挙動は,Slowlette の設計に直接依存しています.

同一ルートに複数のハンドラが応答できる

Slowlette は,アプリツリー内の一致するすべてのハンドラを意図的に呼び出します.このため,複数のコンポーネントが /api/config/api/channels/api/data/{channels} を提供できます.

レスポンスのマージはアプリケーションモデルの一部である

マージされたレスポンスは,単なる便利機能ではありません.独立に開発されたコンポーネントやプラグインから集約型の API レスポンスを組み立てるための仕組みです.

コンポーネントの順序には意味がある

独自のレスポンスが後続のレスポンスをマージできるため,slowdash.py における組み込み順序は実行時の挙動の一部です.

内部 API 呼び出しも同じルーターを使う

App.request()request_config()request_channels()request_data()request_emit()self.slowlette(...) を直接呼び出します.このため,サーバー側のプロデューサ/コンシューマも,外部の HTTP クライアントと同じルーティングおよびレスポンスマージのモデルを使います.

主要フローのまとめ

起動

slowdash.py
  -> App
  -> Project
  -> sys.path / cwd setup
  -> include components
  -> ASGI/WSGI/CGI/CLI dispatch

API リクエスト

HTTP request
  -> Slowlette server adapter
  -> Request
  -> Router
  -> matching component/plugin handlers
  -> Response list
  -> merged Response
  -> HTTP response

データ読み取り

/api/data/{channels}
  -> data source plugins
  -> optional user/task/current-data additions
  -> merged JSON data

設定の読み取り

/api/config
  -> each component public_config()
  -> deep-merged JSON config

現在値データ

/api/emit/current_data
  -> /api/consume/current_data
  -> cache update and variable update hooks
  -> websocket forwarding where applicable

プラグインの読み込み

project config
  -> PluginComponent
  -> app/plugin module lookup
  -> class lookup
  -> plugin instance
  -> Slowlette include

開発上の実務的な注意