Skip to content

Stream API Reference

The Stream class is the main entry point for the framework.

Bases: Generic[T]

High-level Facade for Stream Processing using Composition.

Manages the lifecycle of a stream processor, including connection handling, schema validation, and the processing loop.

Attributes:

Name Type Description
backend StreamingBackend

The storage backend for stream operations.

schema Optional[Type[T]]

Pydantic model for data validation.

state_store Optional[StateStore]

Optional store for stateful aggregations.

processor BatchProcessor

Internal processor for batch handling.

telemetry TelemetryManager

Observability manager.

Source code in pspf/stream.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class Stream(Generic[T]):
    """
    High-level Facade for Stream Processing using Composition.

    Manages the lifecycle of a stream processor, including connection handling,
    schema validation, and the processing loop.

    Attributes:
        backend (StreamingBackend): The storage backend for stream operations.
        schema (Optional[Type[T]]): Pydantic model for data validation.
        state_store (Optional[StateStore]): Optional store for stateful aggregations.
        processor (BatchProcessor): Internal processor for batch handling.
        telemetry (TelemetryManager): Observability manager.
    """
    def __init__(self,
                 backend: Optional[StreamingBackend] = None,
                 topic: Optional[str] = None,
                 group: Optional[str] = None,
                 schema: Optional[Type[T]] = None,
                 state_store: Optional[StateStore] = None):
        """
        Initialize the Stream facade.

        Args:
            backend (Optional[StreamingBackend]): Configured backend instance.
            topic (Optional[str]): Stream topic (required if backend is None).
            group (Optional[str]): Consumer group (required if backend is None).
            schema (Optional[Type[T]]): Pydantic model class for validation.
                                        If None, uses dynamic SchemaRegistry.
            state_store (Optional[StateStore]): Key-Value store for persistent state.

        Raises:
            ValueError: If neither `backend` nor both `topic` and `group` are given.
        """
        if backend is None:
            if not topic or not group:
                raise ValueError("Must provide either a 'backend', or both 'topic' and 'group' for auto-instantiation.")

            from pspf.settings import settings

            # Backend auto-selection: use Valkey when explicitly requested via
            # environment variables, or whenever we are not in the 'dev' environment.
            use_valkey = False
            if os.getenv("VALKEY_HOST") or os.getenv("PSPF_BACKEND") == "valkey":
                use_valkey = True
            elif settings.ENV != "dev":
                use_valkey = True

            if use_valkey:
                try:
                    from pspf.connectors.valkey import ValkeyConnector, ValkeyStreamBackend
                    import uuid
                    connector = ValkeyConnector(host=settings.valkey.HOST, port=settings.valkey.PORT)
                    # Random suffix gives each worker a unique consumer name.
                    worker_id = f"worker-{uuid.uuid4().hex[:8]}"
                    backend = ValkeyStreamBackend(connector, topic, group, worker_id)
                    logger.info(f"Auto-instantiated Valkey backend for topic '{topic}'")
                except Exception as e:
                    # Best-effort fallback keeps local development working without Valkey.
                    logger.warning(f"Failed to auto-instantiate Valkey backend: {e}. Falling back to MemoryBackend.")
                    from pspf.connectors.memory import MemoryBackend
                    backend = MemoryBackend(stream_key=topic, group_name=group)
            else:
                from pspf.connectors.memory import MemoryBackend
                backend = MemoryBackend(stream_key=topic, group_name=group)
                logger.debug(f"Auto-instantiated MemoryBackend for topic '{topic}' (ENV={settings.ENV})")

        self.backend = backend
        self.schema = schema
        self.state_store = state_store

        # Inject state_store into backend if it supports it (duck-typed attribute).
        if hasattr(self.backend, "state_store"):
            self.backend.state_store = state_store

        self.processor = BatchProcessor(self.backend)
        self.telemetry = TelemetryManager()
        self._subscriptions: List[Dict[str, Any]] = []
        self._processors: List[BatchProcessor] = []

    def subscribe(self, topic: str, batch_size: int = 10) -> Callable:
        """Decorator to register a stateless handler for a topic."""
        def decorator(func: Callable) -> Callable:
            self._subscriptions.append({
                "type": "subscribe",
                "topic": topic,
                "handler": func,
                "batch_size": batch_size
            })
            return func
        return decorator

    def window(self, topic: str, window: Window, batch_size: int = 10, watermark_delay_ms: int = 0) -> Callable:
        """Decorator to register a stateful aggregation handler for a topic."""
        def decorator(func: Callable) -> Callable:
            self._subscriptions.append({
                "type": "aggregate",
                "topic": topic,
                "window": window,
                "handler": func,
                "batch_size": batch_size,
                "watermark_delay_ms": watermark_delay_ms
            })
            return func
        return decorator

    async def run_forever(self) -> None:
        """Start the infinite processing loop for all registered decorators concurrently."""
        import asyncio
        if not self._subscriptions:
            logger.warning("No subscriptions registered. Exiting run_forever.")
            return

        await self.backend.connect()
        if self.state_store:
            await self.state_store.start()

        tasks = []
        self._processors = []
        for i, sub in enumerate(self._subscriptions):
            topic = sub["topic"]
            backend_clone = self.backend.clone_with_topic(topic)
            await backend_clone.ensure_group_exists()

            # Only the first processor starts the admin server (one per process).
            processor = BatchProcessor(backend_clone, state_store=self.state_store, start_admin_server=(i == 0))
            self._processors.append(processor)

            if sub["type"] == "subscribe":
                typed_handler = self._create_typed_handler(sub["handler"], topic)
                task = asyncio.create_task(processor.run_loop(typed_handler, batch_size=sub["batch_size"]))
                tasks.append(task)
            elif sub["type"] == "aggregate":
                if not self.state_store:
                    raise RuntimeError(f"A StateStore must be provided to use @stream.window on {topic}")
                typed_agg_handler = self._create_aggregation_handler(sub["handler"], topic, sub["window"], sub["watermark_delay_ms"], backend_clone)
                task = asyncio.create_task(processor.run_loop(typed_agg_handler, batch_size=sub["batch_size"]))
                tasks.append(task)

        logger.info(f"Starting {len(tasks)} Stream processors concurrently.")
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.info("run_forever cancelled.")
        finally:
            # Always release processor and state-store resources, even on cancellation.
            for p in self._processors:
                await p.shutdown()
            if self.state_store:
                await self.state_store.stop()

    async def stop(self) -> None:
        """Stop all running processors."""
        for p in self._processors:
            await p.shutdown()
        logger.info("Stream stopped.")

    async def health(self) -> Dict[str, str]:
        """
        Perform a health check on the stream components.

        Pings the backend to ensure connectivity.

        Returns:
            Dict[str, str]: A dictionary containing status ('ok' or 'error')
                            and details.
        """
        try:
            # Assuming backend has a ping method (we will add it)
            await self.backend.ping()
            # Could also check if group exists
            return {"status": "ok", "backend": "connected"}
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return {"status": "error", "reason": str(e)}

    async def __aenter__(self) -> "Stream[T]":
        """
        Async context manager entry.

        Connects to the backend and ensures the consumer group exists.

        Returns:
            Stream[T]: This stream instance, ready to emit/consume.
        """
        # The backend's connector manages the actual connection; connect() is
        # expected to be safe to call even if the user already connected it.
        await self.backend.connect() # Idempotent-ish check inside
        await self.backend.ensure_group_exists()

        if self.state_store:
            await self.state_store.start()

        return self

    async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
        """
        Async context manager exit.

        Triggers shutdown of the processor and closes the backend connection.
        """
        if self.processor:
            await self.processor.shutdown()

        if self.state_store:
            await self.state_store.stop()

        # We close the connection here as we "took ownership" in __aenter__
        await self.backend.close()

    async def emit(self, event: Any, topic: Optional[str] = None) -> str:
        """
        Produce an event to the stream.

        Accepts either a Pydantic model or a dictionary.
        Serializes and injects tracing context before sending to the backend.

        Args:
            event (Any): The event object to publish (BaseModel or dict).
            topic (Optional[str]): The stream/topic to publish to. Defaults to backend's stream_key.

        Returns:
            str: The message ID generated by the backend.

        Raises:
            ValueError: If `event` is neither a Pydantic model nor a dict.
        """
        # Serialize, ensuring 'event_type' is present for deserialization.
        # (Both branches guarantee it; no re-check is needed afterwards.)
        if hasattr(event, "model_dump"):
            data = event.model_dump(mode='json')
            if "event_type" not in data:
                data["event_type"] = event.__class__.__name__
        elif isinstance(event, dict):
            data = event.copy()
            if "event_type" not in data:
                data["event_type"] = "GenericEvent"
        else:
            raise ValueError("Event must be a Pydantic model or a dictionary")

        # Inject Trace Context: a hidden field carries the trace context.
        self.telemetry.inject_context(data)

        target_backend = self.backend

        # Ensure target backend is connected
        # Most connectors should be idempotent on connect()
        await target_backend.connect()

        # Publish to a different topic via a connected clone of the backend.
        if topic and topic != self.backend.stream_key:
            target_backend = self.backend.clone_with_topic(topic)
            await target_backend.connect()

        msg_id = await target_backend.add_event(data)
        return msg_id

    async def run(self, handler: Callable[[T], Awaitable[None]], batch_size: int = 10) -> None:
        """
        Start the infinite processing loop.

        Continuously fetches batches of messages, validates them against the schema,
        and invokes the user-provided handler.

        Args:
            handler (Callable[[T], Awaitable[None]]): Async function to process each event.
            batch_size (int): Number of messages to fetch in each batch. Default is 10.

        Raises:
             Exception: Any unhandled exception from the processor or validation logic.
        """
        # Build the validating wrapper once, not once per message.
        typed_handler = self._create_typed_handler(handler, self.backend.stream_key)

        logger.info(f"Starting stream processor for {self.backend.stream_key}...")
        await self.processor.run_loop(typed_handler, batch_size=batch_size)

    def _create_typed_handler(self, handler: Callable, topic: str) -> Callable[[str, Dict[str, Any], Context], Awaitable[None]]:
        """Wrap a user handler with schema validation and arity-aware dispatch."""
        import inspect
        from typing import cast
        sig = inspect.signature(handler)

        async def typed_handler(msg_id: str, raw_data: Dict[str, Any], ctx: Context) -> None:
            # 1. Deserialize / Validate
            if self.schema:
                try:
                    event = self.schema.model_validate(raw_data)
                except Exception as e:
                    logger.error(f"Schema validation failed for msg {msg_id}: {e}")
                    # Bare re-raise preserves the traceback for the processor's DLQ logic.
                    raise
            else:
                # Dynamic validation via SchemaRegistry fallback.
                # (Valkey decode_responses=True handles bytes->str.)
                try:
                    # cast is needed because validate returns BaseModel but we expect T
                    event = cast(T, SchemaRegistry.validate(raw_data))
                except Exception as e:
                    logger.error(f"Dynamic validation failed for msg {msg_id}: {e}")
                    raise

            # Attach the backend offset so consumers can reference it.
            if isinstance(event, BaseEvent):
                event.offset = msg_id

            # 2. Call user logic, dispatching on the handler's arity:
            #    (msg_id, raw, ctx) | (msg_id, raw) | (event)
            if len(sig.parameters) >= 3:
                await handler(msg_id, raw_data, ctx)
            elif len(sig.parameters) == 2:
                await handler(msg_id, raw_data)
            else:
                await handler(event)
        return typed_handler

    async def aggregate(self,
                        window: Window,
                        handler: Callable[[T, Any], Awaitable[Any]],
                        batch_size: int = 10,
                        watermark_delay_ms: int = 0) -> None:
        """
        Start the infinite processing loop with stateful windowed aggregation.

        Args:
            window (Window): The windowing strategy to apply.
            handler (Callable[[T, Any], Awaitable[Any]]): Async function taking (event, current_state)
                                                          and returning the new state.
            batch_size (int): Number of messages to fetch in each batch. Default is 10.
            watermark_delay_ms (int): Allowed event-time lateness in milliseconds;
                                      events behind the watermark are routed to a
                                      late-events topic. Default is 0 (disabled).

        Raises:
            RuntimeError: If no StateStore is attached to the Stream.
        """
        if not self.state_store:
            raise RuntimeError("A StateStore must be provided to Stream.__init__ to use aggregate()")

        typed_agg_handler = self._create_aggregation_handler(handler, self.backend.stream_key, window, watermark_delay_ms, self.backend)
        logger.info(f"Starting windowed aggregation processor for {self.backend.stream_key}...")
        await self.processor.run_loop(typed_agg_handler, batch_size=batch_size)

    def _create_aggregation_handler(self, handler: Callable, topic: str, window: Window, watermark_delay_ms: int, backend: StreamingBackend) -> Callable[[str, Dict[str, Any], Context], Awaitable[None]]:
        """Build a handler that validates, windows, aggregates and checkpoints events."""
        # Hoisted out of the per-message path.
        import time
        from datetime import datetime
        from typing import cast

        # Highest event timestamp seen so far; drives the watermark.
        max_event_ts = 0.0

        async def aggregation_handler(msg_id: str, raw_data: Dict[str, Any], ctx: Context) -> None:
            nonlocal max_event_ts
            # 1. Deserialize / Validate (mirrors _create_typed_handler)
            if self.schema:
                try:
                    event = self.schema.model_validate(raw_data)
                except Exception as e:
                    logger.error(f"Schema validation failed for msg {msg_id}: {e}")
                    raise
            else:
                try:
                    event = cast(T, SchemaRegistry.validate(raw_data))
                except Exception as e:
                    logger.error(f"Dynamic validation failed for msg {msg_id}: {e}")
                    raise

            if isinstance(event, BaseEvent):
                event.offset = msg_id

            # 2. Extract event time; fall back to wall-clock time when the event
            # has no usable 'timestamp' attribute.
            ts = getattr(event, "timestamp", None)
            if ts is None:
                ts_val = time.time()
            elif isinstance(ts, datetime):
                ts_val = ts.timestamp()
            elif isinstance(ts, (int, float)):
                ts_val = float(ts)
            else:
                ts_val = time.time()

            # 3. Advance the watermark: max event time minus the allowed lateness.
            max_event_ts = max(max_event_ts, ts_val)
            current_watermark = max_event_ts - (watermark_delay_ms / 1000.0)

            # 4. Assign the event to its window(s).
            windows = window.assign_windows(ts_val)

            # 5. Aggregate per window.
            # Key extraction: prefer 'key' attribute, else default.
            event_key = getattr(event, "key", "default_key")

            for start, end in windows:
                if watermark_delay_ms > 0 and end < current_watermark:
                    logger.warning(f"Dropping late event {msg_id}. Window {end} is older than Watermark {current_watermark:.2f}")
                    # Route to late-events DLQ
                    late_topic = f"{topic}-late"
                    try:
                        late_backend = backend.clone_with_topic(late_topic)
                        await late_backend.add_event(raw_data)
                        logger.debug(f"Late event {msg_id} routed to DLQ {late_topic}")
                    except Exception as e:
                        logger.error(f"Failed to route late event to DLQ: {e}")
                    continue

                # Dynamic Logic for Sessions vs Fixed Windows
                if window.is_session:
                    # Sessions are tracked by a single moving 'active' key per event key.
                    state_key = f"{self.backend.stream_key}:{event_key}:session:active"
                    current_raw = await self.state_store.get(state_key)

                    # session_state structure: {"start": float, "last": float, "agg": Any}
                    if current_raw and isinstance(current_raw, dict) and "last" in current_raw:
                        last_activity = current_raw["last"]
                        # Within the inactivity gap (gap_ms is ms): extend the session.
                        if (ts_val - last_activity) <= (window.gap_ms / 1000.0):
                            current_state = current_raw.get("agg")
                            new_agg = await handler(event, current_state)
                            new_state = {
                                "start": current_raw["start"],
                                "last": ts_val,
                                "agg": new_agg
                            }
                        else:
                            # Gap exceeded: old session is effectively closed; start fresh.
                            # (An archive key could be added here later if needed.)
                            new_agg = await handler(event, None)
                            new_state = {
                                "start": ts_val,
                                "last": ts_val,
                                "agg": new_agg
                            }
                    else:
                        # Fresh session
                        new_agg = await handler(event, None)
                        new_state = {
                            "start": ts_val,
                            "last": ts_val,
                            "agg": new_agg
                        }
                else:
                    # Fixed Windows (Tumbling/Sliding): one state entry per (key, window).
                    state_key = f"{self.backend.stream_key}:{event_key}:{start}:{end}"
                    current_state = await self.state_store.get(state_key)
                    new_state = await handler(event, current_state) # type: ignore

                # Save state
                await self.state_store.put(state_key, new_state)

            # Atomically checkpoint offset for exactly-once-ish semantics
            # if the state store supports transactional combined checkpointing.
            # ( SQLiteStateStore does this )
            group_name = getattr(backend, "group_name", "default_group")
            await self.state_store.checkpoint(
                stream_id=topic,
                group_id=group_name,
                offset=msg_id
            )

        return aggregation_handler

