Skip to content

graph

graph ¤

RGNodeMetadata = dict[str, int | float | str | bool] module-attribute ¤

PartitionDict ¤

Bases: TypedDict

The structure of a partition node in the json file.

Source code in cirkit/templates/region_graph/graph.py
28
29
30
31
32
class PartitionDict(TypedDict):
    """The structure of a partition node in the json file.

    Entries of this shape make up the "graph" list of the json file written by
    RegionGraph.dump(). Region nodes are referred to by their integer id.
    """

    inputs: list[int]  # The inputs of this partition node, specified by id of region node.
    output: int  # The output of this partition node, specified by id of region node.

inputs instance-attribute ¤

output instance-attribute ¤

PartitionNode ¤

Bases: RegionGraphNode

The partition node in the region graph.

Source code in cirkit/templates/region_graph/graph.py
73
74
class PartitionNode(RegionGraphNode):
    """The partition node in the region graph.

    A partition node describes one way of splitting the scope of its parent region node
    into the disjoint scopes of its input region nodes.
    """

RegionDict ¤

Bases: TypedDict

The structure of a region node in the json file.

Source code in cirkit/templates/region_graph/graph.py
22
23
24
25
class RegionDict(TypedDict):
    """The structure of a region node in the json file.

    Entries of this shape are the values of the "regions" mapping of the json file written
    by RegionGraph.dump().
    """

    scope: list[int]  # The scope of this region node, specified by id of variable.

scope instance-attribute ¤

RegionGraph ¤

Bases: DiAcyclicGraph[RegionGraphNode]

