Realtime stream
Stream live data via the DB SDK
Appending data to a dataset in real time is simple using the SDK. The following script demonstrates how to stream 12 data points, one per second:
How It Works
The script performs the following steps:
Create a live datastream (if it doesn’t already exist)
Create a dataset to append the data to
(Optional) Add signal metadata
Append data to the dataset in real time
Note: This feature requires marpledata SDK version 2.2.1 or higher.
import time
import pandas as pd
from marple import DB
# Connection settings -- the token below is a placeholder
API_TOKEN = "your_api_token"
DB_URL = "https://db.marpledata.com/api/v1"
LIVE_STREAM_NAME = "Live Demo"

# Metadata and signals -- change as needed
# The "outing" entry doubles as the dataset name when the dataset is created.
DATASET_METADATA = dict(
    outing="Outing 1",
    driver="NV",
    track="Spa",
    session="FP",
)

# One entry per signal; "unit" is omitted where none is given.
SIGNALS = [
    {
        "signal": "m.speed",
        "unit": "m/s",
        "description": "Speed",
        "group": "speed",
        "type": "real",
    },
    {
        "signal": "m.throttle",
        "unit": "%",
        "description": "Throttle position",
        "group": "controls",
        "type": "real",
    },
    {
        "signal": "m.lap",
        "description": "Current lap number",
        "group": "lap",
        "type": "real",
    },
]
# Example: start live data ingestion
# Connect to the DB API with the token and URL configured above.
db = DB(api_token=API_TOKEN, api_url=DB_URL)

# Optionally create a new datastream, if it does not already exist
try:
    db.create_stream(
        name=LIVE_STREAM_NAME,
        description="Live Data Stream",
        type="realtime",
    )
except Exception as e:
    # Best effort: keep going so an existing stream can be reused, but show
    # the real error -- a bad token or URL would otherwise be reported as
    # "already exists" and only surface later in a confusing way.
    print(f"Stream {LIVE_STREAM_NAME} was not created (it may already exist): {e}")

# Start new live dataset; the returned id identifies it in the calls that follow
dataset_id = db.add_dataset(
    LIVE_STREAM_NAME,
    dataset_name=DATASET_METADATA["outing"],
    metadata=DATASET_METADATA,
)

# (Optional) Add signal metadata
db.upsert_signals(LIVE_STREAM_NAME, signals=SIGNALS, dataset_id=dataset_id)
# Ingest data in a loop
# Demo payload: twelve rows, one column per signal name.
# NOTE(review): "m.gear" is streamed here, while the SIGNALS metadata list
# describes "m.lap" instead -- confirm which signal is intended.
speed_values = [
    44.320, 45.305, 46.483, 47.283, 48.763, 48.041,
    48.959, 50.577, 51.152, 52.745, 53.333, 53.444,
]
df = pd.DataFrame({
    "m.speed": speed_values,
    "m.throttle": [1] * 12,
    "m.gear": [6] * 12,
})

for row in range(len(df)):  # Simulate data pushes
    # Take a single-row frame and stamp it with the current time in nanoseconds.
    sample = df.iloc[row:row + 1].assign(time=time.time_ns())
    print("Tick - adding sample")
    print(sample.to_string(index=False))
    db.dataset_append(LIVE_STREAM_NAME, dataset_id=dataset_id, data=sample)
    time.sleep(1)  # Simulate delay between data points
Last updated