# PyTA Project: Augment CFG Edges with Z3 Constraints

Today's task requires a combination of various components of PythonTA introduced in previous articles, including control flow graph module, Z3 visitor, and Z3 expression wrapper. In this task, we will augment each control flow graph edge with a list a Z3 constraints that represents the logical constraints of function arguments for this edge to be reached in the program. New constraints are created from function preconditions specified in the function docstring, `if`

conditions, and `while`

conditions. For example, considering the CFG below:

In this graph, `x`

and `y`

are function parameters, and the logical constraint on the starting edge is the function precondition. When the graph reaches an `if`

statement or `while`

loop, edges after these statements are added with the `if/while`

condition (or its negation on the `False`

branch). In addition, if a function argument is being reassigned, we would discard every Z3 constraint involving this variable in the edges following the reassignment, as in this case the variable's value becomes indeterminant.

### The Z3Environment Class

For this task, we first need to come up with a way to keep track of logical constraints and reassignment status of variables along an execution path. The `Z3Environment`

class is used for this purpose. It has three attributes: `variable_unassigned`

tracks whether a parameter has not been reassigned. `variable_type`

stores the data type of each function parameter extracted of parameter type annotations. Currently, only `int`

, `float`

,`bool`

, `str`

types and `list/set/tuple`

literals can be processed by `ExprWrapper`

, the class the converts python types to corresponding Z3 types. `constraints`

represents the list of Z3 constraints along the current execution path.

```
class Z3Environment:
"""Z3 Environment stores the Z3 variables and constraints in the current CFG path
variable_unassigned:
A dictionary mapping each variable in the current environment to a boolean indicating
whether it has been reassigned (False) or remains unassigned (True).
variable_type:
A dictionary mapping each variable in the current environment to its data type.
constraints:
A list of Z3 constraints in the current environment.
"""
variable_unassigned: Dict[str, bool]
variable_type: Dict[str, str]
constraints: List[ExprRef]
def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
"""Initialize the environment with function parameters and preconditions"""
self.variable_unassigned = {var: True for var in variables.keys()}
self.variable_type = variables
self.constraints = constraints.copy()
```

For adding new Z3 constraints, we need the following two methods: `add_constraint`

appends a Z3 constraint to the list of constraints, and `parse_constraint`

converts an Astroid AST to an equivalent Z3 expression. Later, during the graph traverse, we will need to first call `parse_constraint`

to convert the python expression (represented as Astroid AST) to Z3 constraint, and then add it to environment with `add_constraint`

```
def add_constraint(self, constraint: ExprRef) -> None:
"""
Add a new z3 constraint to environment
"""
self.constraints.append(constraint)
def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
"""
Parse an Astroid node to a Z3 constraint
Return the resulting expression
"""
ew = ExprWrapper(node, self.variable_type)
try:
return ew.reduce()
except (Z3Exception, Z3ParseException):
return None
```

To handle variable reassignment, the `assign`

method is called when reaching a reassignment statement, which marks the status of variable as `False`

(not unassigned, which is, being assigned).

```
def assign(self, name: str) -> None:
"""Handle a variable assignment statement"""
if name in self.variable_unassigned:
self.variable_unassigned[name] = False
```

Finally, putting everything together, the `update_constraints`

method is where we generate the constraints for an edge and remove constraints with reassigned variables. It needs a helper function `_get_vars`

, which returns the variables included in a given Z3 expression.

```
def update_constraints(self) -> List[ExprRef]:
"""
Returns all z3 constraints in the environments
Removes constraints with reassigned variables
"""
updated_constraints = []
for constraint in self.constraints:
# discard expressions with reassigned variables
variables = _get_vars(constraint)
reassigned = any(
not self.variable_unassigned.get(variable, False) for variable in variables
)
if not reassigned:
updated_constraints.append(constraint)
self.constraints = updated_constraints
return updated_constraints.copy()
def _get_vars(expr: ExprRef) -> Set[str]:
"""
Retrieve all z3 variables from a z3 expression
"""
variables = set()
def traverse(e):
if is_const(e) and e.decl().kind() == Z3_OP_UNINTERPRETED:
variables.add(e.decl().name())
elif hasattr(e, "children"):
for child in e.children():
traverse(child)
traverse(expr)
return variables
```

### Depth-First Traverse Through CFG