Source code in cirkit/templates/region_graph/graph.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
@final
class RegionGraph(DiAcyclicGraph[RegionGraphNode]):
    def __init__(
        self,
        nodes: Sequence[RegionGraphNode],
        in_nodes: Mapping[RegionGraphNode, Sequence[RegionGraphNode]],
        outputs: Sequence[RegionGraphNode],
    ) -> None:
        """Initialize a region graph and validate its structure.

        Args:
            nodes: The region and partition nodes of the graph.
            in_nodes: A mapping from a node to the sequence of its input nodes.
            outputs: The output nodes of the graph.

        Raises:
            ValueError: If the structure check performed by _check_structure() fails.
        """
        super().__init__(nodes, in_nodes, outputs)
        # Fail early if the nodes do not form a valid region graph.
        self._check_structure()

    def _check_structure(self) -> None:
        """Validate that the graph is a well-formed region graph.

        For every node listed in the node-to-inputs mapping, it checks that:
            - the node is either a region node or a partition node;
            - the inputs of a region node are partition nodes having its same scope;
            - the inputs of a partition node are region nodes whose scopes are pairwise
              disjoint and whose union is the scope of the partition node.

        Moreover, it checks that every partition node has exactly one output node.

        Raises:
            ValueError: If any of the properties above is violated.
        """
        for node, node_children in self.nodes_inputs.items():
            if isinstance(node, RegionNode):
                for ptn in node_children:
                    if not isinstance(ptn, PartitionNode):
                        raise ValueError(
                            f"Expected partition node as children of '{node}', "
                            f"but found '{ptn}'"
                        )
                    if ptn.scope != node.scope:
                        raise ValueError(
                            f"Expected partition node with scope '{node.scope}', "
                            f"but found '{ptn.scope}'"
                        )
                continue
            if not isinstance(node, PartitionNode):
                raise ValueError(
                    f"Region graph nodes must be either partition nodes or region nodes, "
                    f"found '{type(node)}'"
                )
            scopes = []
            for rgn in node_children:
                if not isinstance(rgn, RegionNode):
                    raise ValueError(
                        f"Expected region node as children of '{node}', but found '{rgn}'"
                    )
                scopes.append(rgn.scope)
            scope = Scope.union(*scopes)
            # The sizes of the input scopes add up to the size of their union if and only if
            # the input scopes are pairwise disjoint.
            if scope != node.scope or sum(len(sc) for sc in scopes) != len(scope):
                raise ValueError(
                    f"Expected partitioning of scope '{node.scope}', but found '{scopes}'"
                )
        for ptn in self.partition_nodes:
            rgn_outs = self.node_outputs(ptn)
            if len(rgn_outs) != 1:
                raise ValueError(
                    f"Expected each partition node to have exactly one parent region node,"
                    f" but found {len(rgn_outs)} parent nodes"
                )

    def region_inputs(self, rgn: RegionNode) -> Sequence[PartitionNode]:
        """Retrieve the input nodes of a region node, typed as partition nodes.

        Args:
            rgn: The region node.

        Returns:
            A new list with the input nodes of the given region node.
        """
        ptn_inputs = self.node_inputs(rgn)
        return [cast(PartitionNode, ptn) for ptn in ptn_inputs]

    def partition_inputs(self, ptn: PartitionNode) -> Sequence[RegionNode]:
        """Retrieve the input nodes of a partition node, typed as region nodes.

        Args:
            ptn: The partition node.

        Returns:
            A new list with the input nodes of the given partition node.
        """
        rgn_inputs = self.node_inputs(ptn)
        return [cast(RegionNode, rgn) for rgn in rgn_inputs]

    def region_outputs(self, rgn: RegionNode) -> Sequence[PartitionNode]:
        """Retrieve the output nodes of a region node, typed as partition nodes.

        Args:
            rgn: The region node.

        Returns:
            A new list with the output nodes of the given region node.
        """
        ptn_outputs = self.node_outputs(rgn)
        return [cast(PartitionNode, ptn) for ptn in ptn_outputs]

    def partition_outputs(self, ptn: PartitionNode) -> Sequence[RegionNode]:
        """Retrieve the output nodes of a partition node, typed as region nodes.

        Args:
            ptn: The partition node.

        Returns:
            A new list with the output nodes of the given partition node.
        """
        rgn_outputs = self.node_outputs(ptn)
        return [cast(RegionNode, rgn) for rgn in rgn_outputs]

    @property
    def inputs(self) -> Iterator[RegionNode]:
        """The input nodes of the graph, typed as region nodes."""
        # Fetch the inputs from the parent class first, then narrow their type lazily.
        graph_inputs = super().inputs
        return (cast(RegionNode, rgn) for rgn in graph_inputs)

    @property
    def outputs(self) -> Sequence[RegionNode]:
        """The output nodes of the graph, typed as region nodes."""
        # Fetch the outputs from the parent class first, then narrow their type.
        graph_outputs = super().outputs
        return [cast(RegionNode, rgn) for rgn in graph_outputs]

    @property
    def region_nodes(self) -> Iterator[RegionNode]:
        """Region nodes in the graph.

        Returns:
            A lazy iterator over the nodes that are instances of RegionNode, following
            the order of self.nodes.
        """
        return (node for node in self.nodes if isinstance(node, RegionNode))

    @property
    def partition_nodes(self) -> Iterator[PartitionNode]:
        """Partition nodes in the graph, which are always inner nodes.

        Returns:
            A lazy iterator over the nodes that are instances of PartitionNode, following
            the order of self.nodes.
        """
        return (node for node in self.nodes if isinstance(node, PartitionNode))

    @property
    def inner_nodes(self) -> Iterator[RegionGraphNode]:
        """Inner (non-input) nodes in the graph.

        Returns:
            A lazy iterator over the nodes having at least one input node.
        """
        return (node for node in self.nodes if self.node_inputs(node))

    @property
    def inner_region_nodes(self) -> Iterator[RegionNode]:
        """Inner region nodes in the graph.

        Returns:
            A lazy iterator over the region nodes having both at least one input node and
            at least one output node.
        """
        return (
            node for node in self.region_nodes if self.node_inputs(node) and self.node_outputs(node)
        )

    @cached_property
    def scope(self) -> Scope:
        """The scope of the region graph, i.e., the union of the scopes of its output nodes.

        The value is computed once and then cached.
        """
        return Scope.union(*(node.scope for node in self.outputs))

    @cached_property
    def num_variables(self) -> int:
        """The number of variables the region graph is defined on, i.e., the size of its scope.

        The value is computed once and then cached.
        """
        return len(self.scope)

    @cached_property
    def is_structured_decomposable(self) -> bool:
        """Whether partition nodes having the same scope decompose it in the same way.

        The decomposition of a partition node is the tuple of the scopes of its input nodes,
        taken in the order given by node_inputs(). The value is computed once and then cached.
        """
        # Maps a scope to the first decomposition that has been found for it.
        first_decomp: dict[Scope, tuple[Scope, ...]] = {}
        for ptn in self.partition_nodes:
            ptn_decomp = tuple(rgn.scope for rgn in self.node_inputs(ptn))
            reference = first_decomp.setdefault(ptn.scope, ptn_decomp)
            if ptn_decomp != reference:
                # A single mismatch is enough to settle the answer.
                return False
        return True

    @cached_property
    def is_omni_compatible(self) -> bool:
        """Whether all the inputs of every partition node have a scope of size one.

        The value is computed once and then cached.
        """
        for ptn in self.partition_nodes:
            for rgn in self.node_inputs(ptn):
                if len(rgn.scope) != 1:
                    return False
        return True

    def is_compatible(self, other: "RegionGraph", /, *, scope: Iterable[int] | None = None) -> bool:
        """Test compatibility with another region graph over the given scope.

        Every pair of partition nodes (one from self, one from other) that cover the same
        variables of the scope is inspected. The pair is rejected if the inputs of the first
        partition node are all forced to be grouped together by the inputs of the second one.

        Args:
            other (RegionGraph): The other region graph to compare with.
            scope (Optional[Iterable[int]], optional): The scope over which to check. If None, \
                will use the intersection of the scopes of the two RG. Defaults to None.

        Returns:
            bool: Whether self is compatible with other.
        """
        scope = Scope(scope) if scope is not None else self.scope & other.scope

        # TODO: is this correct for more-than-2 partition?
        for partition1, partition2 in itertools.product(
            self.partition_nodes, other.partition_nodes
        ):
            if partition1.scope & scope != partition2.scope & scope:
                continue  # Only check partitions with the same scope.

            partition1_inputs = self.node_inputs(partition1)
            # partition2 is a node of the other region graph, hence its inputs must be
            # retrieved from other rather than from self.
            partition2_inputs = other.node_inputs(partition2)

            if any(partition1.scope <= input.scope for input in partition2_inputs) or any(
                partition2.scope <= input.scope for input in partition1_inputs
            ):
                continue  # Only check partitions not within another partition.

            adj_mat: np.ndarray[tuple[int, ...], np.dtype[np.bool_]] = np.zeros(
                (len(partition1_inputs), len(partition2_inputs)), dtype=np.bool_
            )
            for (i, region1), (j, region2) in itertools.product(
                enumerate(partition1_inputs), enumerate(partition2_inputs)
            ):
                # I.e., if scopes intersect over the scope to test.
                adj_mat[i, j] = bool(region1.scope & region2.scope & scope)
            adj_mat = adj_mat @ adj_mat.T
            # Now we have adjacency from inputs1 (of self) to inputs1. An edge means the two
            # regions must be partitioned together.

            # The number of zero eigen values for the laplacian matrix is the number of connected
            # components in a graph.
            # ANNOTATE: Numpy has typing issues.
            # CAST: Numpy has typing issues.
            deg_mat = np.diag(cast(NDArray[np.int64], adj_mat.sum(axis=1)))
            laplacian: NDArray[np.int64] = deg_mat - adj_mat
            eigen_values = np.linalg.eigvals(laplacian)
            num_connected: int = cast(NDArray[np.int64], np.isclose(eigen_values, 0).sum()).item()
            if num_connected == 1:  # All connected, meaning there's no valid partitioning.
                return False

        return True

    ####################################    (De)Serialization    ###################################
    # The RG can be dumped and loaded from json files, which can be useful when we want to save and
    # share it. The load() is another way to construct a RG other than the RG algorithms.

    def dump(self, filename: str) -> None:
        """Serialize the region graph to a json file.

        The file will be opened with mode="w" and encoding="utf-8". The json object has three
        entries: "regions" maps the id of each region node to its scope, "roots" lists the ids
        of the output region nodes, and "graph" lists the partition nodes, each one given by
        the id of its output region node and the ids of its input region nodes.

        Args:
            filename (str): The file name for dumping.
        """
        # Give an integer id to each region node, following the order of self.region_nodes
        region_idx: dict[RegionNode, int] = {}
        for rgn_id, rgn in enumerate(self.region_nodes):
            region_idx[rgn] = rgn_id

        # Collect the scope of each region node, keyed by the id converted to a string
        regions = {}
        for rgn, rgn_id in region_idx.items():
            regions[str(rgn_id)] = RegionDict(scope=list(rgn.scope))

        # Collect the ids of the output region nodes
        roots = []
        for rgn in self.outputs:
            roots.append(str(region_idx[rgn]))

        # Collect the partition nodes, by means of the ids of the regions they connect
        graph = []
        for ptn in self.partition_nodes:
            # Each partition node has exactly one output, as ensured by _check_structure().
            parent_rgn = cast(RegionNode, self.node_outputs(ptn)[0])
            child_ids = [region_idx[cast(RegionNode, rgn)] for rgn in self.node_inputs(ptn)]
            partd: PartitionDict = PartitionDict(output=region_idx[parent_rgn], inputs=child_ids)
            graph.append(partd)

        # Put everything together as a typed dict and write it to the file
        rg_json: RegionGraphJson = {"regions": regions, "roots": roots, "graph": graph}
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(rg_json, f, indent=4)

    @staticmethod
    def load(filename: str) -> "RegionGraph":
        """Load the region graph from the json file.

        The file will be opened with mode="r" and encoding="utf-8". It must contain a json
        object with the entries "regions", "roots" and "graph", as written by dump(). Every
        region id found in the file is converted with int(), hence ids given as integers and
        ids given as strings of digits are both accepted.

        Args:
            filename (str): The file name for loading.

        Returns:
            RegionGraph: The loaded region graph.

        Raises:
            KeyError: If a root or a partition refers to a region id that is not defined.
            ValueError: If the loaded nodes do not form a valid region graph.
        """
        with open(filename, encoding="utf-8") as f:
            # ANNOTATE: json.load gives Any.
            rg_json: RegionGraphJson = json.load(f)

        nodes: list[RegionGraphNode] = []
        in_nodes: dict[RegionGraphNode, list[RegionGraphNode]] = defaultdict(list)
        region_idx: dict[int, RegionNode] = {}

        # Load the region nodes
        for idx, rgn_scope in rg_json["regions"].items():
            rgn = RegionNode(rgn_scope["scope"])
            nodes.append(rgn)
            region_idx[int(idx)] = rgn

        # Load the root region nodes
        outputs: list[RegionGraphNode] = [region_idx[int(idx)] for idx in rg_json["roots"]]

        # Load the partition nodes
        for partitioning in rg_json["graph"]:
            in_rgns: list[RegionGraphNode] = [
                region_idx[int(idx)] for idx in partitioning["inputs"]
            ]
            # Normalize the output id exactly as done for the input ids and the roots.
            out_rgn = region_idx[int(partitioning["output"])]
            # A partition node has the same scope of the region node it decomposes.
            ptn = PartitionNode(out_rgn.scope)
            nodes.append(ptn)
            in_nodes[out_rgn].append(ptn)
            in_nodes[ptn] = in_rgns

        return RegionGraph(nodes, in_nodes, outputs=outputs)

    # TODO: refactor the following method as to simplify its structure (e.g., remove inline methods)
    def build_circuit(
        self,
        *,
        input_factory: InputLayerFactory | Mapping[Scope, InputLayerFactory],
        sum_product: str | None = None,
        sum_weight_factory: ParameterFactory | None = None,
        nary_sum_weight_factory: ParameterFactory | None = None,
        sum_factory: SumLayerFactory | None = None,
        prod_factory: ProductLayerFactory | None = None,
        num_input_units: int = 1,
        num_sum_units: int = 1,
        num_classes: int = 1,
        factorize_multivariate: bool = True,
    ) -> Circuit:
        """Construct a symbolic circuit from a region graph.

        There are two ways to use this method. The first one is to specify a sum-product layer
        abstraction, which can be either 'cp' (the CP layer), 'cp-t' (the CP-transposed layer),
        or 'tucker' (the Tucker layer). The second one is to manually specify the factories to
        build distinct sum and product layers. If the first way is chosen, then one can possibly
        use a factory that builds the symbolic parameters of the sum-product layer abstractions.
        The factory that constructs the input layers must always be specified.

        Args:
            input_factory: A factory that builds an input layer. If a Mapping between scopes and
                input layer factories is given, then the factory used for an input region node
                is the one associated to the scope of that region node.
            sum_product: The sum-product layer to use. It can be None, 'cp', 'cp-t', or 'tucker'.
            sum_weight_factory: The factory to construct the weights of the sum layers.
                It can be None, or a parameter factory, i.e., a map
                from a shape to a symbolic parameter. If it is None, then the default
                weight factory of the sum layer is used instead.
            nary_sum_weight_factory: The factory to construct the weight of sum layers having
                arity greater than one. If it is None, then it will have the same value and
                semantics of the given sum_weight_factory.
            sum_factory: A factory that builds a sum layer. It can be None.
            prod_factory: A factory that builds a product layer. It can be None.
            num_input_units: The number of input units.
            num_sum_units: The number of sum units per sum layer.
            num_classes: The number of output classes.
            factorize_multivariate: Whether to fully factorize input layers, when they depend on
                more than one variable.

        Returns:
            Circuit: A symbolic circuit.

        Raises:
            NotImplementedError: If an unknown 'sum_product' is given.
            ValueError: If only one between 'sum_factory' and 'prod_factory' is specified.
            ValueError: If both 'sum_product' and layer factories are specified, or none of them.
            ValueError: If 'sum_product' is 'cp-t' or 'tucker', and the inputs of a sum-product
                layer would have different numbers of units.
        """
        if (sum_factory is None and prod_factory is not None) or (
            sum_factory is not None and prod_factory is None
        ):
            raise ValueError(
                "Both 'sum_factory' and 'prod_factory' must be specified or none of them"
            )
        if sum_product is None and (sum_factory is None or prod_factory is None):
            raise ValueError(
                "If 'sum_product' is not given, then both 'sum_factory' and 'prod_factory'"
                " must be specified"
            )
        if sum_product is not None and (sum_factory is not None or prod_factory is not None):
            raise ValueError(
                "At most one between 'sum_product' and the pair 'sum_factory' and 'prod_factory'"
                " must be specified"
            )
        if nary_sum_weight_factory is None:
            nary_sum_weight_factory = sum_weight_factory

        # The layers built so far, their inputs, and the layer computing each region graph node
        layers: list[Layer] = []
        in_layers: dict[Layer, list[Layer]] = {}
        node_to_layer: dict[RegionGraphNode, Layer] = {}

        def build_cp_(
            rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]
        ) -> HadamardLayer | SumLayer:
            # Build a CP layer: one sum layer per input region, followed by a Hadamard product
            layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
            denses: list[Layer] = [
                SumLayer(
                    node_to_layer[rgn_in].num_output_units,
                    num_sum_units,
                    weight_factory=sum_weight_factory,
                )
                for rgn_in in rgn_partitioning
            ]
            hadamard = HadamardLayer(num_sum_units, arity=len(rgn_partitioning))
            layers.extend(denses)
            layers.append(hadamard)
            in_layers[hadamard] = denses
            for d, li in zip(denses, layer_ins):
                in_layers[d] = [li]
            # If the region is not a root region of the region graph,
            # then make Hadamard the last layer
            if self.region_outputs(rgn):
                node_to_layer[rgn] = hadamard
                return hadamard
            # Otherwise, introduce an additional sum layer to ensure the output layer is a sum
            output_dense = SumLayer(
                hadamard.num_output_units, num_classes, weight_factory=sum_weight_factory
            )
            layers.append(output_dense)
            in_layers[output_dense] = [hadamard]
            node_to_layer[rgn] = output_dense
            return output_dense

        def build_cp_transposed_(
            rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]
        ) -> SumLayer:
            # Build a CP-transposed layer: a Hadamard product followed by a single sum layer
            layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
            num_in_units = list({li.num_output_units for li in layer_ins})
            if len(num_in_units) > 1:
                raise ValueError(
                    "Cannot build a CP transposed layer, as the inputs would have different units"
                )
            num_units = num_sum_units if self.region_outputs(rgn) else num_classes
            hadamard = HadamardLayer(num_in_units[0], arity=len(rgn_partitioning))
            dense = SumLayer(num_in_units[0], num_units, weight_factory=sum_weight_factory)
            layers.append(hadamard)
            layers.append(dense)
            in_layers[hadamard] = layer_ins
            in_layers[dense] = [hadamard]
            node_to_layer[rgn] = dense
            return dense

        def build_tucker_(rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]) -> SumLayer:
            # Build a Tucker layer: a Kronecker product followed by a single sum layer
            layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
            num_in_units = list({li.num_output_units for li in layer_ins})
            if len(num_in_units) > 1:
                raise ValueError(
                    "Cannot build a Tucker layer, as the inputs would have different units"
                )
            num_units = num_sum_units if self.region_outputs(rgn) else num_classes
            kronecker = KroneckerLayer(num_in_units[0], arity=len(rgn_partitioning))
            dense = SumLayer(
                kronecker.num_output_units,
                num_units,
                weight_factory=sum_weight_factory,
            )
            layers.append(kronecker)
            layers.append(dense)
            in_layers[kronecker] = layer_ins
            in_layers[dense] = [kronecker]
            node_to_layer[rgn] = dense
            return dense

        # Set the sum-product layer builder, if necessary
        sum_prod_builder_: Callable[[RegionNode, Sequence[RegionNode]], Layer] | None
        if sum_product is None:
            sum_prod_builder_ = None
        elif sum_product == "cp":
            sum_prod_builder_ = build_cp_
        elif sum_product == "cp-t":
            sum_prod_builder_ = build_cp_transposed_
        elif sum_product == "tucker":
            sum_prod_builder_ = build_tucker_
        else:
            raise NotImplementedError(f"Unknown sum-product layer abstraction called {sum_product}")

        # Loop through the region graph nodes following a topological ordering, such that the
        # layers of the inputs of a node are always built before the layer of the node itself
        for node in self.topological_ordering():
            if isinstance(node, PartitionNode):  # Partition node
                # If a sum-product layer abstraction has been specified,
                # then just skip partition nodes
                if sum_prod_builder_ is not None:
                    continue
                assert prod_factory is not None
                partition_inputs = self.partition_inputs(node)
                prod_inputs = [node_to_layer[rgn] for rgn in partition_inputs]
                prod_sl = prod_factory(num_sum_units, len(prod_inputs))
                layers.append(prod_sl)
                in_layers[prod_sl] = prod_inputs
                node_to_layer[node] = prod_sl
                # The partition node has been fully processed: do not fall through to the
                # code below, which only handles region nodes
                continue
            assert isinstance(
                node, RegionNode
            ), "Region graph nodes must be either region or partition nodes"
            region_inputs = self.region_inputs(node)
            region_outputs = self.region_outputs(node)
            if not region_inputs:
                node_input_factory = (
                    input_factory[node.scope]
                    if isinstance(input_factory, Mapping)
                    else input_factory
                )
                # Input region node
                input_sl: Layer
                if factorize_multivariate and len(node.scope) > 1:
                    # Build one input layer per variable, and multiply them together
                    factorized_input_sls: list[Layer] = [
                        node_input_factory(Scope([sc]), num_input_units) for sc in node.scope
                    ]
                    input_sl = HadamardLayer(num_input_units, arity=len(factorized_input_sls))
                    layers.extend(factorized_input_sls)
                    in_layers[input_sl] = factorized_input_sls
                else:
                    input_sl = node_input_factory(node.scope, num_input_units)
                # A region node without outputs is a root, thus it outputs 'num_classes' units
                num_units = num_sum_units if region_outputs else num_classes
                if sum_factory is None:
                    layers.append(input_sl)
                    node_to_layer[node] = input_sl
                    continue
                sum_sl = sum_factory(num_input_units, num_units)
                layers.append(input_sl)
                layers.append(sum_sl)
                in_layers[sum_sl] = [input_sl]
                node_to_layer[node] = sum_sl
            elif len(region_inputs) == 1:
                # Region node that is partitioned into one and only one way
                (ptn,) = region_inputs
                if sum_prod_builder_ is not None:
                    sum_prod_builder_(node, self.partition_inputs(ptn))
                    continue
                assert sum_factory is not None
                num_units = num_sum_units if region_outputs else num_classes
                sum_input = node_to_layer[ptn]
                sum_sl = sum_factory(sum_input.num_output_units, num_units)
                layers.append(sum_sl)
                in_layers[sum_sl] = [sum_input]
                node_to_layer[node] = sum_sl
            else:  # len(region_inputs) > 1:
                # Region node with multiple partitionings
                num_units = num_sum_units if region_outputs else num_classes
                mix_ins: list[Layer]
                if sum_prod_builder_ is not None:
                    mix_ins = [
                        sum_prod_builder_(node, self.partition_inputs(ptn)) for ptn in region_inputs
                    ]
                else:
                    assert sum_factory is not None
                    sum_ins = [node_to_layer[ptn] for ptn in region_inputs]
                    mix_ins = [sum_factory(sli.num_output_units, num_units) for sli in sum_ins]
                    layers.extend(mix_ins)
                    for mix_sl, sli in zip(mix_ins, sum_ins):
                        in_layers[mix_sl] = [sli]
                # Mix the layers built for the different partitionings with a n-ary sum layer
                mix_sl = SumLayer(
                    num_units,
                    num_units,
                    arity=len(mix_ins),
                    weight_factory=nary_sum_weight_factory,
                )
                layers.append(mix_sl)
                in_layers[mix_sl] = mix_ins
                node_to_layer[node] = mix_sl

        outputs = [node_to_layer[rgn] for rgn in self.outputs]
        return Circuit(layers, in_layers, outputs)