__aenter__() async

Async context manager entry.

Connects to the backend and ensures the consumer group exists.

Source code in pspf/stream.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def __aenter__(self) -> "Stream[T]":
    """
    Async context manager entry.

    Connects to the backend and ensures the consumer group exists.

    Returns:
        Stream[T]: This stream instance, ready to emit/consume.
    """
    # The backend's connector manages the actual connection; connect() is
    # expected to be safe to call even if the user already connected it.
    await self.backend.connect() # Idempotent-ish check inside
    await self.backend.ensure_group_exists()

    # Start the optional state store so aggregations can persist state.
    if self.state_store:
        await self.state_store.start()

    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Triggers shutdown of the processor and closes the backend connection.

Source code in pspf/stream.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
    """
    Async context manager exit.

    Triggers shutdown of the processor and closes the backend connection.

    Args:
        exc_type: Exception class if the managed block raised, else None.
        exc_val: Exception instance if the managed block raised, else None.
        exc_tb: Traceback if the managed block raised, else None.
    """
    if self.processor:
        await self.processor.shutdown()

    if self.state_store:
        await self.state_store.stop()

    # We close the connection here as we "took ownership" in __aenter__.
    await self.backend.close()

__init__(backend=None, topic=None, group=None, schema=None, state_store=None)

