#!/usr/bin/env python3
"""
TCL S-Line Protocol Analyzer — flexible post-processing and live analysis.

Plot any byte from any packet type, apply math transforms, mask bits.
Import JSONL captures or connect live to ESP32 WiFi bridge.

Author: TG (dj9kw.de)
"""

import json
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import math

import numpy as np
import pyqtgraph as pg
from PyQt6.QtCore import Qt, QTimer, pyqtSignal, QObject, QThread
from PyQt6.QtGui import QColor, QFont, QPalette
from PyQt6.QtNetwork import QTcpSocket
from PyQt6.QtWidgets import (
    QApplication, QCheckBox, QColorDialog, QComboBox, QFileDialog,
    QGridLayout, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QMainWindow,
    QMenu, QPushButton, QScrollArea, QSizePolicy, QSpinBox, QSplitter,
    QVBoxLayout, QWidget,
)

# Import shared protocol bits from the monitor tool
from tcl_bus_monitor import PacketParser, ReplayWorker, PKT_TYPES

# ── Protocol knowledge ──────────────────────────────────────────────────────

# Best-guess byte names per packet type (byte indices 4-16, the payload)
BYTE_NAMES = {
    0x01: {
        4: "Mode (0=stby,1=cool,2=cdry,3=dehum,4=heat)",
        5: "Freq (RPS)",
        6: "Valve (0=cool,8=heat)",
        7: "B7",
        8: "B8",
        9: "B9",
        10: "B10",
        11: "OnOff (0=act,1=stby)",
        12: "EEV Position (B12)",
        13: "ODU Fan Speed (B13)",
        14: "AmbEcho",
        15: "B15",
        16: "ModeFlag (0x50/0x10)",
    },
    0x02: {
        4: "Counter",
        5: "Vbus (mains voltage)",
        6: "B6 (always 0)",
        7: "ODT discharge (raw ADC, 20k NTC+2k pull)",
        8: "OPT coil (raw ADC, 5k NTC+5.1k pull)",
        9: "OAT ambient (raw ADC, 5k NTC+5.1k pull)",
        10: "B10 (always 0)",
        11: "B11 (const=60, unknown)",
        12: "B12 (always 0)",
        13: "B13 (const=144)",
        14: "B14",
        15: "B15",
        16: "B16",
    },
    0x81: {
        4: "ModeStatus (0x70-0x74)",
        5: "B5 (const=127)",
        6: "Turbo (0x02=on)",
        7: "B7",
        8: "B8",
        9: "CRT (setpoint+fan_offset)",
        10: "FanSpeed (80-125)",
        11: "Indoor Coil (raw ADC)",
        12: "Room Temp (raw ADC)",
        13: "B13",
        14: "B14 (varying)",
        15: "B15",
        16: "TypeDiff (0x40=81,0x00=82)",
    },
    0x82: {
        4: "ModeStatus",
        5: "B5",
        6: "Turbo",
        7: "B7",
        8: "B8",
        9: "CRT",
        10: "FanSpeed",
        11: "B11",
        12: "B12",
        13: "B13",
        14: "B14",
        15: "B15",
        16: "TypeDiff (0x00)",
    },
    0x14: {
        4: "B4",
        5: "B5",
        6: "B6",
        7: "B7 (0x10)",
        8: "B8",
        9: "B9",
        10: "B10",
        11: "B11",
        12: "B12",
        13: "B13",
        14: "B14",
        15: "B15",
        16: "B16",
    },
    0x84: {
        4: "FanMode (0x09-0x0E)",
        5: "B5",
        6: "B6",
        7: "B7 (0x00/0x80)",
        8: "B8",
        9: "B9",
        10: "B10",
        11: "B11",
        12: "B12",
        13: "B13",
        14: "B14",
        15: "B15",
        16: "B16",
    },
}


# Steinhart-Hart NTC conversion for use in transform expressions
# Usage in transform: "sh_b7(val)" or "sh_b8(val)" or "sh_b9(val)"
def _sh_temp(raw, rpull, A, B, C):
    """Steinhart-Hart: raw 8-bit ADC → °C. Returns NaN for open/short.

    ``rpull`` is the pull resistor value in ohms; ``A``, ``B`` and ``C`` are
    the Steinhart-Hart coefficients of the NTC.
    """
    # Readings at either rail are treated as an open or shorted sensor.
    if raw <= 2 or raw >= 254:
        return float('nan')
    r = rpull * (255.0 / raw - 1.0)
    ln_r = math.log(r)
    return 1.0 / (A + B * ln_r + C * ln_r ** 3) - 273.15


def sh_b7(val):
    """ODT discharge: 20kΩ NTC + 2kΩ pull"""
    return _sh_temp(val, 2000, 1.1362532432e-03, 2.0967549577e-04, 1.4371748865e-07)


def sh_b8(val):
    """OPT coil: 5kΩ NTC + 5.1kΩ pull"""
    return _sh_temp(val, 5100, 1.2762540165e-03, 2.1830488166e-04, 3.5380619936e-07)


def sh_b9(val):
    """OAT ambient: 5kΩ NTC + 5.1kΩ pull"""
    return _sh_temp(val, 5100, 1.3151700813e-03, 2.1027974713e-04, 4.0091436008e-07)