inner_nodes property ¤

Inner (non-input) nodes in the graph.

inner_region_nodes property ¤

Inner region nodes in the graph.

inputs property ¤

is_omni_compatible cached property ¤

is_structured_decomposable cached property ¤

num_variables cached property ¤

outputs property ¤

partition_nodes property ¤

Partition nodes in the graph, which are always inner nodes.

region_nodes property ¤

Region nodes in the graph.

scope cached property ¤

__init__(nodes, in_nodes, outputs) ¤

Source code in cirkit/templates/region_graph/graph.py
81
82
83
84
85
86
87
88
def __init__(
    self,
    nodes: Sequence[RegionGraphNode],
    in_nodes: Mapping[RegionGraphNode, Sequence[RegionGraphNode]],
    outputs: Sequence[RegionGraphNode],
) -> None:
    """Initialize a region graph and validate its structure.

    Args:
        nodes: The region and partition nodes of the graph.
        in_nodes: A mapping from a node to the sequence of its input nodes.
        outputs: The output nodes of the graph.

    Raises:
        ValueError: If the structure check performed by _check_structure() fails.
    """
    super().__init__(nodes, in_nodes, outputs)
    # Fail early if the nodes do not form a valid region graph.
    self._check_structure()

build_circuit(*, input_factory, sum_product=None, sum_weight_factory=None, nary_sum_weight_factory=None, sum_factory=None, prod_factory=None, num_input_units=1, num_sum_units=1, num_classes=1, factorize_multivariate=True) ¤