Initialize the Stream facade.

Parameters:

Name Type Description Default
backend Optional[StreamingBackend]

Configured backend instance.

None
topic Optional[str]

Stream topic (required if backend is None).

None
group Optional[str]

Consumer group (required if backend is None).

None
schema Optional[Type[T]]

Pydantic model class for validation. If None, uses dynamic SchemaRegistry.

None
state_store Optional[StateStore]

Key-Value store for persistent state.

None
Source code in pspf/stream.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __init__(self, 
             backend: Optional[StreamingBackend] = None,
             topic: Optional[str] = None,
             group: Optional[str] = None,
             schema: Optional[Type[T]] = None,
             state_store: Optional[StateStore] = None):
    """
    Initialize the Stream facade.

    Args:
        backend (Optional[StreamingBackend]): Configured backend instance.
        topic (Optional[str]): Stream topic (required if backend is None).
        group (Optional[str]): Consumer group (required if backend is None).
        schema (Optional[Type[T]]): Pydantic model class for validation. 
                                    If None, uses dynamic SchemaRegistry.
        state_store (Optional[StateStore]): Key-Value store for persistent state.

    Raises:
        ValueError: If neither `backend` nor both `topic` and `group` are given.
    """
    if backend is None:
        if not topic or not group:
            raise ValueError("Must provide either a 'backend', or both 'topic' and 'group' for auto-instantiation.")

        from pspf.settings import settings

        # Backend auto-selection: use Valkey when explicitly requested via
        # VALKEY_HOST / PSPF_BACKEND env vars, or whenever ENV is not 'dev'.
        use_valkey = False
        if os.getenv("VALKEY_HOST") or os.getenv("PSPF_BACKEND") == "valkey":
            use_valkey = True
        elif settings.ENV != "dev":
            use_valkey = True

        if use_valkey:
            try:
                from pspf.connectors.valkey import ValkeyConnector, ValkeyStreamBackend
                import uuid
                connector = ValkeyConnector(host=settings.valkey.HOST, port=settings.valkey.PORT)
                # Random suffix gives each worker a unique consumer name.
                worker_id = f"worker-{uuid.uuid4().hex[:8]}"
                backend = ValkeyStreamBackend(connector, topic, group, worker_id)
                logger.info(f"Auto-instantiated Valkey backend for topic '{topic}'")
            except Exception as e:
                # Best-effort fallback keeps local development working without Valkey.
                logger.warning(f"Failed to auto-instantiate Valkey backend: {e}. Falling back to MemoryBackend.")
                from pspf.connectors.memory import MemoryBackend
                backend = MemoryBackend(stream_key=topic, group_name=group)
        else:
            from pspf.connectors.memory import MemoryBackend
            backend = MemoryBackend(stream_key=topic, group_name=group)
            logger.debug(f"Auto-instantiated MemoryBackend for topic '{topic}' (ENV={settings.ENV})")

    self.backend = backend
    self.schema = schema
    self.state_store = state_store

    # Inject state_store into backend if it supports it (duck-typed attribute).
    if hasattr(self.backend, "state_store"):
        self.backend.state_store = state_store

    self.processor = BatchProcessor(self.backend)
    self.telemetry = TelemetryManager()
    self._subscriptions: List[Dict[str, Any]] = []
    self._processors: List[BatchProcessor] = []

