Skip to content

Backends API Reference

PSPF supports multiple backends for stream storage and coordination.

ValkeyStreamBackend

Bases: StreamingBackend

Handles all Stream-related operations on top of a ValkeyConnector.

Attributes:

Name Type Description
connector ValkeyConnector

The connection manager.

stream_key str

Name of the stream.

group_name str

Consumer group name.

consumer_name str

Unique consumer identifier.

Source code in pspf/connectors/valkey.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
class ValkeyStreamBackend(StreamingBackend):
    """
    Handles all Stream-related operations on top of a ValkeyConnector.

    Attributes:
        connector (ValkeyConnector): The connection manager.
        stream_key (str): Name of the stream.
        group_name (str): Consumer group name.
        consumer_name (str): Unique consumer identifier.
    """
    def __init__(self, connector: ValkeyConnector, stream_key: str, group_name: str, consumer_name: str):
        self.connector = connector
        self._stream_key = stream_key
        self._group_name = group_name
        self.consumer_name = consumer_name
        # Failed messages are parked on a sibling stream.
        self.dlq_stream_key = f"{stream_key}-dlq"
        # Hash keyed by message ID, holding per-message delivery attempts.
        self.retry_tracker_key = f"pspf:retries:{group_name}:{stream_key}"

    @property
    def stream_key(self) -> str:
        return self._stream_key

    @property
    def group_name(self) -> str:
        return self._group_name

    @staticmethod
    def _deserialize_entry(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Decode JSON-encoded string values in a raw stream entry.

        add_event() serializes complex values (dict/list/bool/None) to JSON
        strings; strings that fail to parse are returned unchanged. Shared by
        read_batch() and claim_stuck_messages().
        """
        import json
        parsed: Dict[str, Any] = {}
        for key, value in data.items():
            if isinstance(value, str):
                try:
                    parsed[key] = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    parsed[key] = value
            else:
                parsed[key] = value
        return parsed

    def clone_with_topic(self, topic: str) -> "ValkeyStreamBackend":
        """Create a new backend instance for a different topic, sharing the underlying connection."""
        return ValkeyStreamBackend(
            connector=self.connector,
            stream_key=topic,
            group_name=self.group_name,
            consumer_name=self.consumer_name
        )

    async def connect(self) -> None:
        await self.connector.connect()

    async def close(self) -> None:
        await self.connector.close()

    async def ping(self) -> bool:
        client = self.connector.get_client()
        await client.ping()
        return True

    async def ensure_group_exists(self, start_id: str = "0") -> None:
        """
        Idempotent creation of the consumer group.

        Args:
            start_id (str): ID to start from. Default is "0" (process all).
        """
        client = self.connector.get_client()
        try:
            # MKSTREAM ensures the stream is created if it doesn't exist
            await client.xgroup_create(self.stream_key, self.group_name, id=start_id, mkstream=True)
            logger.info(f"Created consumer group '{self.group_name}' on stream '{self.stream_key}'")
        except ResponseError as e:
            if "BUSYGROUP" in str(e):
                logger.debug(f"Consumer group '{self.group_name}' already exists.")
            else:
                # Bare `raise` preserves the original traceback (unlike `raise e`).
                raise

    async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Reads a batch of new messages ('>') for this consumer group.

        Args:
            count (int): Max messages.
            block_ms (int): Block timeout in milliseconds.

        Returns:
            List[Tuple[str, Dict]]: List of (msg_id, payload).
        """
        client = self.connector.get_client()

        try:
            # Structure: [[stream_name, [(msg_id, data), ...]], ...]
            streams = await client.xreadgroup(
                groupname=self.group_name,
                consumername=self.consumer_name,
                streams={self.stream_key: ">"}, # ">" means new messages
                count=count,
                block=block_ms
            )

            if not streams:
                return []

            # We only read one stream, so take the first result
            _, messages = streams[0]
            if not messages:
                return []

            # Deserialize nested JSON strings
            return [(msg_id, self._deserialize_entry(data)) for msg_id, data in messages]
        except Exception as e:
            logger.error(f"Error reading batch: {e}")
            raise

    async def get_retry_count(self, message_id: str) -> int:
        """
        Gets the current retry count for a message ID.

        Returns:
            int: The recorded count, or 0 if the message has never been retried.
        """
        client = self.connector.get_client()
        count: Any = await client.hget(self.retry_tracker_key, message_id) # type: ignore
        if count:
            return int(count)
        return 0

    async def increment_retry_count(self, message_id: str) -> int:
        """
        Increments retry count for a message by 1.

        Args:
            message_id (str): The unique message ID.

        Returns:
             int: The new retry count.
        """
        client = self.connector.get_client()
        res: Any = await client.hincrby(self.retry_tracker_key, message_id, 1) # type: ignore
        return int(res)

    async def move_to_dlq(self, message_id: str, data: Dict[str, Any], error: str) -> None:
        """
        Moves a message to the DLQ stream and ACKs it in the main stream.

        Args:
             message_id (str): Message ID.
             data (Dict): Message payload.
             error (str): Error description.
        """
        client = self.connector.get_client()
        logger.warning(f"Moving message {message_id} to DLQ {self.dlq_stream_key}. Error: {error}")

        # Enrich data with error metadata so the DLQ entry is self-describing.
        dlq_data = data.copy()
        dlq_data["_error"] = str(error)
        dlq_data["_original_stream"] = self.stream_key
        dlq_data["_original_msg_id"] = message_id
        dlq_data["_moved_timestamp"] = str(time.time())

        # Add to DLQ
        await client.xadd(self.dlq_stream_key, dlq_data)

        # ACK in original stream to remove from Pending Entries List (PEL)
        await client.xack(self.stream_key, self.group_name, message_id)

        # Clean up retry tracker
        await client.hdel(self.retry_tracker_key, message_id) # type: ignore

    async def ack_batch(self, message_ids: List[str]) -> None:
        """
        Acknowledges a batch of message IDs and cleans up retry counters.

        Args:
            message_ids (List[str]): IDs to ACK.
        """
        if not message_ids:
            return

        client = self.connector.get_client()

        # Pipeline the XACK and retry cleanup into one round trip.
        async with client.pipeline() as pipe:
            pipe.xack(self.stream_key, self.group_name, *message_ids)
            pipe.hdel(self.retry_tracker_key, *message_ids)
            await pipe.execute()

        logger.debug(f"ACKed {len(message_ids)} messages")

    async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
        """
        Appends an event to the stream.

        Args:
             data (Dict): The payload.
             max_len (Optional[int]): Max stream length.

        Returns:
             str: The generated message ID.
        """
        import json

        # valkey-python requires primitive types (bytes, str, int, float),
        # so complex values are serialized to JSON strings.
        safe_data = {}
        for k, v in data.items():
            if isinstance(v, (dict, list, bool)) or v is None:
                safe_data[k] = json.dumps(v)
            else:
                safe_data[k] = v

        client = self.connector.get_client()
        msg_id: Any = await client.xadd(self.stream_key, safe_data, maxlen=max_len)
        return str(msg_id)

    async def claim_stuck_messages(self, min_idle_time_ms: int = 60000, count: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Auto-claims pending messages from crashed consumers.
        Using XAUTOCLAIM (Redis 6.2+ / Valkey) for efficiency.

        Args:
            min_idle_time_ms (int): Min idle time to consider a message 'stuck'.
            count (int): Max messages to claim.

        Returns:
             List[Tuple[str, Dict]]: Claimed messages.
        """
        client = self.connector.get_client()

        try:
            # XAUTOCLAIM returns (next_start_id, messages[, deleted_ids])
            # depending on server/client version. We take a single batch
            # starting at "0-0" for simplicity.
            result = await client.xautoclaim(
                name=self.stream_key,
                groupname=self.group_name,
                consumername=self.consumer_name,
                min_idle_time=min_idle_time_ms,
                start_id="0-0",
                count=count
            )
            messages = result[1]

            if not messages:
                return []

            logger.warning(f"Consumer {self.consumer_name} claimed {len(messages)} stuck messages.")
            # XAUTOCLAIM returns raw field data just like XREADGROUP, so it
            # needs the same JSON deserialization pass.
            return [(msg_id, self._deserialize_entry(data)) for msg_id, data in messages]
        except Exception as e:
            logger.error(f"Error during XAUTOCLAIM: {e}")
            return []

    async def get_pending_info(self) -> Dict[str, Any]:
        """
        Retrieves lag and pending consumer group info.

        Returns:
            Dict[str, Any]: Keys "pending", "lag", "consumers" (zeros on error).
        """
        client = self.connector.get_client()
        info = {
            "pending": 0,
            "lag": 0,
            "consumers": 0
        }
        try:
            # 1. Get Pending Count (Fast)
            pending_summary = await client.xpending(self.stream_key, self.group_name)
            info["pending"] = pending_summary["pending"]

            # 2. Get Group Info for Lag (slower, but accurate in Redis 7/Valkey)
            # XINFO GROUPS key
            groups = await client.xinfo_groups(self.stream_key)
            for g in groups:
                 if g["name"] == self.group_name:
                     info["lag"] = g.get("lag", 0)
                     info["consumers"] = g.get("consumers", 0)
                     break

            return info
        except Exception as e:
            logger.error(f"Error getting pending info: {e}")
            return info

ack_batch(message_ids) async

Acknowledges a batch of message IDs and cleans up retry counters.

Parameters:

Name Type Description Default
message_ids List[str]

IDs to ACK.

required
Source code in pspf/connectors/valkey.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
async def ack_batch(self, message_ids: List[str]) -> None:
    """
    Acknowledge a batch of message IDs and drop their retry counters.

    Args:
        message_ids (List[str]): IDs to ACK.
    """
    # Nothing to do for an empty batch.
    if not message_ids:
        return

    client = self.connector.get_client()

    # Bundle the XACK and the retry-counter cleanup into one round trip.
    async with client.pipeline() as pipeline:
        pipeline.xack(self.stream_key, self.group_name, *message_ids)
        pipeline.hdel(self.retry_tracker_key, *message_ids)
        await pipeline.execute()

    logger.debug(f"ACKed {len(message_ids)} messages")

add_event(data, max_len=None) async

Appends an event to the stream.

Parameters:

Name Type Description Default
data Dict

The payload.

required
max_len Optional[int]

Max stream length.

None

Returns:

Name Type Description
str str

The generated message ID.

Source code in pspf/connectors/valkey.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
    """
    Appends an event to the stream.

    Args:
         data (Dict): The payload.
         max_len (Optional[int]): Max stream length.

    Returns:
         str: The generated message ID.
    """
    # Hoisted out of the loop: importing per-iteration is wasted work.
    import json

    # valkey-python requires primitive types (bytes, str, int, float),
    # so complex values are serialized to JSON strings.
    safe_data = {}
    for k, v in data.items():
        if isinstance(v, (dict, list, bool)) or v is None:
            safe_data[k] = json.dumps(v)
        else:
            safe_data[k] = v

    client = self.connector.get_client()
    msg_id: Any = await client.xadd(self.stream_key, safe_data, maxlen=max_len)
    return str(msg_id)

claim_stuck_messages(min_idle_time_ms=60000, count=10) async

Auto-claims pending messages from crashed consumers. Using XAUTOCLAIM (Redis 6.2+ / Valkey) for efficiency.

Parameters:

Name Type Description Default
min_idle_time_ms int

Min idle time to consider a message 'stuck'.

60000
count int

Max messages to claim.

10

Returns:

Type Description
List[Tuple[str, Dict[str, Any]]]

List[Tuple[str, Dict]]: Claimed messages.

Source code in pspf/connectors/valkey.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
async def claim_stuck_messages(self, min_idle_time_ms: int = 60000, count: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
    """
    Auto-claims pending messages from crashed consumers.
    Using XAUTOCLAIM (Redis 6.2+ / Valkey) for efficiency.

    Args:
        min_idle_time_ms (int): Min idle time to consider a message 'stuck'.
        count (int): Max messages to claim.

    Returns:
         List[Tuple[str, Dict]]: Claimed messages.
    """
    import json

    client = self.connector.get_client()

    try:
        # XAUTOCLAIM returns (next_start_id, messages[, deleted_ids]) depending
        # on server/client version; we only need the claimed messages and take
        # a single batch starting at "0-0" for simplicity.
        result = await client.xautoclaim(
            name=self.stream_key,
            groupname=self.group_name,
            consumername=self.consumer_name,
            min_idle_time=min_idle_time_ms,
            start_id="0-0",
            count=count
        )
        messages = result[1]

        if not messages:
            return []

        logger.warning(f"Consumer {self.consumer_name} claimed {len(messages)} stuck messages.")

        # XAUTOCLAIM returns raw field data just like XREADGROUP, so the
        # JSON-encoded values need the same deserialization pass.
        parsed_messages: List[Tuple[str, Dict[str, Any]]] = []
        for msg_id, data in messages:
            parsed_data: Dict[str, Any] = {}
            for k, v in data.items():
                if isinstance(v, str):
                    try:
                        parsed_data[k] = json.loads(v)
                    except (json.JSONDecodeError, TypeError):
                        parsed_data[k] = v
                else:
                    parsed_data[k] = v
            parsed_messages.append((msg_id, parsed_data))
        return parsed_messages
    except Exception as e:
        logger.error(f"Error during XAUTOCLAIM: {e}")
        return []

clone_with_topic(topic)

Create a new backend instance for a different topic, sharing the underlying connection.

Source code in pspf/connectors/valkey.py
109
110
111
112
113
114
115
116
def clone_with_topic(self, topic: str) -> "ValkeyStreamBackend":
    """Return a sibling backend bound to a different topic, reusing this connector."""
    clone = ValkeyStreamBackend(
        connector=self.connector,
        stream_key=topic,
        group_name=self.group_name,
        consumer_name=self.consumer_name
    )
    return clone

ensure_group_exists(start_id='0') async

Idempotent creation of the consumer group.

Parameters:

Name Type Description Default
start_id str

ID to start from. Default is "0" (process all).

'0'
Source code in pspf/connectors/valkey.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
async def ensure_group_exists(self, start_id: str = "0") -> None:
    """
    Idempotent creation of the consumer group.

    Args:
        start_id (str): ID to start from. Default is "0" (process all).
    """
    client = self.connector.get_client()
    try:
        # MKSTREAM ensures the stream is created if it doesn't exist
        await client.xgroup_create(self.stream_key, self.group_name, id=start_id, mkstream=True)
        logger.info(f"Created consumer group '{self.group_name}' on stream '{self.stream_key}'")
    except ResponseError as e:
        if "BUSYGROUP" in str(e):
            logger.debug(f"Consumer group '{self.group_name}' already exists.")
        else:
            # Bare `raise` re-raises with the original traceback intact,
            # whereas `raise e` would restart the traceback here.
            raise

get_pending_info() async

Retrieves lag and pending consumer group info.

Source code in pspf/connectors/valkey.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
async def get_pending_info(self) -> Dict[str, Any]:
    """
    Collect pending count, lag, and consumer count for this group.

    Returns zeros for any field that could not be fetched.
    """
    client = self.connector.get_client()
    stats: Dict[str, Any] = {"pending": 0, "lag": 0, "consumers": 0}
    try:
        # XPENDING summary is the cheap call; do it first.
        summary = await client.xpending(self.stream_key, self.group_name)
        stats["pending"] = summary["pending"]

        # XINFO GROUPS supplies lag/consumers (accurate on Redis 7 / Valkey).
        for group in await client.xinfo_groups(self.stream_key):
            if group["name"] == self.group_name:
                stats["lag"] = group.get("lag", 0)
                stats["consumers"] = group.get("consumers", 0)
                break

        return stats
    except Exception as e:
        logger.error(f"Error getting pending info: {e}")
        return stats

get_retry_count(message_id) async

Gets the current retry count for a message ID.

Source code in pspf/connectors/valkey.py
198
199
200
201
202
203
204
205
206
async def get_retry_count(self, message_id: str) -> int:
    """Return the retry count recorded for a message ID, or 0 if absent."""
    client = self.connector.get_client()
    raw: Any = await client.hget(self.retry_tracker_key, message_id) # type: ignore
    return int(raw) if raw else 0

increment_retry_count(message_id) async

Increments retry count for a message by 1.

Parameters:

Name Type Description Default
message_id str

The unique message ID.

required

Returns:

Name Type Description
int int

The new retry count.

Source code in pspf/connectors/valkey.py
208
209
210
211
212
213
214
215
216
217
218
219
220
async def increment_retry_count(self, message_id: str) -> int:
    """
    Bump the retry counter for a message by one.

    Args:
        message_id (str): The unique message ID.

    Returns:
         int: The new retry count.
    """
    client = self.connector.get_client()
    # HINCRBY is atomic, so concurrent consumers cannot lose an increment.
    new_count: Any = await client.hincrby(self.retry_tracker_key, message_id, 1) # type: ignore
    return int(new_count)

move_to_dlq(message_id, data, error) async

Moves a message to the DLQ stream and ACKs it in the main stream.

Parameters:

Name Type Description Default
message_id str

Message ID.

required
data Dict

Message payload.

required
error str

Error description.

required
Source code in pspf/connectors/valkey.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
async def move_to_dlq(self, message_id: str, data: Dict[str, Any], error: str) -> None:
    """
    Copy a failed message onto the DLQ stream, then ACK it on the source stream.

    Args:
         message_id (str): Message ID.
         data (Dict): Message payload.
         error (str): Error description.
    """
    client = self.connector.get_client()
    logger.warning(f"Moving message {message_id} to DLQ {self.dlq_stream_key}. Error: {error}")

    # Annotate the payload so the DLQ entry is self-describing.
    enriched = data.copy()
    enriched["_error"] = str(error)
    enriched["_original_stream"] = self.stream_key
    enriched["_original_msg_id"] = message_id
    enriched["_moved_timestamp"] = str(time.time())

    # Park the message on the dead-letter stream.
    await client.xadd(self.dlq_stream_key, enriched)

    # ACK on the source stream removes it from the Pending Entries List (PEL).
    await client.xack(self.stream_key, self.group_name, message_id)

    # The retry counter is no longer needed once the message is dead-lettered.
    await client.hdel(self.retry_tracker_key, message_id) # type: ignore

read_batch(count=10, block_ms=1000) async

Reads a batch of new messages ('>') for this consumer group.

Parameters:

Name Type Description Default
count int

Max messages.

10
block_ms int

Block timeout in milliseconds.

1000

Returns:

Type Description
List[Tuple[str, Dict[str, Any]]]

List[Tuple[str, Dict]]: List of (msg_id, payload).

Source code in pspf/connectors/valkey.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
    """
    Fetch a batch of new ('>') messages for this consumer group.

    Args:
        count (int): Max messages.
        block_ms (int): Block timeout in milliseconds.

    Returns:
        List[Tuple[str, Dict]]: List of (msg_id, payload).
    """
    import json

    client = self.connector.get_client()

    def _maybe_json(value: Any) -> Any:
        # Complex values are stored as JSON strings; anything that fails
        # to parse is returned verbatim.
        if not isinstance(value, str):
            return value
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return value

    try:
        # Reply shape: [[stream_name, [(msg_id, data), ...]], ...]
        reply = await client.xreadgroup(
            groupname=self.group_name,
            consumername=self.consumer_name,
            streams={self.stream_key: ">"}, # ">" selects never-delivered messages
            count=count,
            block=block_ms
        )

        if not reply:
            return []

        # A single stream was requested, so only the first entry matters.
        _, entries = reply[0]
        if not entries:
            return []

        return [
            (msg_id, {k: _maybe_json(v) for k, v in fields.items()})
            for msg_id, fields in entries
        ]
    except Exception as e:
        logger.error(f"Error reading batch: {e}")
        raise

ValkeyConnector

Manages the connection pool to a Valkey (or Redis) server.

Source code in pspf/connectors/valkey.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class ValkeyConnector:
    """
    Manages the connection pool to a Valkey (or Redis) server.
    """
    def __init__(self, host: Optional[str] = None, port: Optional[int] = None, password: Optional[str] = None, db: Optional[int] = None, valkey_settings: Optional[Any] = None):
        # Explicit constructor arguments win; anything omitted falls back to
        # the provided settings object or the global application settings.
        from pspf.settings import settings as global_settings
        self._s = valkey_settings or global_settings.valkey

        self.host = host or self._s.HOST
        self.port = port or self._s.PORT
        self.password = self._s.PASSWORD if password is None else password
        self.db = self._s.DB if db is None else db
        self._pool: Optional[valkey.Valkey] = None

    async def connect(self) -> None:
        """
        Establish a connection pool to the database.

        Raises:
             ConnectionError: If connectivity fails.
        """
        if self._pool:
            return  # already connected; nothing to do
        self._pool = valkey.Valkey(
            host=self.host,
            port=self.port,
            password=self.password,
            db=self.db,
            ssl=self._s.SSL,
            ssl_ca_certs=self._s.SSL_CA_CERTS,
            ssl_cert_reqs=self._s.SSL_CERT_REQS,
            decode_responses=True,
            socket_timeout=self._s.SOCKET_TIMEOUT,
            socket_connect_timeout=self._s.SOCKET_TIMEOUT
        )
        # Ping immediately so a bad endpoint fails here, not on first use.
        await self._pool.ping()
        logger.info(f"Connected to Valkey at {self.host}:{self.port} (SSL={self._s.SSL})")

    async def close(self) -> None:
        """
        Close the connection pool.
        """
        if not self._pool:
            return
        await self._pool.aclose()
        logger.info("Closed Valkey connection")
        self._pool = None

    def get_client(self) -> valkey.Valkey:
        """
        Get the active client instance.

        Returns:
            valkey.Valkey: The client instance.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if self._pool is None:
            raise RuntimeError("ValkeyConnector is not connected. Call connect() first.")
        return self._pool

    async def __aenter__(self) -> "ValkeyConnector":
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        await self.close()

close() async

Close the connection pool.

Source code in pspf/connectors/valkey.py
50
51
52
53
54
55
56
57
async def close(self) -> None:
    """Shut down the connection pool if one is open; otherwise no-op."""
    if not self._pool:
        return
    await self._pool.aclose()
    logger.info("Closed Valkey connection")
    self._pool = None

connect() async

Establish a connection pool to the database.

Raises:

Type Description
ConnectionError

If connectivity fails.

Source code in pspf/connectors/valkey.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
async def connect(self) -> None:
    """
    Establish a connection pool to the database.

    Raises:
         ConnectionError: If connectivity fails.
    """
    if self._pool:
        return  # already connected; nothing to do

    self._pool = valkey.Valkey(
        host=self.host,
        port=self.port,
        password=self.password,
        db=self.db,
        ssl=self._s.SSL,
        ssl_ca_certs=self._s.SSL_CA_CERTS,
        ssl_cert_reqs=self._s.SSL_CERT_REQS,
        decode_responses=True,
        socket_timeout=self._s.SOCKET_TIMEOUT,
        socket_connect_timeout=self._s.SOCKET_TIMEOUT
    )
    # Ping immediately so a bad endpoint fails here, not on first use.
    await self._pool.ping()
    logger.info(f"Connected to Valkey at {self.host}:{self.port} (SSL={self._s.SSL})")

get_client()

Get the active client instance.

Returns:

Type Description
Valkey

valkey.Valkey: The client instance.

Raises:

Type Description
RuntimeError

If connect() has not been called.

Source code in pspf/connectors/valkey.py
59
60
61
62
63
64
65
66
67
68
69
70
71
def get_client(self) -> valkey.Valkey:
    """
    Get the active client instance.

    Returns:
        valkey.Valkey: The client instance.

    Raises:
        RuntimeError: If connect() has not been called.
    """
    if self._pool is None:
        raise RuntimeError("ValkeyConnector is not connected. Call connect() first.")
    return self._pool

Kafka

KafkaStreamBackend

Bases: StreamingBackend

Kafka (or Redpanda) implementation of StreamingBackend. Requires aiokafka package.

Source code in pspf/connectors/kafka.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class KafkaStreamBackend(StreamingBackend):
    """
    Kafka (or Redpanda) implementation of StreamingBackend.
    Requires `aiokafka` package.

    Message IDs are encoded as "partition-offset" strings so that
    per-message ACKs can be translated back into Kafka offset commits.
    """
    def __init__(self, bootstrap_servers: str, topic: str, group_id: str, client_id: str, state_store: Optional[StateStore] = None) -> None:
        """
        Args:
            bootstrap_servers: Comma-separated broker address list.
            topic: Topic this backend produces to and consumes from.
            group_id: Kafka consumer group id.
            client_id: Base client id; "-producer"/"-consumer" suffixes are added.
            state_store: Optional durable store for per-message retry counts.

        Raises:
            ImportError: If aiokafka is not installed.
        """
        if not KAFKA_AVAILABLE:
            raise ImportError("aiokafka is required for KafkaStreamBackend. Install it with `pip install aiokafka`.")

        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.group_id = group_id
        self.client_id = client_id
        self.state_store = state_store
        # Namespaced key prefix for durable retry counters.
        self.retry_tracker_prefix = f"pspf:retries:{group_id}:{topic}:"

        self.consumer: Optional[AIOKafkaConsumer] = None
        self.producer: Optional[AIOKafkaProducer] = None
        self._connected = False

    @property
    def stream_key(self) -> str:
        """Topic name, exposed under the generic StreamingBackend name."""
        return self.topic

    @property
    def group_name(self) -> str:
        """Consumer group id, exposed under the generic StreamingBackend name."""
        return self.group_id

    def clone_with_topic(self, topic: str) -> "KafkaStreamBackend":
        """Create a new backend instance for a different topic, sharing the underlying connection."""
        return KafkaStreamBackend(
            bootstrap_servers=self.bootstrap_servers,
            topic=topic,
            group_id=self.group_id,
            client_id=self.client_id,
            state_store=self.state_store
        )

    async def connect(self) -> None:
        """Start producer and consumer; re-raises on broker connectivity failure."""
        try:
            self.producer = AIOKafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                client_id=f"{self.client_id}-producer"
            )
            await self.producer.start()

            # Start the consumer eagerly so connectivity problems surface
            # here rather than on the first read.
            self.consumer = AIOKafkaConsumer(
                self.topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                client_id=f"{self.client_id}-consumer",
                enable_auto_commit=False,  # offsets are committed manually in ack_batch()
                auto_offset_reset="earliest"
            )
            await self.consumer.start()

            self._connected = True
            logger.info(f"Connected to Kafka at {self.bootstrap_servers}")
        except Exception as e:
            logger.error(f"Failed to connect to Kafka: {e}")
            raise

    async def close(self) -> None:
        """Stop producer and consumer and mark the backend disconnected."""
        if self.producer:
            await self.producer.stop()
        if self.consumer:
            await self.consumer.stop()
        self._connected = False
        logger.info("Closed Kafka connection")

    async def ping(self) -> bool:
        """
        Cheap liveness check.

        Raises:
            ConnectionError: If connect() has not completed.
        """
        if not self._connected:
            raise ConnectionError("Not connected")
        # aiokafka has no dedicated ping; a metadata lookup for our topic
        # exercises the broker connection instead.
        if self.consumer:
            await self.consumer.partitions_for_topic(self.topic)
        return True

    async def ensure_group_exists(self, start_id: str = "0") -> None:
        """No-op: Kafka creates consumer groups automatically on join."""
        pass

    async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
        """
        Produce `data` as JSON to the topic.

        Args:
            data: JSON-serializable payload; an "event_id" key, if present,
                is used as the partitioning key to preserve per-id ordering.
            max_len: Ignored for Kafka (retention is broker-side).

        Returns:
            str: Message id in "partition-offset" form.

        Raises:
            ConnectionError: If the producer is not connected.
        """
        if not self.producer:
            raise ConnectionError("Producer not connected")

        payload = json.dumps(data).encode("utf-8")

        # Key by event_id when available so events with the same id land
        # on the same partition (ordering guarantee).
        key = data.get("event_id", "").encode("utf-8") if "event_id" in data else None

        try:
            record_metadata = await self.producer.send_and_wait(self.topic, payload, key=key)
            # Encode "partition-offset" as the message id.
            return f"{record_metadata.partition}-{record_metadata.offset}"
        except Exception as e:
            logger.error(f"Failed to produce to Kafka: {e}")
            raise

    async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Poll up to `count` messages, waiting at most `block_ms` milliseconds.

        Non-JSON payloads are skipped with a warning.

        Raises:
            ConnectionError: If the consumer is not connected.
        """
        if not self.consumer:
            raise ConnectionError("Consumer not connected")

        try:
            result = await self.consumer.getmany(timeout_ms=block_ms, max_records=count)

            messages = []
            for tp, records in result.items():
                for record in records:
                    msg_id = f"{record.partition}-{record.offset}"
                    try:
                        data = json.loads(record.value.decode("utf-8"))
                        messages.append((msg_id, data))
                    except json.JSONDecodeError:
                        logger.warning(f"Skipping non-JSON message {msg_id}")
                        continue

            return messages
        except Exception as e:
            logger.error(f"Error reading batch from Kafka: {e}")
            raise

    async def ack_batch(self, message_ids: List[str]) -> None:
        """
        Commit offsets covering the given message ids.

        Kafka commits offsets per partition rather than ACKing individual
        messages, so the highest offset seen for each partition is
        committed (+1, per Kafka's "next offset to consume" convention).
        Malformed ids are logged and skipped.
        """
        if not self.consumer:
            raise ConnectionError("Consumer not connected")

        from aiokafka import TopicPartition  # type: ignore  # hoisted out of the loop

        # TopicPartition -> next offset to commit. Values are plain ints;
        # aiokafka's commit() accepts {TopicPartition: int}.
        offsets_to_commit: Dict[Any, Any] = {}

        for msg_id in message_ids:
            try:
                part_str, off_str = msg_id.split("-")
                tp = TopicPartition(self.topic, int(part_str))
                # Kafka semantics: commit the NEXT offset to consume.
                new_offset = int(off_str) + 1

                # BUGFIX: the stored values are ints, so the previous guard
                # `new_offset > offsets_to_commit[tp].offset` raised
                # AttributeError whenever a batch contained two messages
                # from the same partition.
                if tp not in offsets_to_commit or new_offset > offsets_to_commit[tp]:
                    offsets_to_commit[tp] = new_offset
            except ValueError:
                logger.warning(f"Invalid message ID format for Kafka ACK: {msg_id}")

        if offsets_to_commit:
            await self.consumer.commit(offsets_to_commit)  # type: ignore

            # Durable retry cleanup: ACKed messages no longer need counters.
            if self.state_store:
                for msg_id in message_ids:
                    retry_key = f"{self.retry_tracker_prefix}{msg_id}"
                    await self.state_store.delete(retry_key)

    async def claim_stuck_messages(self, min_idle_time_ms: int = 60000, count: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
        """
        No-op for Kafka: there is no PEL claiming mechanism like Redis
        Streams. Rebalancing handles dead consumers; messages stuck inside
        a live consumer are application logic.
        """
        return []

    async def increment_retry_count(self, message_id: str) -> int:
        """
        Increment and return the retry count for a message.
        If a StateStore is available, it persists the count.
        """
        if self.state_store:
            retry_key = f"{self.retry_tracker_prefix}{message_id}"
            current_count = await self.state_store.get(retry_key, 0)
            new_count = current_count + 1
            await self.state_store.put(retry_key, new_count)
            return new_count

        # In-memory fallback, created lazily on first use.
        if not hasattr(self, "_retry_counts"):
            self._retry_counts: Dict[str, int] = {}

        self._retry_counts[message_id] = self._retry_counts.get(message_id, 0) + 1
        return self._retry_counts[message_id]

    async def move_to_dlq(self, message_id: str, data: Dict[str, Any], error: str) -> None:
        """
        Produce the failed message to the "<topic>-dlq" topic.

        NOTE: mutates the caller's `data` dict (adds "_error" and
        "_original_id" keys) before serializing.
        """
        dlq_topic = f"{self.topic}-dlq"
        if self.producer:
            data["_error"] = error
            data["_original_id"] = message_id
            payload = json.dumps(data).encode("utf-8")
            await self.producer.send_and_wait(dlq_topic, payload)

            # Clear retry state for the dead-lettered message.
            if self.state_store:
                retry_key = f"{self.retry_tracker_prefix}{message_id}"
                await self.state_store.delete(retry_key)

    async def get_pending_info(self) -> Dict[str, Any]:
        """
        Retrieves lag info from Kafka by comparing committed offsets to end offsets.
        """
        if not self.consumer:
            return {"lag": 0, "pending": 0, "status": "disconnected"}

        try:
            partitions = self.consumer.assignment()
            if not partitions:
                # Without a partition assignment there is nothing to measure yet.
                return {"lag": 0, "pending": 0, "status": "no_assignment"}

            # Committed offset per assigned partition, fetched concurrently.
            committed_tasks = [self.consumer.committed(tp) for tp in partitions]
            committed_results = await asyncio.gather(*committed_tasks)
            committed_map = dict(zip(partitions, committed_results))

            # Latest offset per partition.
            end_offsets = await self.consumer.end_offsets(partitions)

            total_lag = 0
            for tp in partitions:
                committed_offset = committed_map.get(tp)
                end_offset = end_offsets.get(tp, 0)

                if committed_offset is not None:
                    total_lag += max(0, end_offset - committed_offset)
                else:
                    # Never committed: count the whole partition as lag.
                    total_lag += end_offset

            return {
                "lag": total_lag,
                "pending": total_lag,
                "partition_count": len(partitions),
                "status": "connected"
            }
        except Exception as e:
            logger.warning(f"Failed to calculate Kafka lag: {e}")
            return {"lag": 0, "pending": 0, "error": str(e)}

clone_with_topic(topic)

Create a new backend instance for a different topic, sharing the underlying connection.

Source code in pspf/connectors/kafka.py
48
49
50
51
52
53
54
55
56
def clone_with_topic(self, topic: str) -> "KafkaStreamBackend":
    """Create a new backend instance for a different topic, sharing the underlying connection."""
    # Carry over every connection setting verbatim; only the topic changes.
    settings = dict(
        bootstrap_servers=self.bootstrap_servers,
        group_id=self.group_id,
        client_id=self.client_id,
        state_store=self.state_store,
    )
    return KafkaStreamBackend(topic=topic, **settings)

get_pending_info() async

Retrieves lag info from Kafka by comparing committed offsets to end offsets.

Source code in pspf/connectors/kafka.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def get_pending_info(self) -> Dict[str, Any]:
    """
    Retrieves lag info from Kafka by comparing committed offsets to end offsets.
    """
    if not self.consumer:
        return {"lag": 0, "pending": 0, "status": "disconnected"}

    try:
        assigned = self.consumer.assignment()
        if not assigned:
            # Without a partition assignment there is nothing to measure.
            return {"lag": 0, "pending": 0, "status": "no_assignment"}

        # Committed offset per partition, fetched concurrently.
        committed_map = dict(
            zip(
                assigned,
                await asyncio.gather(*(self.consumer.committed(tp) for tp in assigned)),
            )
        )
        # Latest offset per partition.
        tails = await self.consumer.end_offsets(assigned)

        total_lag = 0
        for tp in assigned:
            tail = tails.get(tp, 0)
            head = committed_map.get(tp)
            # A never-committed partition counts fully toward lag.
            total_lag += tail if head is None else max(0, tail - head)

        return {
            "lag": total_lag,
            "pending": total_lag,
            "partition_count": len(assigned),
            "status": "connected"
        }
    except Exception as e:
        logger.warning(f"Failed to calculate Kafka lag: {e}")
        return {"lag": 0, "pending": 0, "error": str(e)}

increment_retry_count(message_id) async

Increment and return the retry count for a message. If a StateStore is available, it persists the count.

Source code in pspf/connectors/kafka.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
async def increment_retry_count(self, message_id: str) -> int:
    """
    Increment and return the retry count for a message.
    If a StateStore is available, it persists the count.
    """
    if self.state_store:
        # Durable path: read-modify-write through the state store.
        retry_key = f"{self.retry_tracker_prefix}{message_id}"
        bumped = await self.state_store.get(retry_key, 0) + 1
        await self.state_store.put(retry_key, bumped)
        return bumped

    # Fallback: process-local counter, created lazily on first use.
    counts = getattr(self, "_retry_counts", None)
    if counts is None:
        counts = {}
        self._retry_counts = counts
    counts[message_id] = counts.get(message_id, 0) + 1
    return counts[message_id]

Memory (Testing)

MemoryBackend

Bases: StreamingBackend

In-Memory backend for testing/local verification. Not persistent across restarts.

Source code in pspf/connectors/memory.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class MemoryBackend(StreamingBackend):
    """
    In-Memory backend for testing/local verification.
    Not persistent across restarts.

    Message IDs use the Redis-stream style "timestamp_ms-sequence" format
    and are compared numerically (not lexically) when reading.
    """
    def __init__(self, stream_key: str = "default-stream", group_name: str = "default-group") -> None:
        self._stream_key = stream_key
        self._group_name = group_name
        # Cursor used to mint monotonically increasing message IDs.
        self._last_ts = 0
        self._last_seq = 0

        # Format: stream_key -> [ {'_id': '...', 'data': ...} ]
        self._streams: Dict[str, List[Dict[str, Any]]] = {}

        # Format: group_name -> consumer_name -> {msg_id, ...}
        # Simplified PEL (Pending Entries List)
        self._pel: Dict[str, Dict[str, Dict[str, Any]]] = {}

        # Format: group_name -> stream_key -> last_read_id
        # Tracks per-group read offsets.
        self._offsets: Dict[str, Dict[str, str]] = {}

        # Retry Tracker: msg_id -> count
        self._retries: Dict[str, int] = {}

        # stream_key -> list of dead-lettered entries
        self.dlq: Dict[str, List[Dict[str, Any]]] = {}

        self._lock = asyncio.Lock()
        self._connected = False

    @staticmethod
    def _id_key(msg_id: str) -> Tuple[int, int]:
        """
        Parse a "ts-seq" message id into a numeric sort key.

        Lexical comparison of the raw strings is incorrect once the
        sequence part reaches two digits ("ts-10" < "ts-9" as strings).
        """
        ts, _, seq = msg_id.partition("-")
        return (int(ts), int(seq or 0))

    @property
    def stream_key(self) -> str:
        return self._stream_key

    @property
    def group_name(self) -> str:
        return self._group_name

    def clone_with_topic(self, topic: str) -> "MemoryBackend":
        """Create a new backend instance for a different topic, sharing the underlying memory structures."""
        mb = MemoryBackend(stream_key=topic, group_name=self.group_name)
        mb._streams = self._streams
        mb._pel = self._pel
        mb._offsets = self._offsets
        mb._retries = self._retries
        mb.dlq = self.dlq
        mb._last_ts = self._last_ts
        mb._last_seq = self._last_seq
        mb._connected = self._connected
        return mb

    async def connect(self) -> None:
        """Mark the backend connected (nothing to establish in-memory)."""
        self._connected = True
        logger.info("Connected to MemoryBackend")

    async def close(self) -> None:
        """Mark the backend disconnected."""
        self._connected = False
        logger.info("Closed MemoryBackend")

    async def ping(self) -> bool:
        """Raise ConnectionError unless connect() has been called."""
        if not self._connected:
            raise ConnectionError("Not connected")
        return True

    async def ensure_group_exists(self, start_id: str = "0") -> None:
        """No-op: groups are implicit; offset entries are created lazily in read_batch()."""
        pass

    async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
        """
        Append an event to the stream.

        Returns:
            str: Generated message id in "timestamp_ms-sequence" form,
            guaranteed monotonically increasing within this instance.
        """
        async with self._lock:
            ts = int(time.time() * 1000)
            if ts <= self._last_ts:
                # Same (or rewound) millisecond: keep the previous timestamp
                # and bump the sequence to preserve monotonic ids.
                self._last_seq += 1
                ts = self._last_ts
            else:
                self._last_ts = ts
                self._last_seq = 0

            msg_id = f"{ts}-{self._last_seq}"

            # Store the payload with the id embedded under "_id".
            msg = {"_id": msg_id, **data}

            if self.stream_key not in self._streams:
                self._streams[self.stream_key] = []

            self._streams[self.stream_key].append(msg)
            return msg_id

    async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
        """
        Read up to `count` messages past this group's offset.

        `block_ms` is accepted for interface compatibility but ignored.
        """
        async with self._lock:
            stream = self._streams.get(self.stream_key, [])

            current_offset = self._offsets.get(self.group_name, {}).get(self.stream_key, "0-0")

            # BUGFIX: compare ids numerically. The old string comparison
            # (m["_id"] > current_offset) mis-orders ids once the sequence
            # part crosses one digit ("ts-10" < "ts-9" lexically), which
            # silently skipped messages.
            cursor = self._id_key(current_offset)
            new_msgs = [m for m in stream if self._id_key(m["_id"]) > cursor]

            batch = new_msgs[:count]

            if batch:
                # Advance the group offset to the last delivered message.
                last_id = batch[-1]["_id"]
                if self.group_name not in self._offsets:
                    self._offsets[self.group_name] = {}
                self._offsets[self.group_name][self.stream_key] = last_id

            # Strip the internal '_id' field before handing data back.
            return [(m["_id"], {k: v for k, v in m.items() if k != "_id"}) for m in batch]

    async def ack_batch(self, message_ids: List[str]) -> None:
        """No-op: read_batch() already advances the group offset."""
        pass

    async def claim_stuck_messages(self, min_idle_time_ms: int = 60000, count: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
        """No-op: PEL claiming is not implemented for the memory backend."""
        return []

    async def increment_retry_count(self, message_id: str) -> int:
        """Increment and return the in-memory retry counter for `message_id`."""
        self._retries[message_id] = self._retries.get(message_id, 0) + 1
        return self._retries[message_id]

    async def move_to_dlq(self, message_id: str, data: Dict[str, Any], error: str) -> None:
        """Record the failed message in this stream's in-memory DLQ list."""
        if self.stream_key not in self.dlq:
            self.dlq[self.stream_key] = []
        self.dlq[self.stream_key].append({"id": message_id, "data": data, "error": error})

    async def get_pending_info(self) -> Dict[str, Any]:
        """
        Returns approximate backlog info for the memory backend.

        "lag" is the total number of events ever appended to this stream;
        per-group progress is not subtracted, so this over-reports.
        """
        total_msgs = len(self._streams.get(self.stream_key, []))
        return {
            "pending": 0,
            "lag": total_msgs, 
            "consumers": 1
        }

clone_with_topic(topic)

Create a new backend instance for a different topic, sharing the underlying memory structures.

Source code in pspf/connectors/memory.py
50
51
52
53
54
55
56
57
58
59
60
61
def clone_with_topic(self, topic: str) -> "MemoryBackend":
    """Create a new backend instance for a different topic, sharing the underlying memory structures."""
    clone = MemoryBackend(stream_key=topic, group_name=self.group_name)
    # Point the clone at the same containers (and copy the scalar cursors)
    # so both instances observe the same data.
    for attr in ("_streams", "_pel", "_offsets", "_retries", "dlq",
                 "_last_ts", "_last_seq", "_connected"):
        setattr(clone, attr, getattr(self, attr))
    return clone

get_pending_info() async

Returns dummy pending info for memory backend. Calculation of real lag in memory backend is possible but skipped for brevity.

Source code in pspf/connectors/memory.py
148
149
150
151
152
153
154
155
156
157
158
159
160
async def get_pending_info(self) -> Dict[str, Any]:
    """
    Returns dummy pending info for memory backend.
    Calculation of real lag in memory backend is possible but skipped for brevity.
    """
    # Approximate lag as the total number of events in this stream;
    # per-consumer progress is not tracked here.
    backlog = self._streams.get(self.stream_key, [])
    return {"pending": 0, "lag": len(backlog), "consumers": 1}

Local Log

FileStreamBackend

Bases: StreamingBackend

Very simple file-based backend that reads/writes line-delimited JSON. Useful for testing or local-only processing.

Source code in pspf/connectors/file.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class FileStreamBackend(StreamingBackend):
    """
    Very simple file-based backend that reads/writes line-delimited JSON.
    Useful for testing or local-only processing.
    """
    def __init__(self, path: str, stream_key: str = "default", group_name: str = "default", consumer_name: str = "worker-1"):
        self.path = path
        self._stream_key = stream_key
        self._group_name = group_name
        self.consumer_name = consumer_name
        # Number of lines already consumed from the file.
        self._current_offset = 0

    @property
    def stream_key(self) -> str:
        return self._stream_key

    @property
    def group_name(self) -> str:
        return self._group_name

    def clone_with_topic(self, topic: str) -> "FileStreamBackend":
        # Same file, group and consumer; only the logical stream key differs.
        return FileStreamBackend(
            path=self.path,
            stream_key=topic,
            group_name=self.group_name,
            consumer_name=self.consumer_name,
        )

    async def connect(self) -> None:
        """Create the backing file if it does not exist yet."""
        if not os.path.exists(self.path):
            with open(self.path, 'w') as f:
                pass
        logger.info(f"Connected to FileBackend at {self.path}")

    async def close(self) -> None:
        """Nothing to release: file handles are opened per-operation."""
        pass

    async def ping(self) -> bool:
        """Healthy as long as the backing file exists."""
        return os.path.exists(self.path)

    async def ensure_group_exists(self, start_id: str = "0") -> None:
        """No-op: the file backend has no consumer-group concept."""
        pass

    async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
        """Reads a batch of lines from the file."""
        collected: List[Tuple[str, Dict[str, Any]]] = []
        try:
            with open(self.path, 'r') as fh:
                # Naive seek: re-read and discard everything already consumed.
                for _ in range(self._current_offset):
                    fh.readline()

                # Consume at most `count` further lines; malformed lines
                # still advance the offset but are not returned.
                for _ in range(count):
                    raw = fh.readline()
                    if not raw:
                        break

                    self._current_offset += 1
                    msg_id = str(self._current_offset)
                    try:
                        collected.append((msg_id, json.loads(raw.strip())))
                    except json.JSONDecodeError:
                        logger.warning(f"Skipping invalid JSON line {msg_id}")
        except Exception as e:
            logger.error(f"Error reading file {self.path}: {e}")

        return collected

    async def ack_batch(self, message_ids: List[str]) -> None:
        # File backend is too simple for ACK tracking right now
        pass

    async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
        """Appends a JSON line to the file."""
        try:
            serialized = json.dumps(data)
            with open(self.path, 'a') as sink:
                sink.write(serialized + "\n")
        except Exception as e:
            logger.error(f"Error writing to {self.path}: {e}")
            raise
        # A real file log would return the byte offset; this simple
        # backend just signals success.
        return "ok"

    async def claim_stuck_messages(self, min_idle_time_ms: int = 60000, count: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
        """No-op: no pending-entry tracking for the file backend."""
        return []

    async def increment_retry_count(self, message_id: str) -> int:
        """Retries are not tracked; always reports zero."""
        return 0

    async def move_to_dlq(self, message_id: str, data: Dict[str, Any], error: str) -> None:
        """Append the failed message (with its error) to a sibling ".dlq" file."""
        dlq_path = f"{self.path}.dlq"
        with open(dlq_path, 'a') as sink:
            data["_error"] = error
            sink.write(json.dumps(data) + "\n")

    async def get_pending_info(self) -> Dict[str, Any]:
        """Static placeholder counts; the file backend tracks no backlog."""
        return {"pending": 0, "lag": 0, "consumers": 1}

add_event(data, max_len=None) async

Appends a JSON line to the file.

Source code in pspf/connectors/file.py
85
86
87
88
89
90
91
92
93
94
95
async def add_event(self, data: Dict[str, Any], max_len: Optional[int] = None) -> str:
    """Appends a JSON line to the file."""
    try:
        serialized = json.dumps(data)
        with open(self.path, 'a') as sink:
            sink.write(serialized + "\n")
    except Exception as e:
        logger.error(f"Error writing to {self.path}: {e}")
        raise
    # A real file log would return the byte offset; this simple backend
    # just signals success.
    return "ok"

read_batch(count=10, block_ms=1000) async

Reads a batch of lines from the file.

Source code in pspf/connectors/file.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def read_batch(self, count: int = 10, block_ms: int = 1000) -> List[Tuple[str, Dict[str, Any]]]:
    """Reads a batch of lines from the file."""
    collected: List[Tuple[str, Dict[str, Any]]] = []
    try:
        with open(self.path, 'r') as fh:
            # Naive seek: re-read and discard everything already consumed.
            for _ in range(self._current_offset):
                fh.readline()

            # Consume at most `count` further lines; malformed lines still
            # advance the offset but are not returned.
            for _ in range(count):
                raw = fh.readline()
                if not raw:
                    break

                self._current_offset += 1
                msg_id = str(self._current_offset)
                try:
                    collected.append((msg_id, json.loads(raw.strip())))
                except json.JSONDecodeError:
                    logger.warning(f"Skipping invalid JSON line {msg_id}")
    except Exception as e:
        logger.error(f"Error reading file {self.path}: {e}")

    return collected