Construct a symbolic circuit from a region graph. There are two ways to use this method. The first one is to specify a sum-product layer abstraction, which can be either 'cp' (the CP layer), 'cp-t' (the CP-transposed layer), or 'tucker' (the Tucker layer). The second one is to manually specify the factories to build distinct sum and product layers. If the first way is chosen, then one can optionally provide a factory that builds the symbolic parameters of the sum-product layer abstractions. The factory that constructs the input layers must always be specified.

Parameters:

Name Type Description Default
input_factory InputLayerFactory | Mapping[Scope, InputLayerFactory]

A factory that builds an input layer. If a Mapping between scopes and input layer factories is given, then a different input layer is built for each feature. In this case, an input layer should be specified for each variable/scope.

required
sum_product str | None

The sum-product layer to use. It can be None, 'cp', 'cp-t', or 'tucker'.

None
sum_weight_factory ParameterFactory | None

The factory to construct the weights of the sum layers. It can be None, or a parameter factory, i.e., a map from a shape to a symbolic parameter. If it is None, then the default weight factory of the sum layer is used instead.

None
nary_sum_weight_factory ParameterFactory | None

The factory to construct the weight of sum layers having arity greater than one. If it is None, then it will have the same value and semantics of the given sum_weight_factory.

None
sum_factory SumLayerFactory | None