aggregate(window, handler, batch_size=10, watermark_delay_ms=0) async

Start the infinite processing loop with stateful windowed aggregation.

Parameters:

Name Type Description Default
window Window

The windowing strategy to apply.

required
handler Callable[[T, Any], Awaitable[Any]]

Async function taking (event, current_state) and returning the new state.

required
batch_size int

Number of messages to fetch in each batch. Default is 10.

10
watermark_delay_ms int

Allowed event-time lateness in milliseconds; events behind the watermark are routed to a late-events topic. Default is 0 (disabled).

0

Raises:

Type Description
RuntimeError

If no StateStore is attached to the Stream.

Source code in pspf/stream.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
async def aggregate(self, 
                    window: Window, 
                    handler: Callable[[T, Any], Awaitable[Any]],
                    batch_size: int = 10,
                    watermark_delay_ms: int = 0) -> None:
    """
    Start the infinite processing loop with stateful windowed aggregation.

    Args:
        window (Window): The windowing strategy to apply.
        handler (Callable[[T, Any], Awaitable[Any]]): Async function taking (event, current_state) 
                                                      and returning the new state.
        batch_size (int): Number of messages to fetch in each batch. Default is 10.
        watermark_delay_ms (int): Allowed event-time lateness in milliseconds;
                                  events behind the watermark are routed to a
                                  late-events topic. Default is 0 (disabled).

    Raises:
        RuntimeError: If no StateStore is attached to the Stream.
    """
    if not self.state_store:
        raise RuntimeError("A StateStore must be provided to Stream.__init__ to use aggregate()")

    typed_agg_handler = self._create_aggregation_handler(handler, self.backend.stream_key, window, watermark_delay_ms, self.backend)
    logger.info(f"Starting windowed aggregation processor for {self.backend.stream_key}...")
    await self.processor.run_loop(typed_agg_handler, batch_size=batch_size)

