|
1 | 1 | import warnings |
2 | 2 | from collections.abc import Generator |
| 3 | +from datetime import datetime, timedelta, timezone |
3 | 4 | from typing import Any |
4 | 5 |
|
5 | 6 | from kubernetes.dynamic import DynamicClient |
@@ -90,6 +91,81 @@ def get( |
90 | 91 | timeout=timeout, |
91 | 92 | ) |
92 | 93 |
|
| 94 | + @staticmethod |
| 95 | + def _parse_timestamp(event: Any) -> datetime | None: |
| 96 | + """Parse event timestamp, preferring lastTimestamp over creationTimestamp.""" |
| 97 | + timestamp = event.get("lastTimestamp") or event.get("metadata", {}).get("creationTimestamp") |
| 98 | + if not timestamp: |
| 99 | + return None |
| 100 | + try: |
| 101 | + return datetime.fromisoformat(timestamp.replace("Z", "+00:00")) |
| 102 | + except (ValueError, TypeError): |
| 103 | + LOGGER.debug(f"Failed to parse event timestamp: {timestamp}") |
| 104 | + return None |
| 105 | + |
| 106 | + @classmethod |
| 107 | + def list( |
| 108 | + cls, |
| 109 | + client: DynamicClient, |
| 110 | + namespace: str | None = None, |
| 111 | + field_selector: str | None = None, |
| 112 | + label_selector: str | None = None, |
| 113 | + since_seconds: int = 300, |
| 114 | + ) -> list[Any]: |
| 115 | + """ |
| 116 | + List existing K8s events using a standard API list call (not watch). |
| 117 | +
|
| 118 | + Unlike ``Event.get()`` which uses watch and streams events in real-time, |
| 119 | + this method returns already-existing events immediately. |
| 120 | +
|
| 121 | + Args: |
| 122 | + client: K8s dynamic client. |
| 123 | + namespace: Filter events to this namespace. |
| 124 | + field_selector: Filter events by fields (e.g. ``"type==Warning"``). |
| 125 | + label_selector: Filter events by labels. |
| 126 | + since_seconds: Only return events from the last N seconds (default: 300 = 5 minutes). |
| 127 | +
|
| 128 | + Returns: |
| 129 | + List of event resource objects, sorted by ``lastTimestamp`` descending (most recent first). |
| 130 | +
|
| 131 | + Example: |
| 132 | + List Warning events from the last 5 minutes in a namespace:: |
| 133 | +
|
| 134 | + events = Event.list( |
| 135 | + client=client, |
| 136 | + namespace="my-namespace", |
| 137 | + field_selector="type==Warning", |
| 138 | + ) |
| 139 | + """ |
| 140 | + LOGGER.info("Listing events") |
| 141 | + LOGGER.debug( |
| 142 | + f"list events parameters: namespace={namespace}," |
| 143 | + f" field_selector='{field_selector}', label_selector='{label_selector}'," |
| 144 | + f" since_seconds={since_seconds}" |
| 145 | + ) |
| 146 | + |
| 147 | + resource = client.resources.get(api_version=cls.api_version, kind=cls.__name__) |
| 148 | + kwargs: dict[str, Any] = {} |
| 149 | + if namespace: |
| 150 | + kwargs["namespace"] = namespace |
| 151 | + if field_selector: |
| 152 | + kwargs["field_selector"] = field_selector |
| 153 | + if label_selector: |
| 154 | + kwargs["label_selector"] = label_selector |
| 155 | + |
| 156 | + response = resource.get(**kwargs) |
| 157 | + events = response.items or [] |
| 158 | + |
| 159 | + cutoff = datetime.now(tz=timezone.utc) - timedelta(seconds=since_seconds) |
| 160 | + timed_events: list[tuple[datetime, Any]] = [] |
| 161 | + for event in events: |
| 162 | + event_time = cls._parse_timestamp(event) |
| 163 | + if event_time and event_time >= cutoff: |
| 164 | + timed_events.append((event_time, event)) |
| 165 | + |
| 166 | + timed_events.sort(key=lambda pair: pair[0], reverse=True) |
| 167 | + return [event for _, event in timed_events] |
| 168 | + |
93 | 169 | @classmethod |
94 | 170 | def delete_events( |
95 | 171 | cls, |
|
0 commit comments