A factory that builds a sum layer. It can be None.

None
prod_factory ProductLayerFactory | None

A factory that builds a product layer. It can be None.

None
num_input_units int

The number of input units.

1
num_sum_units int

The number of sum units per sum layer.

1
num_classes int

The number of output classes.

1
factorize_multivariate bool

Whether to fully factorize input layers, when they depend on more than one variable.

True

Returns:

Name Type Description
Circuit Circuit

A symbolic circuit.

Raises:

Type Description
NotImplementedError

If an unknown 'sum_product' is given.

ValueError

If both 'sum_product' and layer factories are specified, or none of them.

ValueError

If 'sum_product' is specified, but 'weight_factory' is not.

ValueError

The given region graph is malformed.

Source code in cirkit/templates/region_graph/graph.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
def build_circuit(
    self,
    *,
    input_factory: InputLayerFactory | Mapping[Scope, InputLayerFactory],
    sum_product: str | None = None,
    sum_weight_factory: ParameterFactory | None = None,
    nary_sum_weight_factory: ParameterFactory | None = None,
    sum_factory: SumLayerFactory | None = None,
    prod_factory: ProductLayerFactory | None = None,
    num_input_units: int = 1,
    num_sum_units: int = 1,
    num_classes: int = 1,
    factorize_multivariate: bool = True,
) -> Circuit:
    """Construct a symbolic circuit from a region graph.

    There are two ways to use this method. The first one is to specify a sum-product layer
    abstraction, which can be either 'cp' (the CP layer), 'cp-t' (the CP-transposed layer),
    or 'tucker' (the Tucker layer). The second one is to manually specify the factories to
    build distinct sum and product layers. If the first way is chosen, then one can possibly
    use a factory that builds the symbolic parameters of the sum-product layer abstractions.
    The factory that constructs the input layers must always be specified.

    Args:
        input_factory: A factory that builds an input layer. If a Mapping between scopes and
            input layer factories is given, then the factory is looked up by the scope of each
            input region node. In this case, an input layer factory should be specified for
            each such scope.
        sum_product: The sum-product layer to use. It can be None, 'cp', 'cp-t', or 'tucker'.
        sum_weight_factory: The factory to construct the weights of the sum layers.
            It can be None, or a parameter factory, i.e., a map
            from a shape to a symbolic parameter. If it is None, then the default
            weight factory of the sum layer is used instead.
        nary_sum_weight_factory: The factory to construct the weights of sum layers having
            arity greater than one. If it is None, then it will have the same value and
            semantics as the given sum_weight_factory.
        sum_factory: A factory that builds a sum layer. It can be None.
        prod_factory: A factory that builds a product layer. It can be None.
        num_input_units: The number of input units.
        num_sum_units: The number of sum units per sum layer.
        num_classes: The number of output classes.
        factorize_multivariate: Whether to fully factorize input layers, when they depend on
            more than one variable.

    Returns:
        Circuit: A symbolic circuit.

    Raises:
        NotImplementedError: If an unknown 'sum_product' is given.
        ValueError: If only one of 'sum_factory' and 'prod_factory' is specified.
        ValueError: If both 'sum_product' and layer factories are specified, or none of them.
        ValueError: If 'sum_product' is 'cp-t' or 'tucker' and the inputs of a partitioning
            have different numbers of output units.
    """
    # Exactly one of the two factories being None means only one was given
    if (sum_factory is None) != (prod_factory is None):
        raise ValueError(
            "Both 'sum_factory' and 'prod_factory' must be specified or none of them"
        )
    if sum_product is None and (sum_factory is None or prod_factory is None):
        raise ValueError(
            "If 'sum_product' is not given, then both 'sum_factory' and 'prod_factory'"
            " must be specified"
        )
    if sum_product is not None and (sum_factory is not None or prod_factory is not None):
        raise ValueError(
            "At most one between 'sum_product' and the pair 'sum_factory' and 'prod_factory'"
            " must be specified"
        )
    if nary_sum_weight_factory is None:
        nary_sum_weight_factory = sum_weight_factory

    # The layers of the circuit, the inputs of each layer, and the layer built for each node
    layers: list[Layer] = []
    in_layers: dict[Layer, list[Layer]] = {}
    node_to_layer: dict[RegionGraphNode, Layer] = {}

    def build_cp_(
        rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]
    ) -> HadamardLayer | SumLayer:
        # One sum layer per input region, followed by a Hadamard layer over their outputs
        layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
        denses: list[Layer] = [
            SumLayer(
                node_to_layer[rgn_in].num_output_units,
                num_sum_units,
                weight_factory=sum_weight_factory,
            )
            for rgn_in in rgn_partitioning
        ]
        hadamard = HadamardLayer(num_sum_units, arity=len(rgn_partitioning))
        layers.extend(denses)
        layers.append(hadamard)
        in_layers[hadamard] = denses
        for d, li in zip(denses, layer_ins):
            in_layers[d] = [li]
        # If the region is not a root region of the region graph,
        # then make Hadamard the last layer
        if self.region_outputs(rgn):
            node_to_layer[rgn] = hadamard
            return hadamard
        # Otherwise, introduce an additional sum layer to ensure the output layer is a sum
        output_dense = SumLayer(
            hadamard.num_output_units, num_classes, weight_factory=sum_weight_factory
        )
        layers.append(output_dense)
        in_layers[output_dense] = [hadamard]
        node_to_layer[rgn] = output_dense
        return output_dense

    def build_cp_transposed_(
        rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]
    ) -> SumLayer:
        # A Hadamard layer over the input regions, followed by a single sum layer
        layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
        num_in_units = list({li.num_output_units for li in layer_ins})
        if len(num_in_units) > 1:
            raise ValueError(
                "Cannot build a CP transposed layer, as the inputs would have different units"
            )
        num_units = num_sum_units if self.region_outputs(rgn) else num_classes
        hadamard = HadamardLayer(num_in_units[0], arity=len(rgn_partitioning))
        dense = SumLayer(num_in_units[0], num_units, weight_factory=sum_weight_factory)
        layers.append(hadamard)
        layers.append(dense)
        in_layers[hadamard] = layer_ins
        in_layers[dense] = [hadamard]
        node_to_layer[rgn] = dense
        return dense

    def build_tucker_(rgn: RegionNode, rgn_partitioning: Sequence[RegionNode]) -> SumLayer:
        # A Kronecker layer over the input regions, followed by a single sum layer
        layer_ins = [node_to_layer[rgn_in] for rgn_in in rgn_partitioning]
        num_in_units = list({li.num_output_units for li in layer_ins})
        if len(num_in_units) > 1:
            raise ValueError(
                "Cannot build a Tucker layer, as the inputs would have different units"
            )
        num_units = num_sum_units if self.region_outputs(rgn) else num_classes
        kronecker = KroneckerLayer(num_in_units[0], arity=len(rgn_partitioning))
        dense = SumLayer(
            kronecker.num_output_units,
            num_units,
            weight_factory=sum_weight_factory,
        )
        layers.append(kronecker)
        layers.append(dense)
        in_layers[kronecker] = layer_ins
        in_layers[dense] = [kronecker]
        node_to_layer[rgn] = dense
        return dense

    # Set the sum-product layer builder, if necessary
    sum_prod_builder_: Callable[[RegionNode, Sequence[RegionNode]], Layer] | None
    if sum_product is None:
        sum_prod_builder_ = None
    elif sum_product == "cp":
        sum_prod_builder_ = build_cp_
    elif sum_product == "cp-t":
        sum_prod_builder_ = build_cp_transposed_
    elif sum_product == "tucker":
        sum_prod_builder_ = build_tucker_
    else:
        raise NotImplementedError(f"Unknown sum-product layer abstraction called {sum_product}")

    # Loop through the region graph nodes, which are already sorted in a topological ordering
    for node in self.topological_ordering():
        if isinstance(node, PartitionNode):  # Partition node
            # If a sum-product layer abstraction has been specified,
            # then just skip partition nodes
            if sum_prod_builder_ is not None:
                continue
            assert prod_factory is not None
            partition_inputs = self.partition_inputs(node)
            prod_inputs = [node_to_layer[rgn] for rgn in partition_inputs]
            prod_sl = prod_factory(num_sum_units, len(prod_inputs))
            layers.append(prod_sl)
            in_layers[prod_sl] = prod_inputs
            node_to_layer[node] = prod_sl
            # The partition node is fully handled: move on to the next node, so that the
            # region-node logic below is never run on a partition node
            continue
        assert isinstance(
            node, RegionNode
        ), "Region graph nodes must be either region or partition nodes"
        region_inputs = self.region_inputs(node)
        region_outputs = self.region_outputs(node)
        if not region_inputs:
            node_input_factory = (
                input_factory[node.scope]
                if isinstance(input_factory, Mapping)
                else input_factory
            )
            # Input region node
            input_sl: Layer
            if factorize_multivariate and len(node.scope) > 1:
                # Build one input layer per variable and multiply them with a Hadamard layer
                factorized_input_sls: list[Layer] = [
                    node_input_factory(Scope([sc]), num_input_units) for sc in node.scope
                ]
                input_sl = HadamardLayer(num_input_units, arity=len(factorized_input_sls))
                layers.extend(factorized_input_sls)
                in_layers[input_sl] = factorized_input_sls
            else:
                input_sl = node_input_factory(node.scope, num_input_units)
            num_units = num_sum_units if region_outputs else num_classes
            if sum_factory is None:
                # No sum layer factory: the input layer itself represents the region
                layers.append(input_sl)
                node_to_layer[node] = input_sl
                continue
            sum_sl = sum_factory(num_input_units, num_units)
            layers.append(input_sl)
            layers.append(sum_sl)
            in_layers[sum_sl] = [input_sl]
            node_to_layer[node] = sum_sl
        elif len(region_inputs) == 1:
            # Region node that is partitioned into one and only one way
            (ptn,) = region_inputs
            if sum_prod_builder_ is not None:
                sum_prod_builder_(node, self.partition_inputs(ptn))
                continue
            assert sum_factory is not None
            num_units = num_sum_units if region_outputs else num_classes
            sum_input = node_to_layer[ptn]
            sum_sl = sum_factory(sum_input.num_output_units, num_units)
            layers.append(sum_sl)
            in_layers[sum_sl] = [sum_input]
            node_to_layer[node] = sum_sl
        else:  # len(node_inputs) > 1:
            # Region node with multiple partitionings
            num_units = num_sum_units if region_outputs else num_classes
            mix_ins: list[Layer]
            if sum_prod_builder_ is not None:
                mix_ins = [
                    sum_prod_builder_(node, self.partition_inputs(ptn)) for ptn in region_inputs
                ]
            else:
                assert sum_factory is not None
                sum_ins = [node_to_layer[ptn] for ptn in region_inputs]
                mix_ins = [sum_factory(sli.num_output_units, num_units) for sli in sum_ins]
                layers.extend(mix_ins)
                for mix_sl, sli in zip(mix_ins, sum_ins):
                    in_layers[mix_sl] = [sli]
            # Mix the layers built for each partitioning with a sum layer of matching arity
            mix_sl = SumLayer(
                num_units,
                num_units,
                arity=len(mix_ins),
                weight_factory=nary_sum_weight_factory,
            )
            layers.append(mix_sl)
            in_layers[mix_sl] = mix_ins
            node_to_layer[node] = mix_sl

    outputs = [node_to_layer[rgn] for rgn in self.outputs]
    return Circuit(layers, in_layers, outputs)