emit(event, topic=None) async

Produce an event to the stream.

Accepts either a Pydantic model or a dictionary. Serializes and injects tracing context before sending to the backend.

Parameters:

Name Type Description Default
event Any

The event object to publish (BaseModel or dict).

required
topic Optional[str]

The stream/topic to publish to. Defaults to backend's stream_key.

None

Returns:

Name Type Description
str str

The message ID generated by the backend.

Source code in pspf/stream.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
async def emit(self, event: Any, topic: Optional[str] = None) -> str:
    """
    Produce an event to the stream.

    Accepts either a Pydantic model or a dictionary.
    Serializes and injects tracing context before sending to the backend.

    Args:
        event (Any): The event object to publish (BaseModel or dict).
        topic (Optional[str]): The stream/topic to publish to. Defaults to backend's stream_key.

    Returns:
        str: The message ID generated by the backend.

    Raises:
        ValueError: If `event` is neither a Pydantic model nor a dict.
    """
    # Serialize, ensuring 'event_type' is present for deserialization.
    # (Both branches guarantee it; no re-check is needed afterwards —
    # the old duplicate check was unreachable and would have mislabeled
    # dict events with the class name 'dict'.)
    if hasattr(event, "model_dump"):
        data = event.model_dump(mode='json')
        if "event_type" not in data:
            data["event_type"] = event.__class__.__name__
    elif isinstance(event, dict):
        data = event.copy()
        if "event_type" not in data:
            data["event_type"] = "GenericEvent"
    else:
        raise ValueError("Event must be a Pydantic model or a dictionary")

    # Inject Trace Context
    # We add a hidden field to carry the trace context
    self.telemetry.inject_context(data)

    target_backend = self.backend

    # Ensure target backend is connected
    # Most connectors should be idempotent on connect()
    await target_backend.connect()

    # Publish to a different topic via a connected clone of the backend.
    if topic and topic != self.backend.stream_key:
        target_backend = self.backend.clone_with_topic(topic)
        await target_backend.connect()

    msg_id = await target_backend.add_event(data)
    return msg_id