After setting up the Z3 environment, there is another preparation we need to make: getting all the paths along the control flow graph. The standard way to do so is to implement a depth-first search through the graph. I have created an implementation of DFS in a previous article (https://raineyang.hashnode.dev/pyta-project-depth-first-search-for-control-flow-graph). However, that one traverses through the nodes in the graph, while today we need to get all the edges along each path, which I implement in the code below:

```
def _get_paths(self) -> List[List[CFGEdge]]:
"""Get edges that represent paths from start to end node in depth-first order."""
paths = []
def _visited(
edge: CFGEdge, visited_edges: Set[CFGEdge], visited_nodes: Set[CFGBlock]
) -> bool:
return edge in visited_edges or edge.target in visited_nodes
def _dfs(
current_edge: CFGEdge,
current_path: List[CFGEdge],
visited_edges: Set[CFGEdge],
visited_nodes: Set[CFGBlock],
):
# note: both visited edges and visited nodes need to be tracked to correctly handle cycles
if _visited(current_edge, visited_edges, visited_nodes):
return
visited_edges.add(current_edge)
visited_nodes.add(current_edge.source)
current_path.append(current_edge)
if current_edge.target == self.end or all(
_visited(edge, visited_edges, visited_nodes)
for edge in current_edge.target.successors
):
paths.append(current_path.copy())
else:
for edge in current_edge.target.successors:
_dfs(edge, current_path, visited_edges, visited_nodes)
current_path.pop()
visited_edges.remove(current_edge)
_dfs(self.start.successors[0], [], set(), set())
return paths
```

The algorithm is basically the same as the version with nodes. One thing worth noting is that we need to keep track of both visited nodes and edges, and visit an edge only if the edge itself and its target node are not visited, to handle cycles correctly. Considering the case below: in this `while`

loop structure, the desired behavior is to form two execution paths, one into the loop and one out of the loop.

However, if we only keep track of unvisited edges, the first traverse will go through both edges in the cycle, leading to the one path below. Thus, we also need to stop at visited nodes to handle cycles.

### Add Z3 Constraints to CFG Edges

Now it's time to put everything together in the `update_edge_z3_constraint`

method in `ControlFlowGraph`

class:

```
def update_edge_z3_constraints(self) -> None:
"""Traverse through edges and add Z3 constraints on each edge.
Constraints are generated from:
- Function preconditions
- If conditions
- While conditions
Constraints with reassigned variables are not included in subsequent edges."""
```

First, we iterate through the paths generated in `_get_paths`

method and create a new `Z3Environment`

along each path. An edge with a non-None `condition`

indicates a `if`

or `while`

condition, which we add it (or its negation, if the edge is labeled `False`

) to the current `Z3Environment`

. Finally, we invoke `update_constraints`

to every edge in the path.

```
for path_id, path in enumerate(self._get_paths()):
# starting a new path
z3_environment = Z3Environment(self._z3_vars, self.precondition_constraints)
for edge in path:
# traverse through edge
if edge.condition is not None:
condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
if condition_z3_constraint is not None:
if edge.label == "True":
z3_environment.add_constraint(condition_z3_constraint)
elif edge.label == "False":
z3_environment.add_constraint(Not(condition_z3_constraint))
edge.z3_constraints[path_id] = z3_environment.update_constraints()
```

One thing to note about is that we are storing Z3 constraints on each edge in a `Dict[int, List]`

, where the key is `path_id`

and value is the list of constraints. This is because one edge can be part of multiple paths, each with its own sets of constraints. The `path_id`

will be used to distinguish which list of constraints belongs to which path.

```
# traverse into target node
for node in edge.target.statements:
if isinstance(node, Assign):
# mark reassigned variables
for target in node.targets:
if isinstance(target, AssignName):
z3_environment.assign(target.name)
```

After that, we need to iterate through statements in the node after the edge to determine if there are any arguments being reassigned.

At this point, the main part of the task is completed, but there are still a few minor points worth noting. Firstly, `z3`

is an optional dependency in PythonTA, so we should also make sure that the functionality of control flow graph itself it not affected even if z3-related features are not enabled. We need to put the import statements related to z3 in a `try/except`

block, which marks the variable `z3_dependency_available`

as `False`

if an `ImportError`

occurs.

```
try:
from z3 import Z3_OP_UNINTERPRETED, ExprRef, Not, Z3Exception, is_const
from ..transforms import ExprWrapper
z3_dependency_available = True
except ImportError:
ExprRef = Any
ExprWrapper = Any
Not = Any
Z3Exception = Any
is_const = Any
Z3_OP_UNINTERPRETED = Any
z3_dependency_available = False
```

If z3 is not successfully imported, we would disable the feature `update_edge_z3_constraints`

```
def update_edge_z3_constraints(self) -> None:
"""Traverse through edges and add Z3 constraints on each edge.
Constraints are generated from:
- Function preconditions
- If conditions
- While conditions
Constraints with reassigned variables are not included in subsequent edges."""
if not z3_dependency_available:
return
```

Secondly, we need to invoke `Z3Visitor`

at some point to set up initial z3 constraints from function precondition docstrings. Initially I try to directly integrate it into `CFGVisitor`

module. However, this turns out to be less optimal as it introduces coupling between two modules that are, in principle, unrelated. Instead, inside `visit_functiondef`

method in `CFGVisitor`

we would conditionally call `update_edge_z3_constraints`

only if the `FunctionDef`

node has the `z3_constraints`

attribute added by `Z3Visitor`

```
if hasattr(func, "z3_constraints"):
self._current_cfg.precondition_constraints = func.z3_constraints
self._current_cfg.update_edge_z3_constraints()
```