dump(filename) ¤

Dump the region graph to the json file.

The file will be opened with mode="w" and encoding="utf-8".

Parameters:

Name Type Description Default
filename str

The file name for dumping.

required
Source code in cirkit/templates/region_graph/graph.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def dump(self, filename: str) -> None:
    """Write the region graph to a json file.

    The file is opened with mode="w" and encoding="utf-8", and the json is indented by 4 spaces.
    Region nodes are identified by their position in ``self.region_nodes``.

    Args:
        filename (str): The name of the file to write.
    """
    # Give every region node an integer id, following the order of self.region_nodes
    region_idx: dict[RegionNode, int] = {}
    for position, region in enumerate(self.region_nodes):
        region_idx[region] = position

    # The "regions" entry maps the id (as a str) to the scope of the region
    regions = {}
    for region, position in region_idx.items():
        regions[str(position)] = RegionDict(scope=list(region.scope))

    # The "roots" entry lists the ids (as str) of the output regions
    roots = []
    for root in self.outputs:
        roots.append(str(region_idx[root]))

    # The "graph" entry has one record per partition node
    graph = []
    for partition in self.partition_nodes:
        # partition.outputs is guaranteed to have len==1 by _validate().
        output_region = cast(RegionNode, self.node_outputs(partition)[0])
        input_regions = self.node_inputs(partition)
        record: PartitionDict = {
            "output": region_idx[output_region],
            "inputs": [region_idx[cast(RegionNode, rgn)] for rgn in input_regions],
        }
        graph.append(record)

    # Assemble the json structure and write it out
    rg_json: RegionGraphJson = {"regions": regions, "roots": roots, "graph": graph}
    with open(filename, "w", encoding="utf-8") as out_file:
        json.dump(rg_json, out_file, indent=4)