health() async

Perform a health check on the stream components.

Pings the backend to ensure connectivity.

Returns:

Type Description
Dict[str, str]

Dict[str, str]: A dictionary containing status ('ok' or 'error') and details.

Source code in pspf/stream.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def health(self) -> Dict[str, str]:
    """
    Run a connectivity check against the stream's backend.

    Returns:
        Dict[str, str]: ``{"status": "ok", ...}`` when the backend answers
        the ping, otherwise ``{"status": "error", ...}`` with the reason.
    """
    try:
        # Backend is expected to expose a ping() coroutine.
        await self.backend.ping()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {"status": "error", "reason": str(e)}
    # Backend responded; a consumer-group existence check could be added later.
    return {"status": "ok", "backend": "connected"}

run(handler, batch_size=10) async

Start the infinite processing loop.

Continuously fetches batches of messages, validates them against the schema, and invokes the user-provided handler.

Parameters:

Name Type Description Default
handler Callable[[T], Awaitable[None]]

Async function to process each event.

required
batch_size int

Number of messages to fetch in each batch. Default is 10.

10

Raises:

Type Description
Exception

Any unhandled exception from the processor or validation logic.

Source code in pspf/stream.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
async def run(self, handler: Callable[[T], Awaitable[None]], batch_size: int = 10) -> None:
    """
    Start the infinite processing loop.

    Repeatedly pulls batches of messages, validates each against the
    configured schema, and dispatches them to the user-supplied handler.

    Args:
        handler (Callable[[T], Awaitable[None]]): Async function invoked per event.
        batch_size (int): Messages fetched per batch. Default is 10.

    Raises:
         Exception: Any unhandled exception from the processor or validation logic.
    """
    async def typed_handler(msg_id: str, raw_data: Dict[str, Any], ctx: Context) -> None:
        # Build the schema-aware wrapper per invocation, mirroring the
        # behavior of the registered-subscription path.
        wrapped = self._create_typed_handler(handler, self.backend.stream_key)
        return await wrapped(msg_id, raw_data, ctx)

    logger.info(f"Starting stream processor for {self.backend.stream_key}...")
    await self.processor.run_loop(typed_handler, batch_size=batch_size)

