Skip to content

registry

registry ¤

OPERATOR_REGISTRY = ContextVar('OPERATOR_REGISTRY', default=OperatorRegistry.from_default_rules()) module-attribute ¤

Context variable holding the current global operator registry. This is updated when entering an operator registry context.

OperatorNotFound ¤

Bases: Exception

Source code in cirkit/symbolic/registry.py
12
13
14
15
16
17
18
class OperatorNotFound(Exception):
    def __init__(self, op: LayerOperator):
        super().__init__()
        self._operator = op

    def __repr__(self) -> str:
        return f"Symbolic operator named '{self._operator.name}' not found"

_operator = op instance-attribute ¤

__init__(op) ¤

Source code in cirkit/symbolic/registry.py
13
14
15
def __init__(self, op: LayerOperator):
    super().__init__()
    self._operator = op

__repr__() ¤

Source code in cirkit/symbolic/registry.py
17
18
def __repr__(self) -> str:
    return f"Symbolic operator named '{self._operator.name}' not found"

OperatorRegistry ¤

Bases: AbstractContextManager

Source code in cirkit/symbolic/registry.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class OperatorRegistry(AbstractContextManager):
    def __init__(self):
        # The symbolic operator rule specifications, for each symbolic operator over layers
        self._rules: dict[LayerOperator, LayerOperatorSpecs] = defaultdict(dict)

        # The token used to restore the operator registry context
        self._token: Token[OperatorRegistry] | None = None

    @classmethod
    def from_default_rules(cls) -> "OperatorRegistry":
        registry = cls()
        for op, funcs in DEFAULT_OPERATOR_RULES.items():
            for f in funcs:
                registry.add_rule(op, f)
        return registry

    @property
    def operators(self) -> Iterable[LayerOperator]:
        return self._rules.keys()

    def __enter__(self) -> "OperatorRegistry":
        self._token = OPERATOR_REGISTRY.set(self)
        return self

    def __exit__(
        self,
        __exc_type: type[BaseException] | None,
        __exc_value: BaseException | None,
        __traceback: TracebackType | None,
    ) -> bool | None:
        OPERATOR_REGISTRY.reset(self._token)
        self._token = None
        return None

    def has_rule(self, op: LayerOperator, *signature: type[Layer]) -> bool:
        if op not in self._rules:
            return False
        op_rules = self._rules[op]
        known_signatures = op_rules.keys()
        if signature in known_signatures:
            return True
        for s in known_signatures:
            if len(signature) != len(s):
                continue
            if all(issubclass(x[0], x[1]) for x in zip(signature, s)):
                return True
        return False

    def retrieve_rule(self, op: LayerOperator, *signature: type[Layer]) -> LayerOperatorFunc:
        if op not in self._rules:
            raise OperatorNotFound(op)
        op_rules = self._rules[op]
        known_signatures = op_rules.keys()
        if signature in known_signatures:
            return op_rules[signature]
        raise OperatorSignatureNotFound(op, *signature)

    def add_rule(self, op: LayerOperator, func: LayerOperatorFunc):
        args = func.__annotations__.copy()
        arg_names = args.keys()
        if "return" not in arg_names or not issubclass(args["return"], CircuitBlock):
            raise ValueError(
                f"The function is not an operator over symbolic layers.\nIdentifier: {func.__name__}\nAnnotations: {args}"
            )
        del args["return"]
        arg_names = list(args.keys())
        arg_types = [args[a] for a in arg_names]
        arg_layer_types = [
            x for x in enumerate(arg_types) if isinstance(x[1], type) and issubclass(x[1], Layer)
        ]
        arg_layer_types_locs, signature = zip(*arg_layer_types)
        if arg_layer_types_locs != tuple(range(len(arg_layer_types_locs))):
            raise ValueError(
                "The layer operands should be the first arguments of the operator rule function"
            )
        self._rules[op][signature] = func

_rules = defaultdict(dict) instance-attribute ¤

_token = None instance-attribute ¤

operators property ¤

__enter__() ¤

Source code in cirkit/symbolic/registry.py
53
54
55
def __enter__(self) -> "OperatorRegistry":
    self._token = OPERATOR_REGISTRY.set(self)
    return self

__exit__(__exc_type, __exc_value, __traceback) ¤