is_compatible(other, /, *, scope=None) ¤

Test compatibility with another region graph over the given scope.

Parameters:

Name Type Description Default
other RegionGraph

The other region graph to compare with.

required
scope Optional[Iterable[int]]

The scope over which to check. If None, will use the intersection of the scopes of the two RG. Defaults to None.

None

Returns:

Name Type Description
bool bool

Whether self is compatible with other.

Source code in cirkit/templates/region_graph/graph.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def is_compatible(self, other: "RegionGraph", /, *, scope: Iterable[int] | None = None) -> bool:
    """Test compatibility with another region graph over the given scope.

    Args:
        other (RegionGraph): The other region graph to compare with.
        scope (Optional[Iterable[int]], optional): The scope over which to check. If None, \
            will use the intersection of the scopes of the two RG. Defaults to None.

    Returns:
        bool: Whether self is compatible with other.
    """
    # _is_frozen is implicitly tested because is_smooth is set in freeze().
    scope = Scope(scope) if scope is not None else self.scope & other.scope

    # TODO: is this correct for more-than-2 partition?
    for partition1, partition2 in itertools.product(
        self.partition_nodes, other.partition_nodes
    ):
        if partition1.scope & scope != partition2.scope & scope:
            continue  # Only check partitions with the same scope.

        partition1_inputs = self.node_inputs(partition1)
        partition2_inputs = self.node_inputs(partition2)

        if any(partition1.scope <= input.scope for input in partition2_inputs) or any(
            partition2.scope <= input.scope for input in partition1_inputs
        ):
            continue  # Only check partitions not within another partition.

        adj_mat: np.ndarray[tuple[int, ...], np.dtype[np.bool_]] = np.zeros(
            (len(partition1_inputs), len(partition2_inputs)), dtype=np.bool_
        )
        for (i, region1), (j, region2) in itertools.product(
            enumerate(partition1_inputs), enumerate(partition2_inputs)
        ):
            # I.e., if scopes intersect over the scope to test.
            adj_mat[i, j] = bool(region1.scope & region2.scope & scope)
        adj_mat = adj_mat @ adj_mat.T
        # Now we have adjencency from inputs1 (of self) to inputs1. An edge means the two
        # regions must be partitioned together.

        # The number of zero eigen values for the laplacian matrix is the number of connected
        # components in a graph.
        # ANNOTATE: Numpy has typing issues.
        # CAST: Numpy has typing issues.
        deg_mat = np.diag(cast(NDArray[np.int64], adj_mat.sum(axis=1)))
        laplacian: NDArray[np.int64] = deg_mat - adj_mat
        eigen_values = np.linalg.eigvals(laplacian)
        num_connected: int = cast(NDArray[np.int64], np.isclose(eigen_values, 0).sum()).item()
        if num_connected == 1:  # All connected, meaning there's no valid partitioning.
            return False

    return True

load(filename) staticmethod ¤

Load the region graph from the json file.

The file will be opened with mode="r" and encoding="utf-8".

Metadata is always loaded when available.

Parameters:

Name Type Description Default
filename str

The file name for loading.

required

Returns:

Name Type Description
RegionGraph RegionGraph

The loaded region graph.