run_forever() async

Start the infinite processing loop for all registered decorators concurrently.

Source code in pspf/stream.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def run_forever(self) -> None:
    """Start the infinite processing loop for all registered decorators concurrently."""
    import asyncio

    if not self._subscriptions:
        logger.warning("No subscriptions registered. Exiting run_forever.")
        return

    await self.backend.connect()
    if self.state_store:
        await self.state_store.start()

    self._processors = []
    tasks = []
    for index, spec in enumerate(self._subscriptions):
        topic = spec["topic"]
        clone = self.backend.clone_with_topic(topic)
        await clone.ensure_group_exists()

        # Only the first processor starts the admin server.
        proc = BatchProcessor(clone, state_store=self.state_store, start_admin_server=(index == 0))
        self._processors.append(proc)

        kind = spec["type"]
        if kind == "subscribe":
            msg_handler = self._create_typed_handler(spec["handler"], topic)
        elif kind == "aggregate":
            if not self.state_store:
                raise RuntimeError(f"A StateStore must be provided to use @stream.window on {topic}")
            msg_handler = self._create_aggregation_handler(spec["handler"], topic, spec["window"], spec["watermark_delay_ms"], clone)
        else:
            # Unknown registration type: no processing task is scheduled.
            continue
        tasks.append(asyncio.create_task(proc.run_loop(msg_handler, batch_size=spec["batch_size"])))

    logger.info(f"Starting {len(tasks)} Stream processors concurrently.")
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        logger.info("run_forever cancelled.")
    finally:
        # Always tear down processors and state store, even on cancellation.
        for proc in self._processors:
            await proc.shutdown()
        if self.state_store:
            await self.state_store.stop()