def sh_idu(val):
    """IDU indoor coil/room temp: direct quadratic fit (non-standard circuit)"""
    if val <= 2 or val >= 254:
        return float('nan')
    inv_t = 4.0048320172e-03 + (-6.5182245701e-06) * val + 7.8572599935e-09 * val * val
    # A non-positive 1/T has no physical temperature; report it as missing.
    if inv_t <= 0:
        return float('nan')
    return 1.0 / inv_t - 273.15


# (expression, description) pairs offered in the transform combo box.
TRANSFORM_PRESETS = [
    ("val", "Raw value"),
    ("sh_b7(val)", "ODT °C (20k NTC)"),
    ("sh_b8(val)", "OPT °C (5k NTC)"),
    ("sh_b9(val)", "OAT °C (5k NTC)"),
    ("sh_idu(val)", "IDU °C (coil/room)"),
    ("val - 32", "CRT (val-32)"),
    ("val * 60", "RPS→RPM"),
    ("(val & 0x0F)", "Lower nibble"),
    ("(val >> 4)", "Upper nibble"),
]

# Default pen colors, handed out round-robin to new traces.
TRACE_COLORS = [
    "#FF5252", "#42A5F5", "#66BB6A", "#FFF176", "#AB47BC",
    "#FF7043", "#26C6DA", "#9CCC65", "#FFCA28", "#7E57C2",
    "#EF5350", "#29B6F6", "#81C784", "#FFD54F", "#CE93D8",
]


def _select_item_data(combo, value):
    """Select the first entry of *combo* whose item data equals *value*.

    Leaves the current selection untouched when no entry matches.
    """
    for i in range(combo.count()):
        if combo.itemData(i) == value:
            combo.setCurrentIndex(i)
            return


# ── Packet Store ────────────────────────────────────────────────────────────

class PacketStore:
    """In-memory store of all parsed packets, indexed by type."""

    def __init__(self):
        self.packets = []                 # [(rel_time, pkt_type, raw_bytes), ...]
        self.by_type = defaultdict(list)  # type -> [(rel_time, raw_bytes), ...]
        self.t0 = None                    # timestamp of the first packet added

    def clear(self):
        """Forget every packet and the time origin."""
        self.packets.clear()
        self.by_type.clear()
        self.t0 = None

    def add(self, ts: float, pkt_type: int, raw: list):
        """Append one packet; its time is stored relative to the first packet."""
        if self.t0 is None:
            self.t0 = ts
        rel = ts - self.t0
        self.packets.append((rel, pkt_type, raw))
        self.by_type[pkt_type].append((rel, raw))

    def get_trace(self, pkt_type: int, byte_idx: int, mask: int = 0xFF,
                  transform: str = "val") -> tuple:
        """Extract (times, values) for a given byte with optional mask and transform.

        ``transform`` is a Python expression evaluated once per packet. It can
        use the module globals (``sh_b7``, ``math``, ``np``, ...) plus the
        per-packet names ``val`` and ``masked`` (the masked byte), ``raw_val``
        (the unmasked byte), ``raw`` (the whole packet) and ``rel`` (relative
        time). Whenever the expression cannot be compiled, raises, or does not
        produce a number, the masked byte is plotted instead.

        Packets too short to contain ``byte_idx`` are skipped.

        Returns:
            Two float arrays of equal length: relative times and values.
        """
        data = self.by_type.get(pkt_type, [])
        if not data:
            return np.array([]), np.array([])

        # SECURITY NOTE: the expression comes straight from a text field and
        # is executed with eval(). That is the point of this tool, but it
        # must never be fed expressions from an untrusted source.
        try:
            # Compile once instead of re-parsing the text for every packet.
            # Strip first: compile() rejects leading whitespace that eval()
            # on a plain string would have tolerated.
            code = compile(transform.strip(), "<transform>", "eval")
        except (SyntaxError, ValueError, RecursionError, MemoryError):
            code = None  # half-typed expression: fall back to raw values

        env = globals()
        times = []
        values = []
        for rel, raw in data:
            try:
                raw_val = raw[byte_idx]
            except IndexError:
                continue  # truncated packet: nothing to plot for this byte
            masked = raw_val & mask
            transformed = masked
            if code is not None:
                scope = {"val": masked, "masked": masked, "raw_val": raw_val,
                         "raw": raw, "rel": rel}
                try:
                    # float() is inside the try on purpose: a partially typed
                    # expression such as "sh_b7" evaluates to a function
                    # object, which must not reach np.array(dtype=float).
                    transformed = float(eval(code, env, scope))  # noqa: S307
                except Exception:  # arbitrary user expression, best effort
                    transformed = masked
            times.append(rel)
            values.append(transformed)
        return np.array(times), np.array(values, dtype=float)

    @property
    def duration(self):
        """Relative time of the last stored packet, or 0 when empty."""
        if not self.packets:
            return 0
        return self.packets[-1][0]

    @property
    def count(self):
        """Total number of stored packets."""
        return len(self.packets)


# ── Trace Widget ────────────────────────────────────────────────────────────

