|
4 | 4 | import logging |
5 | 5 | from os import path |
6 | 6 | from pathlib import Path |
7 | | -import re |
8 | 7 | import sys |
9 | | -from typing import Any, Dict, List, Literal, Optional, Union, overload |
| 8 | +from typing import Any, Literal, Optional, Union, overload |
10 | 9 | from typing_extensions import Self |
11 | 10 | from jinja2 import Environment, FileSystemLoader |
| 11 | +import toml |
| 12 | + |
| 13 | +# Note that the `noqa: F401` annotations are because these imports are needed so |
| 14 | +# that the descriptors end up in the default descriptor pool, but aren't used |
| 15 | +# explicitly and thus would be otherwise flagged as unused imports. |
12 | 16 | from cel.expr import checked_pb2, eval_pb2, value_pb2 |
13 | 17 | from cel.expr.conformance.test import simple_pb2 |
14 | 18 | from cel.expr.conformance.proto2 import ( |
|
39 | 43 |
|
40 | 44 | pool = descriptor_pool.Default() # type: ignore [no-untyped-call] |
41 | 45 |
|
| 46 | +ScenarioConfigInput = Union[str, list[str], dict[Literal["tags"], list[str]]] |
42 | 47 |
|
43 | | -class SkipList: |
44 | | - def __init__(self, path: str) -> None: |
45 | | - self.list = [] |
46 | | - logger.debug(f"Reading from {path}...") |
47 | | - blank = re.compile("^[ \t]*(#|$)") |
48 | | - with open(path, encoding="utf_8") as file_handle: |
49 | | - for line in file_handle: |
50 | | - if blank.match(line) is not None: |
51 | | - continue |
52 | | - stripped = line.split("#")[0].strip() |
53 | | - tuple = stripped.split(":") |
54 | | - if len(tuple) != 3: |
55 | | - logger.warning(f"Skipping invalid line: {line}") |
| 48 | + |
| 49 | +class ScenarioConfig: |
| 50 | + def __init__( |
| 51 | + self, section: "SectionConfig", name: str, input: ScenarioConfigInput |
| 52 | + ) -> None: |
| 53 | + self.section = section |
| 54 | + self.name = name |
| 55 | + self.tags: list[str] = [] |
| 56 | + self.__has_logged_error_context = False |
| 57 | + self.__load_tags(input) |
| 58 | + |
| 59 | + def __log_error_context(self) -> None: |
| 60 | + if not self.__has_logged_error_context: |
| 61 | + logger.error( |
| 62 | + f"[{self.section.feature.name}.{self.section.name}.{self.name}]" |
| 63 | + ) |
| 64 | + |
| 65 | + def __load_tags(self, input: ScenarioConfigInput) -> None: |
| 66 | + if isinstance(input, str): |
| 67 | + if not input.startswith("@"): |
| 68 | + self.__log_error_context() |
| 69 | + logger.error( |
| 70 | + f' Skipping invalid tag (must start with "@"): {repr(input)}' |
| 71 | + ) |
| 72 | + logger.error(f" Did you mean {repr('@' + input)}?") |
| 73 | + self.tags.append(input) |
| 74 | + elif isinstance(input, list): |
| 75 | + for tag in input: |
| 76 | + if not isinstance(tag, str): |
| 77 | + self.__log_error_context() |
| 78 | + logger.error( |
| 79 | + f" Skipping invalid tag (must be a string): {repr(tag)}" |
| 80 | + ) |
56 | 81 | continue |
57 | | - self.list.append(stripped) |
| 82 | + self.__load_tags(tag) |
| 83 | + elif "tags" in input and isinstance(input["tags"], list): |
| 84 | + self.__load_tags(input["tags"]) |
| 85 | + else: |
| 86 | + self.__log_error_context() |
| 87 | + logger.error(f" Skipping invalid scenario config: {repr(input)}") |
| 88 | + |
58 | 89 |
|
59 | | - def should_skip(self, feature: str, section: str, scenario: str) -> bool: |
60 | | - return f"{feature}:{section}:{scenario}" in self.list |
| 90 | +SectionConfigInput = dict[str, ScenarioConfigInput] |
61 | 91 |
|
62 | 92 |
|
63 | | -skip_list = SkipList(f"{path.dirname(__file__)}/wip.txt") |
| 93 | +class SectionConfig: |
| 94 | + def __init__( |
| 95 | + self, feature: "FeatureConfig", name: str, input: SectionConfigInput |
| 96 | + ) -> None: |
| 97 | + self.feature = feature |
| 98 | + self.name = name |
| 99 | + self.scenarios = [] |
| 100 | + if isinstance(input, dict): |
| 101 | + for name, value in input.items(): |
| 102 | + self.scenarios.append(ScenarioConfig(self, name, value)) |
| 103 | + else: |
| 104 | + logger.error(f"[{self.feature.name}.{self.name}]") |
| 105 | + logger.error(f" Skipping invalid section config: {repr(input)}") |
| 106 | + |
| 107 | + def tags_for(self, scenario: str) -> list[str]: |
| 108 | + for s in self.scenarios: |
| 109 | + if s.name == scenario: |
| 110 | + return s.tags |
| 111 | + return [] |
| 112 | + |
| 113 | + |
| 114 | +FeatureConfigInput = dict[str, SectionConfigInput] |
| 115 | + |
| 116 | + |
| 117 | +class FeatureConfig: |
| 118 | + def __init__(self, name: str, input: FeatureConfigInput) -> None: |
| 119 | + self.name = name |
| 120 | + self.sections = [] |
| 121 | + if isinstance(input, dict): |
| 122 | + for name, value in input.items(): |
| 123 | + self.sections.append(SectionConfig(self, name, value)) |
| 124 | + else: |
| 125 | + logger.error(f"[{self.name}]") |
| 126 | + logger.error(f" Skipping invalid feature config: {repr(input)}") |
| 127 | + |
| 128 | + def tags_for(self, section: str, scenario: str) -> list[str]: |
| 129 | + for s in self.sections: |
| 130 | + if s.name == section: |
| 131 | + return s.tags_for(scenario) |
| 132 | + return [] |
| 133 | + |
| 134 | + |
| 135 | +class Config: |
| 136 | + def __init__(self, path: str) -> None: |
| 137 | + self.features: list[FeatureConfig] = [] |
| 138 | + logger.debug(f"Reading from {repr(path)}...") |
| 139 | + input = toml.load(path) |
| 140 | + |
| 141 | + if isinstance(input, dict): |
| 142 | + for name, value in input.items(): |
| 143 | + self.features.append(FeatureConfig(name, value)) |
| 144 | + else: |
| 145 | + logger.error(f"Could not read from {repr(path)}") |
| 146 | + |
| 147 | + def tags_for(self, feature: str, section: str, scenario: str) -> list[str]: |
| 148 | + for f in self.features: |
| 149 | + if f.name == feature: |
| 150 | + return f.tags_for(section, scenario) |
| 151 | + return [] |
| 152 | + |
| 153 | + |
| 154 | +wip_config = Config(f"{path.dirname(__file__)}/wip.toml") |
64 | 155 |
|
65 | 156 |
|
66 | 157 | class Result: |
@@ -490,7 +581,7 @@ class CELList(CELValue): |
490 | 581 | type_name = "celpy.celtypes.ListType" |
491 | 582 |
|
492 | 583 | def __init__( |
493 | | - self, source: Union[struct_pb2.ListValue, value_pb2.ListValue, List[CELValue]] |
| 584 | + self, source: Union[struct_pb2.ListValue, value_pb2.ListValue, list[CELValue]] |
494 | 585 | ) -> None: |
495 | 586 | if isinstance(source, (struct_pb2.ListValue, value_pb2.ListValue)): |
496 | 587 | self.values = [CELValue.from_proto(v) for v in source.values] |
@@ -524,7 +615,7 @@ class CELMap(CELValue): |
524 | 615 | type_name = "celpy.celtypes.MapType" |
525 | 616 |
|
526 | 617 | def __init__( |
527 | | - self, source: Union[struct_pb2.Struct, value_pb2.MapValue, Dict[str, CELValue]] |
| 618 | + self, source: Union[struct_pb2.Struct, value_pb2.MapValue, dict[str, CELValue]] |
528 | 619 | ) -> None: |
529 | 620 | self.value = {} |
530 | 621 | if isinstance(source, struct_pb2.Struct): |
@@ -656,7 +747,7 @@ def __repr__(self) -> str: |
656 | 747 | class CELErrorSet(CELValue): |
657 | 748 | type_name = "CELEvalError" |
658 | 749 |
|
659 | | - def __init__(self, message: Union[eval_pb2.ErrorSet, List[CELStatus], str]) -> None: |
| 750 | + def __init__(self, message: Union[eval_pb2.ErrorSet, list[CELStatus], str]) -> None: |
660 | 751 | self.errors = [] |
661 | 752 | if isinstance(message, eval_pb2.ErrorSet): |
662 | 753 | for status in message.errors: |
@@ -764,14 +855,12 @@ def __init__( |
764 | 855 | logger.debug(f"Scenario {source.name}") |
765 | 856 | self.name = source.name |
766 | 857 | self.description = source.description |
767 | | - self.tag = ( |
768 | | - "@wip\n" |
769 | | - if skip_list.should_skip(feature.name, section.name, source.name) |
770 | | - else "" |
771 | | - ) |
772 | | - self.preconditions: List[str] = [] |
773 | | - self.events: List[str] = [] |
774 | | - self.outcomes: List[str] = [] |
| 858 | + self.tag = "" |
| 859 | + for tag in wip_config.tags_for(feature.name, section.name, source.name): |
| 860 | + self.tag += f"{tag}\n" |
| 861 | + self.preconditions: list[str] = [] |
| 862 | + self.events: list[str] = [] |
| 863 | + self.outcomes: list[str] = [] |
775 | 864 |
|
776 | 865 | if source.disable_macros: |
777 | 866 | self.given("disable_macros parameter is True") |
@@ -804,7 +893,7 @@ def then(self, event: str) -> Self: |
804 | 893 | return self |
805 | 894 |
|
806 | 895 | @property |
807 | | - def steps(self) -> List[str]: |
| 896 | + def steps(self) -> list[str]: |
808 | 897 | steps = [] |
809 | 898 | if len(self.preconditions) > 0: |
810 | 899 | steps.append(f"Given {self.preconditions[0]}") |
@@ -866,7 +955,7 @@ def write_to_file(self, path: Optional[Path]) -> None: |
866 | 955 | print(gherkin) |
867 | 956 |
|
868 | 957 |
|
869 | | -def get_options(argv: List[str] = sys.argv[1:]) -> argparse.Namespace: |
| 958 | +def get_options(argv: list[str] = sys.argv[1:]) -> argparse.Namespace: |
870 | 959 | parser = argparse.ArgumentParser() |
871 | 960 | parser.add_argument( |
872 | 961 | "-v", |
|
0 commit comments