-
Notifications
You must be signed in to change notification settings - Fork 63
Expand file tree
/
Copy pathpath.py
More file actions
258 lines (219 loc) · 8.47 KB
/
path.py
File metadata and controls
258 lines (219 loc) · 8.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""Structural path parser for the hq query language."""
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple
class QuerySyntaxError(Exception):
    """Signal that a structural path string is malformed and cannot be parsed."""
@dataclass(frozen=True)
class PathSegment:
    """One parsed step of a structural path (immutable).

    Attributes:
        name: Identifier matched by this step, or ``"*"`` for the wildcard.
        select_all: ``True`` when the step carries a ``[*]`` suffix.
        index: The integer ``N`` of an ``[N]`` suffix, otherwise ``None``.
        recursive: ``True`` when the step was introduced by ``..``
            (recursive descent) rather than a single dot.
        predicate: Predicate node of a ``[select(...)]`` suffix, if any.
        type_filter: Type qualifier written before the name, e.g.
            ``"function_call"`` in ``function_call:name``.
        skip_labels: ``True`` when the name carries a ``~`` suffix
            (skip remaining block labels).
    """

    name: str
    select_all: bool
    index: Optional[int]
    recursive: bool = False
    predicate: object = None
    type_filter: Optional[str] = None
    skip_labels: bool = False
# Shape of one plain (non-select) segment: type_filter:name~[bracket]?
# Every part except the name is optional.
_SEGMENT_RE = re.compile(
    r"^(?:([a-z_]+):)?"  # group 1: type qualifier before ":"
    r"([a-zA-Z_][a-zA-Z0-9_-]*|\*)"  # group 2: identifier or "*" wildcard
    r"(~)?"  # group 3: "~" skip-labels marker
    r"(?:\[(\*|[0-9]+)\])?"  # group 4: "*" or an integer inside [...]
    r"\??$"  # optional trailing "?"
)
def parse_path(path_str: str) -> List[PathSegment]:  # pylint: disable=too-many-locals
    """Convert a structural path string into an ordered list of segments.

    Accepted syntax, as implemented here:
        path    := segment (("." | "..") segment)*
        segment := (TYPE ":")? name "~"? suffix? "?"?
        name    := "*" | IDENTIFIER
        suffix  := "[*]" | "[" INT "]"
                 | "[select(" PREDICATE ")]" ("[*]" | "[" INT "]")?

    A segment introduced by ``..`` is marked ``recursive``. The jq-style
    ``.[]`` spelling is rewritten to ``[*]`` before parsing. A trailing
    ``?`` is accepted but not recorded on the segment.

    Raises QuerySyntaxError when the path is empty/blank or a segment does
    not match the syntax above.
    """
    if not (path_str and path_str.strip()):
        raise QuerySyntaxError("Empty path")

    # jq compat: .[] is an alias for [*]
    # NOTE(review): this is a plain text replacement, so it also rewrites
    # ".[]" occurring inside a quoted predicate string — confirm intended.
    normalized = path_str.replace(".[]", "[*]")

    parsed: List[PathSegment] = []
    for descend, text in _split_path(normalized):
        selected = _extract_select(text)
        if selected is not None:
            # Segment of the form name[select(...)] with optional [*] / [N]
            name, predicate, type_filter, skip_labels, select_all, index = selected
        else:
            hit = _SEGMENT_RE.match(text)
            if hit is None:
                raise QuerySyntaxError(
                    f"Invalid path segment: {text!r} in {normalized!r}"
                )
            type_filter, name, tilde, bracket = hit.groups()
            predicate = None
            skip_labels = tilde is not None
            select_all = bracket == "*"
            # The regex guarantees anything other than "*" is all digits.
            index = None if bracket in (None, "*") else int(bracket)

        parsed.append(
            PathSegment(
                name=name,
                select_all=select_all,
                index=index,
                recursive=descend,
                predicate=predicate,
                type_filter=type_filter,
                skip_labels=skip_labels,
            )
        )
    return parsed
def _consume_quoted(text: str, start: int, out: List[str]) -> int:
    """Copy the double-quoted string beginning at ``text[start]`` into ``out``.

    A backslash keeps the character after it from ending the string. An
    unterminated string is copied through to the end of ``text``.

    Returns the index of the first character after the string.
    """
    out.append(text[start])  # opening quote
    i = start + 1
    length = len(text)
    while i < length and text[i] != '"':
        if text[i] == "\\" and i + 1 < length:
            # Keep the backslash and fall through to copy the escaped char.
            out.append(text[i])
            i += 1
        out.append(text[i])
        i += 1
    if i < length:
        out.append(text[i])  # closing quote
        i += 1
    return i


# pylint: disable-next=too-many-statements
def _split_path(path_str: str) -> List[Tuple[bool, str]]:
    """Split a path string into (is_recursive, segment_text) pairs.

    A single dot separates ordinary segments; a double dot marks the segment
    that follows it as recursive descent. Dots inside ``[...]``, ``(...)`` or
    a double-quoted string never act as separators, and brackets or
    parentheses inside a quoted string are not counted. The same scanning
    rules apply to ordinary and recursive segments, so a quoted ``]`` or ``.``
    in a predicate cannot cut a recursive segment short.

    Raises QuerySyntaxError if the path starts with a dot, if ``..`` is not
    followed by a segment, or if no segment is found at all.
    """
    result: List[Tuple[bool, str]] = []
    current: List[str] = []
    recursive = False  # True while `current` collects the segment after ".."
    bracket_depth = 0
    paren_depth = 0
    i = 0
    length = len(path_str)

    while i < length:
        char = path_str[i]

        if char == '"':
            # Take the whole string literal so its contents are opaque.
            i = _consume_quoted(path_str, i, current)
            continue

        if char == "." and bracket_depth == 0 and paren_depth == 0:
            if current:
                result.append((recursive, "".join(current)))
                current = []
                recursive = False
            elif recursive:
                # ".." immediately followed by another separator
                raise QuerySyntaxError(f"Expected segment after '..': {path_str!r}")
            elif not result:
                raise QuerySyntaxError(f"Path cannot start with '.': {path_str!r}")

            if path_str.startswith("..", i):
                recursive = True
                i += 2  # skip both dots
            else:
                i += 1  # skip single dot
            continue

        if char == "[":
            bracket_depth += 1
        elif char == "]":
            bracket_depth -= 1
        elif char == "(":
            paren_depth += 1
        elif char == ")":
            paren_depth -= 1
        current.append(char)
        i += 1

    if recursive and not current:
        # Path ended right after ".."
        raise QuerySyntaxError(f"Expected segment after '..': {path_str!r}")
    if current:
        result.append((recursive, "".join(current)))
    if not result:
        raise QuerySyntaxError(f"Empty path: {path_str!r}")
    return result
def _find_select_close(part: str, start: int) -> int:
    """Locate the ``)`` that closes a ``select(`` whose body begins at ``start``.

    Nested parentheses are balanced and double-quoted strings (with backslash
    escapes) are skipped, so a ``)`` inside either does not end the search.

    Returns the index of the closing ``)``, or -1 if none is found.
    """
    depth = 1
    i = start
    length = len(part)
    while i < length:
        char = part[i]
        if char == '"':
            # Skip to the closing quote; a backslash hides the next character.
            i += 1
            while i < length and part[i] != '"':
                if part[i] == "\\":
                    i += 1
                i += 1
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                return i
        i += 1
    return -1


def _extract_select(  # pylint: disable=too-many-locals
    part: str,
) -> Optional[Tuple[str, object, Optional[str], bool, bool, Optional[int]]]:
    """Extract ``name[select(...)]`` (plus optional ``[*]``/``[N]``) from a segment.

    Returns ``None`` when ``part`` contains no ``[select(`` marker. Otherwise
    returns the 6-tuple
    ``(name, predicate, type_filter, skip_labels, select_all, index)``:

    * ``name`` - segment name with any ``type:`` prefix and ``~`` suffix removed
    * ``predicate`` - result of ``parse_predicate`` on the text inside ``select(...)``
    * ``type_filter`` - text before ``:`` in the name part, or ``None``
    * ``skip_labels`` - ``True`` if the name part ended with ``~``
    * ``select_all`` - ``True`` unless a trailing ``[N]`` was given
    * ``index`` - the integer of a trailing ``[N]``, else ``None``

    Raises QuerySyntaxError if the name before ``[select(`` is missing or
    invalid, if no closing ``)]`` is found, or if anything other than
    ``[*]``/``[N]`` and ``?`` follows the select bracket.
    """
    select_marker = "[select("
    idx = part.find(select_marker)
    if idx == -1:
        return None

    seg_name = part[:idx]
    if not seg_name or not re.match(
        r"^(?:[a-z_]+:)?(?:[a-zA-Z_][a-zA-Z0-9_-]*|\*)~?$", seg_name
    ):
        raise QuerySyntaxError(f"Invalid segment name before [select(): {seg_name!r}")

    # Parse optional type_filter:name prefix
    type_filter = None
    if ":" in seg_name:
        type_filter, seg_name = seg_name.split(":", 1)

    # Parse optional ~ suffix
    skip_labels = seg_name.endswith("~")
    if skip_labels:
        seg_name = seg_name[:-1]

    # Find the ")]" that really ends select(...). A plain search for the
    # first ")]" would stop early on a predicate such as
    # select(.name == "a)]b"), so scan with quote/paren awareness first.
    inner_start = idx + len(select_marker)
    close_idx = _find_select_close(part, inner_start)
    if close_idx == -1 or not part.startswith(")]", close_idx):
        # Fall back to the first ")]" so input accepted before this scan
        # existed is still treated the same way.
        close_idx = part.find(")]", inner_start)
    if close_idx == -1:
        raise QuerySyntaxError(f"Expected )] at end of select bracket in: {part!r}")

    inner = part[inner_start:close_idx]
    tail = part[close_idx + 2 :]  # text after ")]"

    from hcl2.query.predicate import parse_predicate

    predicate = parse_predicate(inner)

    # Parse optional trailing [*] or [N] after [select(...)], with optional ?
    select_all = True  # default: select returns all matches
    index = None
    if tail:
        # Strip trailing ? (optional operator is a no-op at segment level)
        clean_tail = tail.rstrip("?")
        if clean_tail:
            tail_match = re.match(r"^\[(\*|[0-9]+)\]$", clean_tail)
            if not tail_match:
                raise QuerySyntaxError(
                    f"Unexpected suffix after [select(...)]: {tail!r} in {part!r}"
                )
            bracket = tail_match.group(1)
            if bracket != "*":
                select_all = False
                index = int(bracket)

    return (seg_name, predicate, type_filter, skip_labels, select_all, index)