class TraceWidget(QGroupBox):
    """UI controls for one data trace."""

    changed = pyqtSignal()
    delete_requested = pyqtSignal(object)

    _color_idx = 0  # shared counter for round-robin default colors

    def __init__(self, name: str = "", pkt_type: int = 0x02, byte_idx: int = 7,
                 transform: str = "val", color: Optional[str] = None):
        super().__init__()
        self._building = True  # suppresses `changed` while widgets are wired up

        if color is None:
            color = TRACE_COLORS[TraceWidget._color_idx % len(TRACE_COLORS)]
            TraceWidget._color_idx += 1
        self._color = QColor(color)

        layout = QGridLayout(self)
        layout.setContentsMargins(4, 8, 4, 4)
        layout.setSpacing(3)
        row = 0

        # Name + color + show/delete
        self.name_edit = QLineEdit(name)
        self.name_edit.setPlaceholderText("Trace name")
        self.name_edit.setFixedWidth(100)
        self.name_edit.textChanged.connect(self._on_change)

        self.color_btn = QPushButton()
        self.color_btn.setFixedSize(24, 24)
        self._update_color_btn()
        self.color_btn.clicked.connect(self._pick_color)

        self.show_cb = QCheckBox("Show")
        self.show_cb.setChecked(True)
        self.show_cb.stateChanged.connect(self._on_change)

        del_btn = QPushButton("✕")
        del_btn.setFixedSize(24, 24)
        del_btn.setStyleSheet("color: #EF5350;")
        del_btn.clicked.connect(lambda: self.delete_requested.emit(self))

        hdr = QHBoxLayout()
        hdr.addWidget(self.name_edit)
        hdr.addWidget(self.color_btn)
        hdr.addWidget(self.show_cb)
        hdr.addStretch()
        hdr.addWidget(del_btn)
        layout.addLayout(hdr, row, 0, 1, 2)
        row += 1

        # Packet type
        layout.addWidget(QLabel("Type:"), row, 0)
        self.type_combo = QComboBox()
        for pt, (short, _desc, _) in PKT_TYPES.items():
            self.type_combo.addItem(f"0x{pt:02X} {short}", pt)
        _select_item_data(self.type_combo, pkt_type)
        # Connected only after the initial selection so it does not fire here.
        self.type_combo.currentIndexChanged.connect(self._on_type_change)
        layout.addWidget(self.type_combo, row, 1)
        row += 1

        # Byte index
        layout.addWidget(QLabel("Byte:"), row, 0)
        self.byte_combo = QComboBox()
        self._populate_bytes(pkt_type)
        _select_item_data(self.byte_combo, byte_idx)
        self.byte_combo.currentIndexChanged.connect(self._on_change)
        layout.addWidget(self.byte_combo, row, 1)
        row += 1

        # Transform
        layout.addWidget(QLabel("Transform:"), row, 0)
        self.transform_combo = QComboBox()
        self.transform_combo.setEditable(True)
        for expr, desc in TRANSFORM_PRESETS:
            self.transform_combo.addItem(f"{expr} — {desc}", expr)
        self.transform_combo.setCurrentText(transform)
        self.transform_combo.currentTextChanged.connect(self._on_change)
        layout.addWidget(self.transform_combo, row, 1)
        row += 1

        # Bit mask
        layout.addWidget(QLabel("Bit mask:"), row, 0)
        mask_w = QWidget()
        mask_layout = QHBoxLayout(mask_w)
        mask_layout.setContentsMargins(0, 0, 0, 0)
        mask_layout.setSpacing(1)
        self.bit_cbs = []  # [(bit number, checkbox), ...], MSB first
        for bit in range(7, -1, -1):
            cb = QCheckBox(str(bit))
            cb.setChecked(True)
            cb.setFixedWidth(30)
            cb.stateChanged.connect(self._on_change)
            mask_layout.addWidget(cb)
            self.bit_cbs.append((bit, cb))
        layout.addWidget(mask_w, row, 1)
        row += 1

        self._update_title()
        self._building = False

    def _populate_bytes(self, pkt_type):
        """Refill the byte combo with payload bytes 4-16 of *pkt_type*."""
        self.byte_combo.blockSignals(True)
        self.byte_combo.clear()
        names = BYTE_NAMES.get(pkt_type, {})
        for idx in range(4, 17):
            label = names.get(idx, f"B{idx}")
            self.byte_combo.addItem(f"B{idx}: {label}", idx)
        self.byte_combo.blockSignals(False)

    def _on_type_change(self):
        pt = self.type_combo.currentData()
        if pt is not None:
            self._populate_bytes(pt)
        self._on_change()

    def _on_change(self):
        self._update_title()
        if not self._building:
            self.changed.emit()

    def _update_title(self):
        name = self.name_edit.text() or "?"
        pt = self.type_combo.currentData()
        bi = self.byte_combo.currentData()
        # Compare against None explicitly so a legitimate 0 still gets the
        # detailed title.
        if pt is not None and bi is not None:
            self.setTitle(f"{name} [0x{pt:02X} B{bi}]")
        else:
            self.setTitle(name)

    def _pick_color(self):
        c = QColorDialog.getColor(self._color, self, "Trace Color")
        if c.isValid():
            self._color = c
            self._update_color_btn()
            self._on_change()

    def _update_color_btn(self):
        self.color_btn.setStyleSheet(
            f"background-color: {self._color.name()}; border: 1px solid #555;"
        )

    @property
    def pkt_type(self) -> int:
        """Selected packet type; 0x02 when nothing is selected."""
        return self.type_combo.currentData() or 0x02

    @property
    def byte_idx(self) -> int:
        """Selected byte index; 4 when nothing is selected."""
        return self.byte_combo.currentData() or 4

    @property
    def transform(self) -> str:
        """Transform expression, without the preset description suffix."""
        text = self.transform_combo.currentText()
        # Strip description suffix if user selected a preset
        if " —" in text:
            return text.split(" —")[0].strip()
        return text

    @property
    def mask(self) -> int:
        """Bit mask assembled from the checked bit boxes."""
        return sum(1 << bit for bit, cb in self.bit_cbs if cb.isChecked())

    @property
    def color(self) -> QColor:
        return self._color

    @property
    def visible(self) -> bool:
        return self.show_cb.isChecked()

    @property
    def trace_name(self) -> str:
        """User-supplied name, or a ``0xTT_Bn`` name derived from the selection."""
        return self.name_edit.text() or f"0x{self.pkt_type:02X}_B{self.byte_idx}"