Source code in cirkit/symbolic/registry.py
57
58
59
60
61
62
63
64
65
def __exit__(
    self,
    __exc_type: type[BaseException] | None,
    __exc_value: BaseException | None,
    __traceback: TracebackType | None,
) -> bool | None:
    OPERATOR_REGISTRY.reset(self._token)
    self._token = None
    return None

__init__() ¤

Source code in cirkit/symbolic/registry.py
34
35
36
37
38
39
def __init__(self):
    # The symbolic operator rule specifications, for each symbolic operator over layers
    self._rules: dict[LayerOperator, LayerOperatorSpecs] = defaultdict(dict)

    # The token used to restore the operator registry context
    self._token: Token[OperatorRegistry] | None = None

add_rule(op, func) ¤

Source code in cirkit/symbolic/registry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def add_rule(self, op: LayerOperator, func: LayerOperatorFunc):
    args = func.__annotations__.copy()
    arg_names = args.keys()
    if "return" not in arg_names or not issubclass(args["return"], CircuitBlock):
        raise ValueError(
            f"The function is not an operator over symbolic layers.\nIdentifier: {func.__name__}\nAnnotations: {args}"
        )
    del args["return"]
    arg_names = list(args.keys())
    arg_types = [args[a] for a in arg_names]
    arg_layer_types = [
        x for x in enumerate(arg_types) if isinstance(x[1], type) and issubclass(x[1], Layer)
    ]
    arg_layer_types_locs, signature = zip(*arg_layer_types)
    if arg_layer_types_locs != tuple(range(len(arg_layer_types_locs))):
        raise ValueError(
            "The layer operands should be the first arguments of the operator rule function"
        )
    self._rules[op][signature] = func

from_default_rules() classmethod ¤

Source code in cirkit/symbolic/registry.py
41
42
43
44
45
46
47
@classmethod
def from_default_rules(cls) -> "OperatorRegistry":
    registry = cls()
    for op, funcs in DEFAULT_OPERATOR_RULES.items():
        for f in funcs:
            registry.add_rule(op, f)
    return registry

has_rule(op, *signature) ¤

Source code in cirkit/symbolic/registry.py
67
68
69
70
71
72
73
74
75
76
77
78
79
def has_rule(self, op: LayerOperator, *signature: type[Layer]) -> bool:
    if op not in self._rules:
        return False
    op_rules = self._rules[op]
    known_signatures = op_rules.keys()
    if signature in known_signatures:
        return True
    for s in known_signatures:
        if len(signature) != len(s):
            continue
        if all(issubclass(x[0], x[1]) for x in zip(signature, s)):
            return True
    return False

retrieve_rule(op, *signature) ¤

Source code in cirkit/symbolic/registry.py
81
82
83
84
85
86
87
88
def retrieve_rule(self, op: LayerOperator, *signature: type[Layer]) -> LayerOperatorFunc:
    if op not in self._rules:
        raise OperatorNotFound(op)
    op_rules = self._rules[op]
    known_signatures = op_rules.keys()
    if signature in known_signatures:
        return op_rules[signature]
    raise OperatorSignatureNotFound(op, *signature)

OperatorSignatureNotFound ¤

Bases: Exception

Source code in cirkit/symbolic/registry.py
21
22
23
24
25
26
27
28
29
30
class OperatorSignatureNotFound(Exception):
    def __init__(self, op: LayerOperator, *signature: type[Layer]):
        super().__init__()
        self._operator = op
        self._signature = tuple(signature)

    def __str__(self) -> str:
        signature_repr = ", ".join(cls.__name__ for cls in self._signature)
        operator_repr = self._operator.name
        return f"Symbolic operator '{operator_repr}' for signature ({signature_repr}) not found"

_operator = op instance-attribute ¤

_signature = tuple(signature) instance-attribute ¤

__init__(op, *signature) ¤

Source code in cirkit/symbolic/registry.py
22
23
24
25
def __init__(self, op: LayerOperator, *signature: type[Layer]):
    super().__init__()
    self._operator = op
    self._signature = tuple(signature)

__str__() ¤

Source code in cirkit/symbolic/registry.py
27
28
29
30
def __str__(self) -> str:
    signature_repr = ", ".join(cls.__name__ for cls in self._signature)
    operator_repr = self._operator.name
    return f"Symbolic operator '{operator_repr}' for signature ({signature_repr}) not found"