Realtime stream

Stream live data via the DB SDK

Appending data to a dataset in real time is simple using the SDK. The following script demonstrates how to stream 12 data points:

How It Works

The script performs the following steps:

  1. Create a live datastream (if it doesn’t already exist)

  2. Create a dataset to append the data to

  3. (Optional) Add signal metadata

  4. Append data to the dataset in real time

Note: This feature requires marpledata SDK version 2.2.1 or higher.

import time

import pandas as pd
from marple import DB

# Connection settings -- replace the token with your own
API_TOKEN = "your_api_token"
DB_URL = "https://db.marpledata.com/api/v1"
# Name of the datastream that the script creates (if needed) and writes to
LIVE_STREAM_NAME = "Live Demo"

# Metadata and signals -- change as needed
# The "outing" value is also used as the dataset name when the dataset is created.
DATASET_METADATA = {
    "outing": "Outing 1",
    "driver": "NV",
    "track": "Spa",
    "session": "FP",
}
# One entry per streamed column; the "signal" names are kept in sync with the
# columns of the DataFrame that is appended further down in the script.
SIGNALS = [
    {"signal": "m.speed", "unit": "m/s", "description": "Speed", "group": "speed", "type": "real"},
    {"signal": "m.throttle", "unit": "%", "description": "Throttle position", "group": "controls", "type": "real"},
    {"signal": "m.gear", "description": "Current gear", "group": "gear", "type": "real"},
]

# Example: start live data ingestion
db = DB(api_token=API_TOKEN, api_url=DB_URL)

# Optionally create a new datastream, if it does not already exist.
# The catch stays broad and best-effort so the script can continue when the
# stream is already there, but the underlying error is printed so that other
# failures (e.g. an invalid token or an unreachable URL) are not misreported
# as "already exists".
try:
    db.create_stream(name=LIVE_STREAM_NAME, description="Live Data Stream", type="realtime")
except Exception as e:
    print(f"Could not create stream {LIVE_STREAM_NAME} (it may already exist): {e}")

# Start new live dataset; the returned id is used for all later calls
dataset_id = db.add_dataset(LIVE_STREAM_NAME, dataset_name=DATASET_METADATA["outing"], metadata=DATASET_METADATA)

# (Optional) Add signal metadata
db.upsert_signals(LIVE_STREAM_NAME, signals=SIGNALS, dataset_id=dataset_id)

# Ingest data in a loop: 12 pre-recorded rows, pushed one at a time
df = pd.DataFrame(
    {
        "m.speed": [
            44.320, 45.305, 46.483, 47.283, 48.763, 48.041,
            48.959, 50.577, 51.152, 52.745, 53.333, 53.444,
        ],
        "m.throttle": [1] * 12,
        "m.gear": [6] * 12,
    }
)
for row in range(len(df)):  # Simulate data pushes
    # Take a one-row frame and stamp it with the current wall-clock time (ns)
    sample = df.iloc[row:row + 1].copy()
    sample["time"] = time.time_ns()
    print("Tick - adding sample")
    print(sample.to_string(index=False))
    db.dataset_append(LIVE_STREAM_NAME, dataset_id=dataset_id, data=sample)
    time.sleep(1)  # Simulate delay between data points

Last updated