# ── Math Trace Widget ───────────────────────────────────────────────────────

class MathTraceWidget(QGroupBox):
    """UI controls for a computed math trace referencing other traces by name."""

    changed = pyqtSignal()
    delete_requested = pyqtSignal(object)

    _color_idx = 0  # shared counter for round-robin default colors

    def __init__(self, name: str = "", formula: str = "",
                 color: Optional[str] = None):
        super().__init__()

        if color is None:
            # Offset by 7 so math traces start elsewhere in the palette than
            # regular traces.
            color = TRACE_COLORS[(7 + MathTraceWidget._color_idx) % len(TRACE_COLORS)]
            MathTraceWidget._color_idx += 1
        self._color = QColor(color)

        layout = QGridLayout(self)
        layout.setContentsMargins(4, 8, 4, 4)
        layout.setSpacing(3)
        row = 0

        # Header
        self.name_edit = QLineEdit(name)
        self.name_edit.setPlaceholderText("Math trace name")
        self.name_edit.setFixedWidth(100)
        self.name_edit.textChanged.connect(self._on_change)

        self.color_btn = QPushButton()
        self.color_btn.setFixedSize(24, 24)
        self._update_color_btn()
        self.color_btn.clicked.connect(self._pick_color)

        self.show_cb = QCheckBox("Show")
        self.show_cb.setChecked(True)
        self.show_cb.stateChanged.connect(self._on_change)

        del_btn = QPushButton("✕")
        del_btn.setFixedSize(24, 24)
        del_btn.setStyleSheet("color: #EF5350;")
        del_btn.clicked.connect(lambda: self.delete_requested.emit(self))

        hdr = QHBoxLayout()
        hdr.addWidget(self.name_edit)
        hdr.addWidget(self.color_btn)
        hdr.addWidget(self.show_cb)
        hdr.addStretch()
        hdr.addWidget(del_btn)
        layout.addLayout(hdr, row, 0, 1, 2)
        row += 1

        # Formula
        layout.addWidget(QLabel("Formula:"), row, 0)
        self.formula_edit = QLineEdit(formula)
        self.formula_edit.setPlaceholderText("e.g. ODT - OPT")
        # editingFinished rather than textChanged: re-evaluate only once the
        # user commits the formula.
        self.formula_edit.editingFinished.connect(self._on_change)
        layout.addWidget(self.formula_edit, row, 1)
        row += 1

        # Info
        info = QLabel("Use trace names as variables. np.* available.")
        info.setStyleSheet("color: #78909C; font-size: 10px;")
        layout.addWidget(info, row, 0, 1, 2)

        self._update_title()

    def _on_change(self):
        self._update_title()
        self.changed.emit()

    def _update_title(self):
        name = self.name_edit.text() or "Math"
        self.setTitle(f"Math: {name}")

    def _pick_color(self):
        c = QColorDialog.getColor(self._color, self, "Math Trace Color")
        if c.isValid():
            self._color = c
            self._update_color_btn()
            self._on_change()

    def _update_color_btn(self):
        self.color_btn.setStyleSheet(
            f"background-color: {self._color.name()}; border: 1px solid #555;"
        )

    @property
    def formula(self) -> str:
        return self.formula_edit.text()

    @property
    def color(self) -> QColor:
        return self._color

    @property
    def visible(self) -> bool:
        return self.show_cb.isChecked()

    @property
    def trace_name(self) -> str:
        return self.name_edit.text() or "Math"


# ── Main Window ─────────────────────────────────────────────────────────────