Source code in cirkit/templates/region_graph/graph.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
@staticmethod
def load(filename: str) -> "RegionGraph":
    """Load the region graph from the json file.

    The file will be opened with mode="r" and encoding="utf-8".

    The ids of the region nodes, whether they appear as keys of "regions", in "roots", or as
    "inputs"/"output" of a partition, are all normalized to int before being looked up.

    Args:
        filename (str): The file name for loading.

    Returns:
        RegionGraph: The loaded region graph.
    """
    with open(filename, encoding="utf-8") as f:
        # ANNOTATE: json.load gives Any.
        rg_json: RegionGraphJson = json.load(f)

    nodes: list[RegionGraphNode] = []
    in_nodes: dict[RegionGraphNode, list[RegionGraphNode]] = defaultdict(list)
    region_idx: dict[int, RegionNode] = {}

    # Load the region nodes
    for idx, rgn_scope in rg_json["regions"].items():
        rgn = RegionNode(rgn_scope["scope"])
        nodes.append(rgn)
        region_idx[int(idx)] = rgn

    # Load the root region nodes
    outputs: list[RegionGraphNode] = [region_idx[int(idx)] for idx in rg_json["roots"]]

    # Load the partition nodes
    for partitioning in rg_json["graph"]:
        in_rgns: list[RegionGraphNode] = [
            region_idx[int(idx)] for idx in partitioning["inputs"]
        ]
        # Convert the output id like the input ids, so that an id stored as a str
        # (as done for the roots) is resolved in the same way
        out_rgn = region_idx[int(partitioning["output"])]
        # The partition node has the same scope as the region node it outputs to
        ptn = PartitionNode(out_rgn.scope)
        nodes.append(ptn)
        in_nodes[out_rgn].append(ptn)
        in_nodes[ptn] = in_rgns

    return RegionGraph(nodes, in_nodes, outputs=outputs)

partition_inputs(ptn) ¤

Source code in cirkit/templates/region_graph/graph.py
133
134
def partition_inputs(self, ptn: PartitionNode) -> Sequence[RegionNode]:
    """Return the inputs of the given partition node, typed as region nodes."""
    # cast() only narrows the static type of each node returned by node_inputs
    input_nodes = self.node_inputs(ptn)
    return [cast(RegionNode, rgn) for rgn in input_nodes]

partition_outputs(ptn) ¤

Source code in cirkit/templates/region_graph/graph.py
139
140
def partition_outputs(self, ptn: PartitionNode) -> Sequence[RegionNode]:
    """Return the outputs of the given partition node, typed as region nodes."""
    # cast() only narrows the static type of each node returned by node_outputs
    output_nodes = self.node_outputs(ptn)
    return [cast(RegionNode, rgn) for rgn in output_nodes]

region_inputs(rgn) ¤

Source code in cirkit/templates/region_graph/graph.py
130
131
def region_inputs(self, rgn: RegionNode) -> Sequence[PartitionNode]:
    """Return the inputs of the given region node, typed as partition nodes."""
    # cast() only narrows the static type of each node returned by node_inputs
    input_nodes = self.node_inputs(rgn)
    return [cast(PartitionNode, ptn) for ptn in input_nodes]

region_outputs(rgn) ¤

Source code in cirkit/templates/region_graph/graph.py
136
137
def region_outputs(self, rgn: RegionNode) -> Sequence[PartitionNode]:
    """Return the outputs of the given region node, typed as partition nodes."""
    # cast() only narrows the static type of each node returned by node_outputs
    output_nodes = self.node_outputs(rgn)
    return [cast(PartitionNode, ptn) for ptn in output_nodes]

RegionGraphJson ¤

Bases: TypedDict

The structure of the region graph json file.

Source code in cirkit/templates/region_graph/graph.py
35
36
37
38
39
40
41
42
43
class RegionGraphJson(TypedDict):
    """The structure of the region graph json file."""

    # The regions of the RG, as a mapping from the region id (as a str) to a RegionDict.
    regions: dict[str, RegionDict]
    # The ids (as str) of the root region nodes of the RG.
    roots: list[str]
    # The graph of the RG, represented by a list of partitions (one PartitionDict each).
    graph: list[PartitionDict]

graph instance-attribute ¤

regions instance-attribute ¤

roots instance-attribute ¤

RegionGraphNode ¤

Bases: ABC

The abstract base class for nodes in region graphs.

Source code in cirkit/templates/region_graph/graph.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class RegionGraphNode(ABC):
    """The abstract base class for nodes in region graphs."""

    def __init__(self, scope: Iterable[int] | Scope) -> None:
        """Init class.

        Args:
            scope (Iterable[int] | Scope): The scope of this node. It is converted to a Scope.

        Raises:
            ValueError: If the scope is empty.
        """
        # A falsy argument is rejected without building a Scope. Iterators and generators are
        # always truthy even when they yield nothing, so the emptiness check is repeated on the
        # constructed Scope.
        node_scope = Scope(scope) if scope else None
        if node_scope is None or len(node_scope) == 0:
            raise ValueError("The scope of a region graph node must not be empty.")
        super().__init__()
        self.scope = node_scope

    def __repr__(self) -> str:
        """Generate the repr string of the node.

        The string is made of the class name, the id of the object in hexadecimal, and the scope.

        Returns:
            str: The str representation of the node.
        """
        return f"{type(self).__name__}@0x{id(self):x}({self.scope})"

scope = Scope(scope) instance-attribute ¤

__init__(scope) ¤

Init class.

Parameters:

Name Type Description Default
scope Scope

The scope of this node.

required
Source code in cirkit/templates/region_graph/graph.py
49
50
51
52
53
54
55
56
57
58
def __init__(self, scope: Iterable[int] | Scope) -> None:
    """Init class.

    Args:
        scope (Iterable[int] | Scope): The scope of this node. It is converted to a Scope.

    Raises:
        ValueError: If the scope is empty.
    """
    # A falsy argument is rejected without building a Scope. Iterators and generators are
    # always truthy even when they yield nothing, so the emptiness check is repeated on the
    # constructed Scope.
    node_scope = Scope(scope) if scope else None
    if node_scope is None or len(node_scope) == 0:
        raise ValueError("The scope of a region graph node must not be empty.")
    super().__init__()
    self.scope = node_scope

__repr__() ¤

Generate the repr string of the node.

Returns:

Name Type Description
str str

The str representation of the node.

Source code in cirkit/templates/region_graph/graph.py
60
61
62
63
64
65
66
def __repr__(self) -> str:
    """Build the repr string of the node.

    The string is made of the class name, the id of the object in hexadecimal, and the scope.

    Returns:
        str: The str representation of the node.
    """
    class_name = type(self).__name__
    hex_id = format(id(self), "x")
    return f"{class_name}@0x{hex_id}({self.scope})"

RegionNode ¤

Bases: RegionGraphNode

The region node in the region graph.

Source code in cirkit/templates/region_graph/graph.py
69
70
class RegionNode(RegionGraphNode):
    """The region node in the region graph.

    It adds no members to RegionGraphNode, and acts as a distinct type for region nodes.
    """