#! /usr/bin/env python3
# /// script
# dependencies = [
# "pandas",
# "matplotlib",
# "marpledata",
# "numpy",
# ]
# ///
"""
Parameters
"""
API_TOKEN = "<MDB_TOKEN>" # your Marple DB token here
STREAM = "Datastream" # your Marple DB datastream name here
DATASET = "file1.h5" # your file here
PITCH_SIG = "PitchAngle"
ROLL_SIG = "RollAngle"
SPD_SIG = "TrueAirspeedTAS"
ALT_SIG = "AltitudeMSL"
SEG_SIG = "segment"
OUT_GIF = "pfd_stall.gif"
N_FRAMES = 450
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon, Rectangle
from matplotlib.animation import FuncAnimation, PillowWriter
from matplotlib.transforms import Affine2D
from marple import DB
def dataset_label(ds):
return getattr(ds, "path", None) or getattr(ds, "name", None) or f"id={getattr(ds, 'id', 'unknown')}"
db = DB(API_TOKEN)
stream = db.get_stream(STREAM)
datasets = stream.get_datasets().wait_for_import().where_imported()
numeric_df = None
segment_df = None
for ds, df in datasets.get_data([PITCH_SIG, ROLL_SIG, SPD_SIG, ALT_SIG], resample_rule="1s"):
if dataset_label(ds) == DATASET:
numeric_df = df.copy()
break
for ds, df in datasets.get_data([SEG_SIG]):
if dataset_label(ds) == DATASET:
segment_df = df.copy()
break
if numeric_df is None:
raise ValueError("Dataset not found")
numeric_df = numeric_df[[PITCH_SIG, ROLL_SIG, SPD_SIG, ALT_SIG]].copy()
for col in [PITCH_SIG, ROLL_SIG, SPD_SIG, ALT_SIG]:
numeric_df[col] = pd.to_numeric(numeric_df[col], errors="coerce")
numeric_df = numeric_df.dropna(subset=[PITCH_SIG, ROLL_SIG, SPD_SIG, ALT_SIG])
if numeric_df.empty:
raise ValueError("No valid numeric rows were returned")
if segment_df is not None and SEG_SIG in segment_df.columns:
num = (
numeric_df.sort_index()
.reset_index()
.rename(columns={numeric_df.index.name or "index": "time"})
)
seg = (
segment_df[[SEG_SIG]]
.copy()
.sort_index()
.reset_index()
.rename(columns={segment_df.index.name or "index": "time"})
)
seg[SEG_SIG] = seg[SEG_SIG].astype(str)
df = pd.merge_asof(
num.sort_values("time"),
seg.sort_values("time"),
on="time",
direction="backward"
).set_index("time")
df[SEG_SIG] = df[SEG_SIG].fillna("Unknown")
else:
df = numeric_df.copy()
df[SEG_SIG] = "Unknown"
stall_df = df[df[SEG_SIG].astype(str).str.contains("stall", case=False, na=False)].copy()
if stall_df.empty:
raise ValueError("No stall segments found in this dataset")
pitch = stall_df[PITCH_SIG].to_numpy()
roll = stall_df[ROLL_SIG].to_numpy()
speed = stall_df[SPD_SIG].to_numpy()
altitude = stall_df[ALT_SIG].to_numpy()
segment = stall_df[SEG_SIG].astype(str).to_numpy()
if len(stall_df) > N_FRAMES:
idx = np.linspace(0, len(stall_df) - 1, N_FRAMES).astype(int)
pitch = pitch[idx]
roll = roll[idx]
speed = speed[idx]
altitude = altitude[idx]
segment = segment[idx]
fig, ax = plt.subplots(figsize=(9, 7))
fig.patch.set_facecolor("black")
ax.set_facecolor("black")
ax.set_xlim(-1.55, 1.55)
ax.set_ylim(-1.15, 1.15)
ax.set_aspect("equal")
ax.axis("off")
att_x0, att_y0 = -0.78, -0.60
att_w, att_h = 1.56, 1.20
att_clip = Rectangle((att_x0, att_y0), att_w, att_h, transform=ax.transData)
sky = Polygon([[-4, 0], [4, 0], [4, 4], [-4, 4]], closed=True, color="#4da6ff")
ground = Polygon([[-4, -4], [4, -4], [4, 0], [-4, 0]], closed=True, color="#9c5a1a")
horizon_line, = ax.plot([-4, 4], [0, 0], color="white", linewidth=2)
ax.add_patch(sky)
ax.add_patch(ground)
for artist in [sky, ground, horizon_line]:
artist.set_clip_path(att_clip)
pitch_lines = []
pitch_labels = []
for deg in range(-30, 31, 5):
if deg == 0:
continue
half = 0.30 if deg % 10 == 0 else 0.18
line, = ax.plot([-half, half], [deg / 30.0, deg / 30.0], color="white", linewidth=1.2)
line.set_clip_path(att_clip)
pitch_lines.append((deg, line))
if deg % 10 == 0:
t1 = ax.text(
-half - 0.06, deg / 30.0, f"{abs(deg)}",
color="white", fontsize=9, ha="right", va="center",
fontweight="bold", zorder=12
)
t2 = ax.text(
half + 0.06, deg / 30.0, f"{abs(deg)}",
color="white", fontsize=9, ha="left", va="center",
fontweight="bold", zorder=12
)
pitch_labels.append((deg, t1, t2))
ax.add_patch(Rectangle((att_x0, att_y0), att_w, att_h, fill=False, edgecolor="white", linewidth=2, zorder=20))
ax.add_patch(Rectangle((-2, 0.60), 4, 1.0, facecolor="black", edgecolor="none", zorder=30))
ax.add_patch(Rectangle((-2, -1.20), 4, 0.60, facecolor="black", edgecolor="none", zorder=30))
ax.add_patch(Rectangle((att_x0, att_y0), att_w, att_h, fill=False, edgecolor="white", linewidth=2, zorder=31))
ax.plot([-0.30, -0.07], [0, 0], color="yellow", linewidth=3, zorder=32)
ax.plot([0.07, 0.30], [0, 0], color="yellow", linewidth=3, zorder=32)
ax.plot([0, 0], [-0.06, 0.06], color="yellow", linewidth=3, zorder=32)
bank_pointer = Polygon([[0, 0.72], [-0.03, 0.66], [0.03, 0.66]], closed=True, color="white", zorder=32)
ax.add_patch(bank_pointer)
speed_x0 = -1.11
alt_x0 = 0.83
tape_w = 0.28
tape_h = 1.20
speed_box = Rectangle((speed_x0, -0.60), tape_w, tape_h, facecolor="#101010", edgecolor="white", linewidth=1.5, zorder=10)
alt_box = Rectangle((alt_x0, -0.60), tape_w, tape_h, facecolor="#101010", edgecolor="white", linewidth=1.5, zorder=10)
ax.add_patch(speed_box)
ax.add_patch(alt_box)
speed_window = Rectangle((speed_x0 - 0.02, -0.10), tape_w + 0.04, 0.20, facecolor="white", edgecolor="white", linewidth=1.5, zorder=40)
alt_window = Rectangle((alt_x0 - 0.02, -0.10), tape_w + 0.04, 0.20, facecolor="white", edgecolor="white", linewidth=1.5, zorder=40)
ax.add_patch(speed_window)
ax.add_patch(alt_window)
speed_value = ax.text(speed_x0 + tape_w / 2, 0.0, "", color="black", fontsize=14, ha="center", va="center", fontweight="bold", zorder=41)
alt_value = ax.text(alt_x0 + tape_w / 2, 0.0, "", color="black", fontsize=14, ha="center", va="center", fontweight="bold", zorder=41)
speed_ticks = []
alt_ticks = []
segment_box = Rectangle((-0.70, -1.05), 1.40, 0.20, facecolor="#101010", edgecolor="white", linewidth=1.2, zorder=50)
ax.add_patch(segment_box)
segment_title = ax.text(0, -0.93, "Current Segment", color="white", fontsize=10, ha="center", va="center", fontweight="bold", zorder=51)
segment_text = ax.text(0, -1.00, "", color="white", fontsize=11, ha="center", va="center", zorder=51)
def clear_tick_artists(tick_list):
for artist in tick_list:
artist.remove()
tick_list.clear()
def draw_left_tape(center_value, tick_store, step):
clear_tick_artists(tick_store)
for d in range(-4, 5):
val = center_value + d * step
y = d * 0.14
line = ax.plot([speed_x0 + 0.03, speed_x0 + 0.09], [y, y], color="white", linewidth=1, zorder=12)[0]
txt = ax.text(speed_x0 + 0.11, y, f"{val:.0f}", color="white", fontsize=9, ha="left", va="center", zorder=12)
tick_store.extend([line, txt])
def draw_right_tape(center_value, tick_store, step):
clear_tick_artists(tick_store)
for d in range(-4, 5):
val = center_value + d * step
y = d * 0.14
line = ax.plot([alt_x0 + 0.03, alt_x0 + 0.09], [y, y], color="white", linewidth=1, zorder=12)[0]
txt = ax.text(alt_x0 + 0.11, y, f"{val:.0f}", color="white", fontsize=9, ha="left", va="center", zorder=12)
tick_store.extend([line, txt])
def update(i):
p = float(pitch[i])
r = float(roll[i])
spd = float(speed[i])
alt = float(altitude[i])
seg = str(segment[i])
pitch_offset = p / 25.0
trans = Affine2D().translate(0, -pitch_offset).rotate_deg_around(0, 0, -r) + ax.transData
sky.set_transform(trans)
ground.set_transform(trans)
horizon_line.set_transform(trans)
for deg, line in pitch_lines:
half = 0.30 if deg % 10 == 0 else 0.18
y = deg / 30.0
line.set_data([-half, half], [y, y])
line.set_transform(trans)
for deg, t1, t2 in pitch_labels:
half = 0.30
y = deg / 30.0
t1.set_position((-half - 0.06, y))
t2.set_position((half + 0.06, y))
t1.set_transform(trans)
t2.set_transform(trans)
speed_value.set_text(f"{spd:.0f}")
alt_value.set_text(f"{alt:.0f}")
segment_text.set_text(seg)
draw_left_tape(spd, speed_ticks, 10)
draw_right_tape(alt, alt_ticks, 100)
return [sky, ground, horizon_line, speed_value, alt_value, segment_title, segment_text] + speed_ticks + alt_ticks
anim = FuncAnimation(fig, update, frames=len(pitch), interval=200, blit=False)
anim.save(OUT_GIF, writer=PillowWriter(fps=5))
plt.close(fig)
print(f"Saved {OUT_GIF}")