class AnalyzerWindow(QMainWindow):
    """Main window: toolbar, trace configuration panel and the plot."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("TCL S-Line Protocol Analyzer")
        self.resize(1500, 900)

        self._store = PacketStore()
        self._parser = PacketParser()
        self._parser.packet_ready.connect(self._on_packet)

        self._socket = None
        self._replay_thread = None
        self._replay_worker = None

        self._traces = []       # TraceWidget list
        self._math_traces = []  # MathTraceWidget list
        self._curves = {}       # widget -> PlotCurveItem
        self._plot_dirty = False

        self._build_ui()
        self._apply_dark_theme()

        # Update plot at 10 Hz
        self._plot_timer = QTimer()
        self._plot_timer.setInterval(100)
        self._plot_timer.timeout.connect(self._update_plot)
        self._plot_timer.start()

    def _build_ui(self):
        """Assemble the central widget: toolbar on top, splitter below."""
        central = QWidget()
        self.setCentralWidget(central)
        main_layout = QVBoxLayout(central)
        main_layout.setContentsMargins(4, 4, 4, 4)
        main_layout.setSpacing(4)

        main_layout.addLayout(self._build_toolbar())

        # ── Main splitter ──
        splitter = QSplitter(Qt.Orientation.Horizontal)
        splitter.addWidget(self._build_trace_panel())
        splitter.addWidget(self._build_plot())
        splitter.setSizes([320, 1180])
        main_layout.addWidget(splitter)

    def _build_toolbar(self):
        """Create the toolbar row (file load, TCP controls, presets, stats)."""
        toolbar = QHBoxLayout()

        load_btn = QPushButton("Load JSONL...")
        load_btn.clicked.connect(self._load_file)
        toolbar.addWidget(load_btn)

        toolbar.addSpacing(10)

        toolbar.addWidget(QLabel("TCP:"))
        self._ip_edit = QLineEdit("192.168.0.57")
        self._ip_edit.setFixedWidth(120)
        toolbar.addWidget(self._ip_edit)
        toolbar.addWidget(QLabel(":"))
        self._port_spin = QSpinBox()
        self._port_spin.setRange(1, 65535)
        self._port_spin.setValue(4001)
        self._port_spin.setFixedWidth(70)
        toolbar.addWidget(self._port_spin)

        self._connect_btn = QPushButton("Connect")
        self._connect_btn.setCheckable(True)
        self._connect_btn.clicked.connect(self._toggle_connection)
        toolbar.addWidget(self._connect_btn)

        self._status_lbl = QLabel("  No data")
        self._status_lbl.setStyleSheet("color: #90A4AE;")
        toolbar.addWidget(self._status_lbl)

        toolbar.addSpacing(20)

        # Presets menu
        preset_btn = QPushButton("Presets ▾")
        preset_menu = QMenu(self)
        for label, fn in [
            ("Temperatures (0x02)", self._preset_temps),
            ("Command (0x01)", self._preset_cmd),
            ("Fan / CRT (0x81+0x84)", self._preset_fan),
            ("EEV (0x01)", self._preset_eev),
            ("All 0x02 raw", self._preset_all_02),
            ("Clear all traces", self._clear_traces),
        ]:
            preset_menu.addAction(label, fn)
        preset_btn.setMenu(preset_menu)
        toolbar.addWidget(preset_btn)

        toolbar.addStretch()

        self._stats_lbl = QLabel("")
        self._stats_lbl.setStyleSheet("color: #B0BEC5;")
        toolbar.addWidget(self._stats_lbl)
        return toolbar

    def _build_trace_panel(self):
        """Create the scrollable left panel that hosts the trace widgets."""
        left_scroll = QScrollArea()
        left_scroll.setWidgetResizable(True)
        left_scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
        left_scroll.setMinimumWidth(300)
        left_scroll.setMaximumWidth(400)

        self._trace_panel = QWidget()
        self._trace_layout = QVBoxLayout(self._trace_panel)
        self._trace_layout.setContentsMargins(4, 4, 4, 4)
        self._trace_layout.setSpacing(4)

        btn_row = QHBoxLayout()
        add_trace_btn = QPushButton("+ Add Trace")
        # Lambdas drop the `checked` argument that clicked() would otherwise
        # pass as the first positional parameter.
        add_trace_btn.clicked.connect(lambda: self._add_trace())
        btn_row.addWidget(add_trace_btn)
        add_math_btn = QPushButton("+ Add Math")
        add_math_btn.clicked.connect(lambda: self._add_math_trace())
        btn_row.addWidget(add_math_btn)
        self._trace_layout.addLayout(btn_row)

        # Trace widgets live in their own layout, placed before the stretch.
        self._traces_container = QVBoxLayout()
        self._trace_layout.addLayout(self._traces_container)
        self._trace_layout.addStretch()

        left_scroll.setWidget(self._trace_panel)
        return left_scroll

    def _build_plot(self):
        """Create the plot widget with legend and crosshair."""
        self._plot = pg.PlotWidget()
        self._plot.setBackground("#1E1E1E")
        self._plot.showGrid(x=True, y=True, alpha=0.3)
        self._plot.setLabel("left", "Value")
        self._plot.setLabel("bottom", "Time", units="s")
        # Keep the legend so entries can be relabelled when a trace is renamed.
        self._legend = self._plot.addLegend(offset=(10, 10))

        # Rectangle zoom: dragging with the left button selects the region
        # to zoom into.
        self._plot.getViewBox().setMouseMode(pg.ViewBox.RectMode)
        pg.setConfigOptions(antialias=True)

        # Crosshair
        self._vline = pg.InfiniteLine(angle=90, movable=False,
                                      pen=pg.mkPen("#555", width=1))
        self._hline = pg.InfiniteLine(angle=0, movable=False,
                                      pen=pg.mkPen("#555", width=1))
        self._plot.addItem(self._vline, ignoreBounds=True)
        self._plot.addItem(self._hline, ignoreBounds=True)
        self._crosshair_label = pg.TextItem("", anchor=(0, 1), color="#AAA")
        self._plot.addItem(self._crosshair_label, ignoreBounds=True)
        self._plot.scene().sigMouseMoved.connect(self._on_mouse_moved)
        return self._plot

    def _apply_dark_theme(self):
        app = QApplication.instance()
        app.setStyle("Fusion")
        palette = QPalette()
        role = QPalette.ColorRole
        for color_role, rgb in (
            (role.Window, (30, 30, 30)),
            (role.WindowText, (220, 220, 220)),
            (role.Base, (25, 25, 25)),
            (role.AlternateBase, (35, 35, 35)),
            (role.Text, (220, 220, 220)),
            (role.Button, (45, 45, 45)),
            (role.ButtonText, (220, 220, 220)),
            (role.Highlight, (66, 165, 245)),
            (role.HighlightedText, (0, 0, 0)),
        ):
            palette.setColor(color_role, QColor(*rgb))
        app.setPalette(palette)

    # ── Trace management ──

    def _add_trace(self, name="", pkt_type=0x02, byte_idx=7,
                   transform="val", color=None) -> TraceWidget:
        """Create a data trace widget, add it to the panel and return it."""
        tw = TraceWidget(name, pkt_type, byte_idx, transform, color)
        tw.changed.connect(self._mark_dirty)
        tw.delete_requested.connect(self._remove_trace)
        self._traces.append(tw)
        self._traces_container.addWidget(tw)
        self._mark_dirty()
        return tw

    def _add_math_trace(self, name="", formula="", color=None) -> MathTraceWidget:
        """Create a math trace widget, add it to the panel and return it."""
        mw = MathTraceWidget(name, formula, color)
        mw.changed.connect(self._mark_dirty)
        mw.delete_requested.connect(self._remove_trace)
        self._math_traces.append(mw)
        self._traces_container.addWidget(mw)
        self._mark_dirty()
        return mw

    def _remove_trace(self, widget):
        """Remove a data or math trace widget together with its curve."""
        if widget in self._traces:
            self._traces.remove(widget)
        if widget in self._math_traces:
            self._math_traces.remove(widget)
        curve = self._curves.pop(widget, None)
        if curve is not None:
            self._plot.removeItem(curve)
        widget.setParent(None)
        widget.deleteLater()
        self._mark_dirty()

    def _clear_traces(self):
        """Remove every trace and restart the default color rotation."""
        # Iterate over copies: _remove_trace mutates the lists.
        for tw in list(self._traces):
            self._remove_trace(tw)
        for mw in list(self._math_traces):
            self._remove_trace(mw)
        TraceWidget._color_idx = 0
        MathTraceWidget._color_idx = 0

    def _mark_dirty(self):
        self._plot_dirty = True

    # ── Presets ──

    def _preset_temps(self):
        self._clear_traces()
        self._add_trace("ODT", 0x02, 7, "sh_b7(val)", "#FF5252")
        self._add_trace("OPT", 0x02, 8, "sh_b8(val)", "#42A5F5")
        self._add_trace("OAT", 0x02, 9, "sh_b9(val)", "#66BB6A")
        self._add_trace("IDU Coil", 0x81, 11, "sh_idu(val)", "#FFB74D")
        self._add_trace("IDU Room", 0x81, 12, "sh_idu(val)", "#CE93D8")
        self._add_trace("B11", 0x02, 11, "val", "#78909C")
        self._add_trace("Freq", 0x01, 5, "val", "#FFB74D")

    def _preset_cmd(self):
        self._clear_traces()
        self._add_trace("Freq", 0x01, 5, "val", "#FFB74D")
        self._add_trace("Valve", 0x01, 6, "val", "#42A5F5")
        self._add_trace("Mode", 0x01, 4, "val", "#FF5252")
        # NOTE(review): BYTE_NAMES labels byte 12 as the EEV position and
        # byte 13 as the ODU fan speed — confirm which one this should be.
        self._add_trace("EEV_B13", 0x01, 13, "val", "#AB47BC")
        self._add_trace("ModeFlg", 0x01, 16, "val", "#66BB6A")

    def _preset_fan(self):
        self._clear_traces()
        self._add_trace("CRT", 0x81, 9, "val", "#FFF176")
        self._add_trace("FanSpd", 0x81, 10, "val", "#42A5F5")
        self._add_trace("FanMode", 0x84, 4, "val", "#FF7043")
        self._add_trace("Turbo", 0x81, 6, "val", "#FF5252")

    def _preset_eev(self):
        self._clear_traces()
        self._add_trace("EEV_B13", 0x01, 13, "val", "#CE93D8")
        self._add_trace("EEV_B12", 0x01, 12, "val", "#78909C")
        self._add_trace("Freq", 0x01, 5, "val", "#FFB74D")

    def _preset_all_02(self):
        """One raw trace per payload byte (4-16) of packet type 0x02."""
        self._clear_traces()
        for color, idx in zip(TRACE_COLORS[:13], range(4, 17)):
            self._add_trace(f"B{idx}", 0x02, idx, "val", color)

    # ── Data loading ──

    def _load_file(self):
        """Ask for a capture file and load it."""
        filepath, _ = QFileDialog.getOpenFileName(
            self, "Open Capture File", ".", "JSONL files (*.jsonl);;All files (*)"
        )
        if not filepath:
            return
        self.load_path(filepath)

    def load_path(self, filepath) -> int:
        """Replace the current data with the JSONL capture at *filepath*.

        Each line is expected to be a JSON object with a ``ts`` timestamp and
        a ``hex`` string holding one byte. Blank lines are ignored; lines that
        cannot be decoded are counted and skipped instead of aborting the
        whole load. The existing data is only discarded once the file has
        been opened successfully.

        Returns:
            The number of bytes fed to the parser (0 if the file could not
            be read).
        """
        path = Path(filepath)
        self._status_lbl.setText(f"  Loading {path.name}...")
        self._status_lbl.setStyleSheet("color: #FFF176;")
        QApplication.processEvents()  # let the "Loading" text paint first

        count = 0
        skipped = 0
        try:
            # Parse synchronously (fast enough for ~1M lines)
            with open(path, encoding="utf-8") as f:
                self._store.clear()
                self._parser.reset()
                for line in f:
                    if not line.strip():
                        continue
                    try:
                        rec = json.loads(line)
                        ts = rec["ts"]
                        byte_val = int(rec["hex"], 16)
                    except (ValueError, KeyError, TypeError):
                        skipped += 1
                        continue
                    self._parser.feed(ts, byte_val)
                    count += 1
        except (OSError, UnicodeDecodeError) as exc:
            self._status_lbl.setText(f"  Error: {exc}")
            self._status_lbl.setStyleSheet("color: #EF5350;")
            self._mark_dirty()
            return 0

        self._status_lbl.setText(
            f"  {path.name}: {self._store.count} packets, "
            f"{self._store.duration:.0f}s ({self._store.duration/3600:.1f}h)"
        )
        self._status_lbl.setStyleSheet("color: #69F0AE;")
        stats = f"{count} bytes → {self._store.count} packets"
        if skipped:
            stats += f", {skipped} bad lines skipped"
        self._stats_lbl.setText(stats)
        self._mark_dirty()
        return count

    def _on_packet(self, pkt):
        """Called by PacketParser for each decoded packet."""
        self._store.add(pkt.ts, pkt.pkt_type, pkt.raw)
        self._plot_dirty = True

    # ── TCP ──

    def _toggle_connection(self, checked):
        """Open (checked) or close (unchecked) the live TCP connection."""
        # Release whatever socket is left over from an earlier session.
        self._drop_socket()
        if not checked:
            self._status_lbl.setText("  Disconnected")
            self._status_lbl.setStyleSheet("color: #90A4AE;")
            return

        host = self._ip_edit.text().strip()
        port = self._port_spin.value()
        sock = QTcpSocket(self)
        sock.readyRead.connect(self._on_socket_data)
        # Bind the socket into each handler so late signals from a socket
        # that has already been replaced or dropped can be recognised.
        sock.connected.connect(lambda s=sock: self._on_connected(s))
        sock.disconnected.connect(lambda s=sock: self._on_disconnect(s))
        sock.errorOccurred.connect(lambda _err, s=sock: self._on_socket_error(s))
        self._socket = sock
        sock.connectToHost(host, port)
        self._status_lbl.setText(f"  Connecting to {host}:{port}...")
        self._status_lbl.setStyleSheet("color: #FFF176;")

    def _drop_socket(self):
        """Close and schedule deletion of the current socket, if any."""
        sock, self._socket = self._socket, None
        if sock is not None:
            # self._socket is already None, so the handlers ignore any
            # signal emitted by this close().
            sock.close()
            sock.deleteLater()

    def _on_connected(self, sock):
        if sock is not self._socket:
            return
        self._status_lbl.setText("  Connected (live)")
        self._status_lbl.setStyleSheet("color: #69F0AE;")

    def _on_socket_error(self, sock):
        """Show the socket error; reset the button if the connection is gone."""
        if sock is not self._socket:
            return
        self._status_lbl.setText(f"  Error: {sock.errorString()}")
        self._status_lbl.setStyleSheet("color: #EF5350;")
        if sock.state() == QTcpSocket.SocketState.UnconnectedState:
            # E.g. connection refused: no disconnected() signal will follow,
            # so the button has to be released here.
            self._connect_btn.setChecked(False)
            self._drop_socket()

    def _on_disconnect(self, sock=None):
        # Ignore signals from sockets that are no longer the active one.
        if sock is not None and sock is not self._socket:
            return
        self._connect_btn.setChecked(False)
        self._status_lbl.setText("  Disconnected")
        self._status_lbl.setStyleSheet("color: #EF5350;")
        self._drop_socket()

    def _on_socket_data(self):
        """Feed every received byte to the parser."""
        if self._socket is None:
            return
        chunk = self._socket.readAll().data()
        # All bytes of one read share a timestamp; finer timing is not
        # available at this layer.
        ts = time.time()
        for byte_val in chunk:
            self._parser.feed(ts, byte_val)

    # ── Plot ──

    def _update_plot(self):
        """Redraw all curves if something changed since the last tick."""
        if not self._plot_dirty:
            return
        self._plot_dirty = False

        # Remove old curves that don't have widgets anymore
        for w in list(self._curves):
            if w not in self._traces and w not in self._math_traces:
                self._plot.removeItem(self._curves.pop(w))

        # Collect named trace data for math traces (visible traces only)
        trace_data = {}

        # Update regular traces
        for tw in self._traces:
            curve = self._sync_curve(tw, pg.mkPen(tw.color, width=2))
            if tw.visible and self._store.count > 0:
                times, values = self._store.get_trace(
                    tw.pkt_type, tw.byte_idx, tw.mask, tw.transform
                )
                curve.setData(times, values)
                curve.show()
                trace_data[tw.trace_name] = (times, values)
            else:
                curve.hide()

        # Update math traces
        for mw in self._math_traces:
            pen = pg.mkPen(mw.color, width=2, style=Qt.PenStyle.DashLine)
            curve = self._sync_curve(mw, pen)
            if mw.visible and mw.formula and trace_data:
                try:
                    result_times, result_values = self._eval_math(mw.formula, trace_data)
                    curve.setData(result_times, result_values)
                    curve.show()
                except Exception:  # arbitrary user formula: hide on any failure
                    curve.hide()
            else:
                curve.hide()

        # Update stats
        self._stats_lbl.setText(
            f"{self._store.count} packets | {self._store.duration:.0f}s"
        )

    def _sync_curve(self, widget, pen):
        """Return the curve for *widget*, creating or restyling it as needed."""
        curve = self._curves.get(widget)
        if curve is None:
            curve = self._plot.plot(pen=pen, name=widget.trace_name)
            self._curves[widget] = curve
            return curve
        curve.setPen(pen)
        self._rename_curve(curve, widget.trace_name)
        return curve

    def _rename_curve(self, curve, name):
        """Keep the curve's name and its legend label in step with the widget."""
        if curve.name() == name:
            return
        curve.opts["name"] = name
        # getLabel is looked up defensively in case the installed pyqtgraph
        # LegendItem does not provide it.
        get_label = getattr(self._legend, "getLabel", None)
        label = get_label(curve) if get_label is not None else None
        if label is not None:
            label.setText(name)

    def _eval_math(self, formula: str, trace_data: dict):
        """Evaluate a math formula using trace names as variables.

        All traces are linearly interpolated onto the time axis of the trace
        with the most points. Each trace is available under its own name and
        under a "safe" name with spaces replaced by ``_`` and parentheses
        removed; ``np`` is available as well. A scalar result is expanded to
        a constant line.

        Returns:
            ``(times, values)`` as float arrays.

        Raises:
            Whatever the formula raises; the caller hides the curve then.
        """
        # Find a reference trace for time axis (use the one with most points)
        ref_name = max(trace_data, key=lambda k: len(trace_data[k][0]))
        ref_times = trace_data[ref_name][0]
        if len(ref_times) == 0:
            return np.array([]), np.array([])

        # Interpolate every non-empty trace once onto the reference axis.
        resampled = {
            name: np.interp(ref_times, times, values)
            for name, (times, values) in trace_data.items()
            if len(times) > 0
        }

        ns = {"np": np}
        # Safe names first; empty traces stand in as zeros ...
        for name in trace_data:
            safe_name = name.replace(" ", "_").replace("(", "").replace(")", "")
            ns[safe_name] = resampled.get(name, np.zeros_like(ref_times))
        # ... then the original names, which win on a name collision.
        ns.update(resampled)

        # SECURITY NOTE: the formula is user-typed text executed with eval();
        # never feed it input from an untrusted source.
        result = np.asarray(eval(formula, ns), dtype=float)  # noqa: S307
        if result.ndim == 0:
            # Constant formula (e.g. "25"): draw it as a horizontal line.
            result = np.full_like(ref_times, float(result), dtype=float)
        return ref_times, result

    def _on_mouse_moved(self, pos):
        """Move the crosshair and its mm:ss label to the mouse position."""
        if not self._plot.sceneBoundingRect().contains(pos):
            return
        mouse_point = self._plot.getViewBox().mapSceneToView(pos)
        x = mouse_point.x()
        y = mouse_point.y()
        self._vline.setPos(x)
        self._hline.setPos(y)
        mins = int(x // 60)
        secs = x % 60
        self._crosshair_label.setText(f"{mins}:{secs:05.2f}  y={y:.1f}")
        self._crosshair_label.setPos(x, y)

    def closeEvent(self, event):
        """Stop the timer, socket and replay thread before closing."""
        self._plot_timer.stop()
        self._drop_socket()
        if self._replay_worker:
            self._replay_worker.stop()
        if self._replay_thread:
            self._replay_thread.quit()
            self._replay_thread.wait()
        event.accept()


# ── Entry point ─────────────────────────────────────────────────────────────

def main():
    """Start the application; auto-load a capture passed as first argument."""
    app = QApplication(sys.argv)
    app.setApplicationName("TCL S-Line Protocol Analyzer")

    window = AnalyzerWindow()
    window.show()

    # Auto-load file if passed as argument
    if len(sys.argv) > 1:
        filepath = sys.argv[1]
        if Path(filepath).exists():
            # Deferred so the window is shown before the blocking load.
            QTimer.singleShot(200, lambda: _auto_load(window, filepath))

    sys.exit(app.exec())


def _auto_load(window, filepath):
    """Load *filepath* into *window* using the same path as the file dialog."""
    window.load_path(filepath)


if __name__ == "__main__":
    main()