stop() async

Stop all running processors.

Source code in pspf/stream.py
163
164
165
166
167
async def stop(self) -> None:
    """Shut down every processor started by this stream."""
    for proc in self._processors:
        await proc.shutdown()
    logger.info("Stream stopped.")

subscribe(topic, batch_size=10)

Decorator to register a stateless handler for a topic.

Source code in pspf/stream.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def subscribe(self, topic: str, batch_size: int = 10) -> Callable:
    """Decorator registering a stateless handler for ``topic``."""
    def decorator(func: Callable) -> Callable:
        entry = {
            "type": "subscribe",
            "topic": topic,
            "handler": func,
            "batch_size": batch_size,
        }
        self._subscriptions.append(entry)
        return func
    return decorator

window(topic, window, batch_size=10, watermark_delay_ms=0)

Decorator to register a stateful aggregation handler for a topic.

Source code in pspf/stream.py
106
107
108
109
110
111
112
113
114
115
116
117
118
def window(self, topic: str, window: Window, batch_size: int = 10, watermark_delay_ms: int = 0) -> Callable:
    """Decorator registering a stateful aggregation handler for ``topic``."""
    def decorator(func: Callable) -> Callable:
        entry = {
            "type": "aggregate",
            "topic": topic,
            "window": window,
            "handler": func,
            "batch_size": batch_size,
            "watermark_delay_ms": watermark_delay_ms,
        }
        self._subscriptions.append(entry)
        return func
    return decorator