分析モジュールを1つ追加するたびに「どのファイルを触ればいいんだっけ?」と迷う——そんな状態は設計の問題です。
本記事では、あるデータ分析プロジェクトで20種以上の分析モジュールを統一インターフェースで管理するために設計した「Enum + Config + Detector」の3層アーキテクチャを解説します。新しいモジュールを追加する際の変更箇所が明確で、テンプレート化されたパターンにより迷いなく実装できる構造です。
課題:モジュール追加のたびに散在するファイルを修正
データ分析パイプラインで複数の分析モジュールを管理するシステムでは、1つのモジュールを追加するだけで複数のファイルに手を入れる必要があります。
変更箇所が文書化されていないと、以下の問題が発生します。
- 新しいモジュールの登録漏れ(パイプラインで実行されない)
- Enumとコンフィグの不整合(定義はあるが計算ロジックがない)
- データベースのカラム定義漏れ(分析結果を保存できない)
3層アーキテクチャの全体像
1┌─────────────────────────────────┐2│ signalsConfig.py │ ← Layer 1: Enum + シグナル定義3│ 分析結果の列挙型定義 │4└─────────────┬───────────────────┘5 │6┌─────────────▼───────────────────┐7│ analyticsEngine.py │ ← Layer 2: 分析エンジン8│ 各モジュールの計算ロジック │9└─────────────┬───────────────────┘10 │11┌─────────────▼───────────────────┐12│ moduleManager.py │ ← Layer 3: 一元管理13│ 各モジュールの登録・実行制御 │14└─────────────────────────────────┘Layer 1: signalsConfig.py — Enumベースの定義
すべての分析モジュールが出力するシグナル(検出結果)をEnumで定義します。
1from enum import Enum2
3class PositiveSignal(Enum):4 """ポジティブシグナルの定義"""5 TREND_UP_CROSS = "trend_up_cross"6 OSCILLATOR_LOW = "oscillator_low"7 BAND_LOWER_TOUCH = "band_lower_touch"8 VOLATILITY_BREAKOUT = "volatility_breakout"9 # ... 他のシグナル10
11class NegativeSignal(Enum):12 """ネガティブシグナルの定義"""13 TREND_DOWN_CROSS = "trend_down_cross"14 OSCILLATOR_HIGH = "oscillator_high"15 BAND_UPPER_TOUCH = "band_upper_touch"2 collapsed lines
16 VOLATILITY_CONTRACTION = "volatility_contraction"17 # ... 他のシグナル設計ポイント:
- Enumにすることで、タイポによるバグを型チェックで防止
- シグナルの一覧がこのファイルだけで把握できる
- IDEの補完が効くため、開発効率も向上
Layer 2: analyticsEngine.py — 分析エンジン
各分析モジュールの計算ロジックを統一インターフェースで実装します。
1from abc import ABC, abstractmethod2import pandas as pd3
4# 分析モジュールの基底(関数ベースでも良いが、5# 状態を持つモジュールがあるため抽象基底を使用)6class AnalyticsModuleBase(ABC):7 """分析モジュールの共通インターフェース"""8
9 @abstractmethod10 def calculate(self, df: pd.DataFrame) -> pd.DataFrame:11 """指標を計算してDataFrameに列を追加する"""12 ...13
14 @abstractmethod15 def detect_signals(self, df: pd.DataFrame) -> pd.DataFrame:33 collapsed lines
16 """シグナル(判定結果)を検出する"""17 ...18
19# 移動平均クロス分析の実装例20class MovingAverageCrossModule(AnalyticsModuleBase):21 def __init__(22 self,23 fast_period: int = 12,24 slow_period: int = 26,25 signal_period: int = 9,26 ):27 self.fast_period = fast_period28 self.slow_period = slow_period29 self.signal_period = signal_period30
31 def calculate(self, df: pd.DataFrame) -> pd.DataFrame:32 fast_ema = df["value"].ewm(span=self.fast_period).mean()33 slow_ema = df["value"].ewm(span=self.slow_period).mean()34 df["ma_diff"] = fast_ema - slow_ema35 df["ma_signal"] = df["ma_diff"].ewm(span=self.signal_period).mean()36 df["ma_histogram"] = df["ma_diff"] - df["ma_signal"]37 return df38
39 def detect_signals(self, df: pd.DataFrame) -> pd.DataFrame:40 df["up_cross"] = (41 (df["ma_diff"] > df["ma_signal"])42 & (df["ma_diff"].shift(1) <= df["ma_signal"].shift(1))43 )44 df["down_cross"] = (45 (df["ma_diff"] < df["ma_signal"])46 & (df["ma_diff"].shift(1) >= df["ma_signal"].shift(1))47 )48 return dfLayer 3: moduleManager.py — 一元管理
すべての分析モジュールを登録し、実行順序を制御します。
1import pandas as pd2from typing import Sequence3
4class AnalyticsModuleManager:5 """分析モジュールの一元管理"""6
7 def __init__(self, modules: Sequence[AnalyticsModuleBase]):8 self._modules = list(modules)9
10 def run_all(self, df: pd.DataFrame) -> pd.DataFrame:11 """全モジュールを順次計算・シグナル検出"""12 for module in self._modules:13 df = module.calculate(df)14 df = module.detect_signals(df)15 return df17 collapsed lines
16
17 @property18 def module_count(self) -> int:19 return len(self._modules)20
21# 使用例22manager = AnalyticsModuleManager([23 MovingAverageCrossModule(),24 OscillatorModule(), # ※実装省略25 BandModule(),26 VolatilityModule(),27 # ... 20種以上のモジュールを登録28])29
30# 全モジュールを一括実行31df = pd.read_parquet("path/to/data.parquet")32result = manager.run_all(df)6ファイル連鎖更新パターン
新しい分析モジュールを追加する際、以下の6ファイルを順番に更新します。
11. signalsConfig.py2 └→ PositiveSignal / NegativeSignal にEnum値を追加3
42. analyticsEngine.py5 └→ AnalyticsModuleBaseの実装クラスを追加6
73. moduleManager.py8 └→ マネージャーに新しいモジュールを登録9
104. pipeline/main.py(パイプラインの実行エントリポイント)11 └→ 新モジュールのシグナルを下流処理で使用12
135. output/main.py14 └→ 分析結果をParquetファイルに出力する処理を追加15
2 collapsed lines
166. schema.sql17 └→ データベースのカラム定義を追加この6ファイルの更新手順をドキュメント化しておくことで、新しいモジュールの追加漏れを防止します。
Parquetファイルでのデータ管理
分析結果はParquetファイルで管理しています。
1# 分析結果の保存2result.to_parquet(3 f"data/analytics/{module_name}_{target_id}_{date}.parquet"4)Parquetを選択した理由:
- カラム型フォーマットのため、特定の指標だけの読み込みが高速
- データ型が保存されるため、CSVのような型推論の不安定さがない
- pandasとの相性が良く、
read_parquet/to_parquetで直接読み書きできる
この設計パターンのメリット
1. 変更箇所が明確
「新しいモジュールを追加する」という作業が、6ファイルの更新という明確な手順に分解されています。チェックリスト化できるため、抜け漏れが起きにくくなります。
2. テストが書きやすい
各モジュールが AnalyticsModuleBase の統一インターフェースを持つため、テストもテンプレート化できます。
1def test_module_interface(module: AnalyticsModuleBase):2 """全モジュールに共通のインターフェーステスト"""3 df = create_sample_data()4 result = module.calculate(df)5 assert isinstance(result, pd.DataFrame)6 result = module.detect_signals(result)7 assert isinstance(result, pd.DataFrame)3. モジュールの有効/無効をManagerで制御
マネージャーの登録リストからモジュールを外すだけで、特定の分析を無効化できます。条件分岐やフラグは不要です。
まとめ
分析モジュールの管理で重要なのは「どこを触ればいいか」が即座に分かる設計です。Enum + Config + Detector の3層アーキテクチャと6ファイル連鎖更新パターンにより、20種以上のモジュールを統一的に管理し、新しいモジュールの追加を安全かつ迅速に行える構造を実現しました。
この設計は「同じインターフェースを持つモジュールを大量に管理する」場面全般に応用できるパターンです。データ分析に限らず、バリデーションルール、通知チャネル、レポート生成など、プラグイン的に拡張したいシステムで有効です。