Source code for pyrtl.core

""" The core abstraction for hardware in PyRTL.

Included in this file you will find:

* `LogicNet` -- the core class holding a "net" in the netlist
* `Block` -- a collection of nets with associated access and error checking
* `working_block` -- the "current" Block to which, by default, all created nets are added
* `modes` -- access methods for "modes" such as debug

"""
import collections
import re
import keyword

from .pyrtlexceptions import PyrtlError, PyrtlInternalError


# -----------------------------------------------------------------
#    __        __   __
#   |__) |    /  \ /  ` |__/
#   |__) |___ \__/ \__, |  \
#

[docs] class LogicNet(collections.namedtuple('LogicNet', ['op', 'op_param', 'args', 'dests'])): """ The basic immutable datatype for storing a "net" in a netlist. This is used for the internal representation that Python stores knowledge of what this is, and how it is used is only required for advanced users of PyRTL. A 'net' is a structure in Python that is representative of hardware logic operations. These include binary operations, such as `and` `or` and `not`, arithmetic operations such as `+` and `-`, as well as other operations such as Memory ops, and concat, split, wire, and reg logic. The details of what is allowed in each of these fields is defined in the comments of :class:`.Block`, and is checked by :func:`.Block.sanity_check` Logical Operations: ===== ======== ======== ====== ==== op op_param args dests ===== ======== ======== ====== ==== ``&`` `None` `a1, a2` `out` AND two wires together, put result into `out` ``|`` `None` `a1, a2` `out` OR two wires together, put result into `out` ``^`` `None` `a1, a2` `out` XOR two wires together, put result into `out` ``n`` `None` `a1, a2` `out` NAND two wires together, put result into `out` ``~`` `None` `a1` `out` invert one wire, put result into `out` ``+`` `None` `a1, a2` `out` add `a1` and `a2`, put result into `out` ``len(out) == max(len(a1), len(a2)) + 1`` works with both unsigned and two's complement ``-`` `None` `a1, a2` `out` subtract `a2` from `a1`, put result into `out` ``len(out) == max(len(a1), len(a2)) + 1`` works with both unsigned and two's complement ``*`` `None` `a1, a2` `out` multiply `a1` & `a2`, put result into `out` ``len(out) == len(a1) + len(a2)`` assumes unsigned, but :func:`.signed_mult` provides wrapper ``=`` `None` `a1, a2` `out` check `a1` & `a2` equal, put result into `out` (0 | 1) ``<`` `None` `a1, a2` `out` check `a2` greater than `a1`, put result into `out` (0 | 1) ``>`` `None` `a1, a2` `out` check `a1` greater than `a2`, put result into `out` (0 | 1) ``w`` `None` `w1` `w2` connects `w1` to `w2` directional wire with no logical operation ``x`` `None` `x`, `out` multiplexer: when `x` == 0 connect `a1` to `out` when `x` == 1 connect `a2` to `out` `x` must be one bit and ``len(a1) == len(a2)`` `a1, a2` ``c`` `None` `\\*args` `out` concatenates `\\*args` (wires) into single WireVector puts first arg at MSB, last arg at LSB ``s`` `sel` `wire` `out` selects bits from wire based on `sel` (slicing syntax) puts selected bits into `out` ``r`` `None` `next` `r1` on positive clock edge: copies `next` to `r1` ``m`` `memid`, `addr` `data` read address addr of mem (w/ id `memid`), put it into `data` `mem` ``@`` `memid`, `addr` write data to mem (w/ id `memid`) at address `addr` request write enable (`wr_en`) `mem` `data`, `wr_en` ===== ======== ======== ====== ==== """ def __str__(self): rhs = ', '.join(str(x) for x in self.args) lhs = ', '.join(str(x) for x in self.dests) options = '' if self.op_param is None else '(' + str(self.op_param) + ')' from .helperfuncs import _currently_in_jupyter_notebook if _currently_in_jupyter_notebook(): # Output the working block as a Latex table # Escape all Underscores rhs = rhs.replace('_', "\\_") lhs = lhs.replace('_', "\\_") options = options.replace('_', "\\_") if self.op in '&|': return "{} & \\leftarrow \\{} \\, - & {} {} \\\\".format( lhs, self.op, rhs, options) elif self.op in "wn+-*<>xcsr": return "{} & \\leftarrow {} \\, - & {} {} \\\\".format( lhs, self.op, rhs, options) elif self.op in "=": return "{} & \\leftarrow \\, {} \\, - & {} {} \\\\".format( lhs, self.op, rhs, options) elif self.op in "^": return "{} & \\leftarrow \\oplus \\, - & {} {} \\\\".format( lhs, rhs, options) elif self.op in "~": return "{} & \\leftarrow \\sim \\, - & {} {} \\\\".format( lhs, rhs, options) elif self.op in 'm@': memid, memblock = self.op_param extrainfo = 'memid=' + str(memid) extrainfo = extrainfo.replace("_", "\\_") name = memblock.name name = name.replace("_", "\\_") if self.op == 'm': return "{} & \\leftarrow m \\, - & {}[{}]({}) \\\\".format( lhs, name, rhs, extrainfo) else: addr, data, we = (str(x) for x in self.args) addr = addr.replace("_", "\\_") data = data.replace("_", "\\_") we = we.replace("_", "\\_") return "{}[{}] & \\leftarrow @ \\, - & {} we={} ({}) \\\\".format( name, addr, data, we, extrainfo) else: raise PyrtlInternalError('error, unknown op "%s"' % str(self.op)) else: # not in ipython if self.op in 'w~&|^n+-*<>=xcsr': options = ' ' + options if options else '' return "{} <-- {} -- {}{}".format(lhs, self.op, rhs, options) elif self.op in 'm@': memid, memblock = self.op_param extrainfo = 'memid=' + str(memid) if self.op == 'm': return "{} <-- m -- {}[{}]({})".format(lhs, memblock.name, rhs, extrainfo) else: addr, data, we = (str(x) for x in self.args) return "{}[{}] <-- @ -- {} we={} ({})".format( memblock.name, addr, data, we, extrainfo) else: raise PyrtlInternalError('error, unknown op "%s"' % str(self.op)) def __hash__(self): # it seems that namedtuple is not always hashable return hash(tuple(self)) def __eq__(self, other): # We can't be going and calling __eq__ recursively on the logic nets for all of # the args and dests because that will actually *create* new logic nets which is # very much not what people would expect to happen. Instead we define equality # as the immutable fields being equal and the list of args and dests being # references to the same objects. return (self.op == other.op and self.op_param == other.op_param and len(self.args) == len(other.args) and len(self.dests) == len(other.dests) and all(self.args[i] is other.args[i] for i in range(len(self.args))) and all(self.dests[i] is other.dests[i] for i in range(len(self.dests)))) def __ne__(self, other): return not self.__eq__(other) def _compare_error(self, other): """ Throw error when LogicNets are compared. Comparisons get you in a bad place between while you can compare op and op_param safely, the args and dests are references to mutable objects with comparison operators overloaded. """ raise PyrtlError('Greater than and less than comparisons between' ' LogicNets are not supported') __lt__ = _compare_error __gt__ = _compare_error __le__ = _compare_error __ge__ = _compare_error
[docs] class Block(object): """Block encapsulates a netlist. A Block in PyRTL is the class that stores a netlist and provides basic access and error checking members. Each block has well defined inputs and outputs, and contains both the basic logic elements and references to the wires and memories that connect them together. The logic structure is primarily contained in :attr:`.Block.logic` which holds a set of :class:`LogicNets <.LogicNet>`. Each :class:`~.LogicNet` describes a primitive operation (such as an adder or memory). The primitive is described by a 4-tuple of: 1) the op (a single character describing the operation such as ``+`` or ``r``), 2) a set of hard-wired `op_params` (such as the constants to select from the "selection" op). 3) the tuple `args` which list the WireVectors hooked up as inputs to this particular net. 4) the tuple `dests` which list the WireVectors hooked up as output for this particular net. Below is a list of the basic operations. These properties (more formally specified) should all be checked by the class method :func:`.Block.sanity_check`. * Most logical and arithmetic ops are pretty self explanatory. Each takes exactly two arguments, and they should perform the arithmetic or logical operation specified. OPS: ``&``, ``|``, ``^``, ``n``, ``~``, ``+``, ``-``, ``*``. All inputs must be the same bitwidth. Logical operations produce as many bits as are in the input, while ``+`` and ``-`` produce n+1 bits, and ``*`` produces 2n bits. * In addition there are some operations for performing comparisons that should perform the operation specified. The ``=`` op is checking to see if the bits of the vectors are equal, while ``<`` and ``>`` do unsigned arithmetic comparison. All comparisons generate a single bit of output (1 for true, 0 for false). * The ``w`` operator is simply a directional wire and has no logic function. * The ``x`` operator is a multiplexer which takes a select bit and two signals. If the value of the select bit is 0 it selects the second argument; if it is 1 it selects the third argument. Select must be a single bit, while the other two arguments must be the same length. * The ``c`` operator is the concatenation operator and combines any number of WireVectors (``a``, ``b``, ..., ``z``) into a single new WireVector with ``a`` in the MSB and ``z`` (or whatever is last) in the LSB position. * The ``s`` operator is the selection operator and chooses, based on the `op_param` specified, a subset of the logic bits from a WireVector to select. Repeats are accepted. * The ``r`` operator is a register and on posedge, simply copies the value from the input to the output of the register. * The ``m`` operator is a memory block read port, which supports async reads (acting like combinational logic). Multiple read (and write) ports are possible to the same memory but each ``m`` defines only one of those. The `op_param` is a tuple containing two references: the mem id, and a reference to the MemBlock containing this port. The MemBlock should only be used for debug and sanity checks. Each read port has one `addr` (an arg) and one `data` (a dest). * The ``@`` (update) operator is a memory block write port, which supports synchronous writes (writes are "latched" at positive edge). Multiple write (and read) ports are possible to the same memory but each ``@`` defines only one of those. The `op_param` is a tuple containing two references: the mem id, and a reference to the MemoryBlock. Writes have three args (`addr`, `data`, and write enable `we_en`). The dests should be an empty tuple. You will not see a written value change until the following cycle. If multiple writes happen to the same address in the same cycle the behavior is currently undefined. The connecting elements (args and dests) should be WireVectors or derived from WireVector, and should be registered with the block using :func:`.Block.add_wirevector`. Nets should be registered using :func:`.Block.add_net`. In addition, there is a member :attr:`.Block.legal_ops` which defines the set of operations that can be legally added to the block. By default it is set to all of the above defined operations, but it can be useful in certain cases to only allow a subset of operations (such as when transforms are being done that are "lowering" the blocks to more primitive ops). """ def __init__(self): """Creates an empty hardware block.""" self.logic = set() # set of nets, each is a LogicNet named tuple self.wirevector_set = set() # set of all WireVectors self.wirevector_by_name = {} # map from name->WireVector, used for performance # pre-synthesis WireVectors to post-synthesis vectors self.legal_ops = set('w~&|^n+-*<>=xcsrm@') # set of legal OPS self.rtl_assert_dict = {} # map from WireVectors -> exceptions, used by rtl_assert self.memblock_by_name = {} # map from name->memblock, for easy access to memblock objs def __str__(self): """String form has one LogicNet per line.""" from .helperfuncs import _currently_in_jupyter_notebook, _print_netlist_latex if _currently_in_jupyter_notebook(): _print_netlist_latex(list(self)) return ' ' else: return '\n'.join(str(net) for net in self) def add_wirevector(self, wirevector): """ Add a WireVector object to the block. :param WireVector wirevector: WireVector object added to block """ self.sanity_check_wirevector(wirevector) self.wirevector_set.add(wirevector) self.wirevector_by_name[wirevector.name] = wirevector def remove_wirevector(self, wirevector): """ Remove a WireVector object from the block. :param WireVector wirevector: WireVector object removed from block """ self.wirevector_set.remove(wirevector) del self.wirevector_by_name[wirevector.name] def add_net(self, net): """ Add a net to the logic of the block. :param LogicNet net: LogicNet object added to block The passed net, which must be of type LogicNet, is checked and then added to the block. No wires are added by this member, they must be added seperately with :func:`.Block.add_wirevector`.""" self.sanity_check_net(net) self.logic.add(net) def _add_memblock(self, mem): """ Registers a memory to the block. Note that this is done automatically when a memory block is created and isn't intended for use by PyRTL end users. This is so non-local memories can be accessed later on (e.g. for instantiating during a simulation). """ self.sanity_check_memblock(mem) self.memblock_by_name[mem.name] = mem def get_memblock_by_name(self, name, strict=False): """ Get a reference to a memory stored in this block by name. :param str name: name of MemBlock object :param bool strict: Determines if PyrtlError or None is thrown on no match. Defaults to False. :return: a MemBlock object with specified name By fallthrough, if a matching MemBlock cannot be found the value None is returned. However, if the argument strict is set to True, then this will instead throw a PyrtlError when no match is found. This useful for when a block defines its own internal memory block, and during simulation you want to instantiate that memory with certain values for testing. Since the Simulation constructor requires a reference to the memory object itself, but the block you're testing defines the memory internally, this allows you to get the object reference. Note that this requires you know the name of the memory block, meaning that you most likely need to have named it yourself. Example:: def special_memory(read_addr, write_addr, data, wen): mem = pyrtl.MemBlock(bitwidth=32, addrwidth=5, name='special_mem') mem[write_addr] <<= pyrtl.MemBlock.EnabledWrite(data, wen & (write_addr > 0)) return mem[read_addr] read_addr = pyrtl.Input(5, 'read_addr') write_addr = pyrtl.Input(5, 'write_addr') data = pyrtl.Input(32, 'data') wen = pyrtl.Input(1, 'wen') res = pyrtl.Output(32, 'res') res <<= special_memory(read_addr, write_addr, data, wen) # Can only access it after the `special_memory` block has been instantiated/called special_mem = pyrtl.working_block().get_memblock_by_name('special_mem') sim = pyrtl.Simulation(memory_value_map={ special_mem: { 0: 5, 1: 6, 2: 7, } }) inputs = { 'read_addr': '012012', 'write_addr': '012012', 'data': '890333', 'wen': '111000', } expected = { 'res': '567590', } sim.step_multiple(inputs, expected) """ if name in self.memblock_by_name: return self.memblock_by_name[name] elif strict: raise PyrtlError('error, block does not have a memblock named %s' % name) else: return None def wirevector_subset(self, cls=None, exclude=tuple()): """Return set of WireVectors, filtered by the type or tuple of types provided as `cls`. :param cls: Type of returned WireVector objects :param exclude: Type of WireVector objects to exclude :return: Set of WireVector objects that are both a cls type and not a excluded type If no `cls` is specified, the full set of WireVectors associated with the Block are returned. If `cls` is a single type, or a tuple of types, only those WireVectors of the matching types will be returned. This is helpful for getting all inputs, outputs, or registers of a block for example. Examples:: inputs = pyrtl.working_block().wirevector_subset(pyrtl.Input) outputs = pyrtl.working_block().wirevector_subset(pyrtl.Output) # returns set of all non-input WireVectors non_inputs = pyrtl.working_block().wirevector_subset(exclude=pyrtl.Input) """ if cls is None: initial_set = self.wirevector_set else: initial_set = (x for x in self.wirevector_set if isinstance(x, cls)) if exclude == tuple(): return set(initial_set) else: return set(x for x in initial_set if not isinstance(x, exclude)) def logic_subset(self, op=None): """Return set of LogicNets, filtered by the type(s) of logic op provided as op. :param op: Operation of LogicNet to filter by. Defaults to None. :return: set of LogicNets with corresponding op If no `op` is specified, the full set of LogicNets associated with the Block are returned. This is helpful for getting all memories of a block for example. """ if op is None: return self.logic else: return set(x for x in self.logic if x.op in op) def get_wirevector_by_name(self, name, strict=False): """Return the WireVector matching name. :param str name: name of WireVector object :param bool strict: Determines if PyrtlError or None is thrown on no match. Defaults to False. :return: a WireVector object with specified name By fallthrough, if a matching WireVector cannot be found the value None is returned. However, if the argument strict is set to True, then this will instead throw a PyrtlError when no match is found. """ if name in self.wirevector_by_name: return self.wirevector_by_name[name] elif strict: raise PyrtlError('error, block does not have a WireVector named %s' % name) else: return None class _NetConnectionsDict(dict): """ Dictionary wrapper for returning the results of the enclosing function. User doesn't need to know about this class; it's only for delivering a nice error message when _MemIndexed is used as a lookup key. """ def __missing__(self, key): from .memory import _MemIndexed if isinstance(key, _MemIndexed): raise PyrtlError( "Cannot look up a _MemIndexed object's source or destination net. " "Try using its '.wire' attribute as the lookup key instead." ) else: raise KeyError(key) def net_connections(self, include_virtual_nodes=False): """Returns a representation of the current block useful for creating a graph. :param bool include_virtual_nodes: if enabled, the wire itself will be used to signal an external source or sink (such as the source for an Input net). If disabled, these nodes will be excluded from the adjacency dictionaries :return: Two dictionaries: one that maps WireVectors to the logic net that creates their signal (`wire_src_dict`) and one that maps WireVectors to a list of logic nets that use the signal (`wire_sink_dict`). These dictionaries make the creation of a graph much easier, as well as facilitate other places in which one would need wire source and wire sink information. Look at :func:`.net_graph` for one such graph that uses the information from this function. """ src_list = {} dst_list = {} def add_wire_src(edge, node): if edge in src_list: raise PyrtlError('Wire "{}" has multiple drivers: [{}] and [{}] (check for ' 'multiple assignments with "<<=" or accidental mixing of ' '"|=" and "<<=")' .format(edge, str(src_list[edge]).strip(), str(node).strip())) src_list[edge] = node def add_wire_dst(edge, node): if edge in dst_list: # if node in dst_list[edge]: # raise PyrtlError("The net already exists in the graph") dst_list[edge].append(node) else: dst_list[edge] = [node] if include_virtual_nodes: from .wire import Input, Output, Const for wire in self.wirevector_subset((Input, Const)): add_wire_src(wire, wire) for wire in self.wirevector_subset(Output): add_wire_dst(wire, wire) for net in self.logic: for arg in set(net.args): # prevents unexpected duplicates when doing b <<= a & a add_wire_dst(arg, net) for dest in net.dests: add_wire_src(dest, net) return Block._NetConnectionsDict(src_list), Block._NetConnectionsDict(dst_list) def _repr_svg_(self): """ IPython display support for Block. """ from .visualization import block_to_svg return block_to_svg(self) def __iter__(self): """ BlockIterator iterates over the block passed on init in topographic order. The input is a Block, and when a LogicNet is returned it is always the case that all of its "parents" have already been returned earlier in the iteration. Note: this method will throw an error if there are loops in the logic that do not involve registers. Also, the order of the nets is not guaranteed to be the same over multiple iterations. """ from .wire import Input, Const, Register src_dict, dest_dict = self.net_connections() to_clear = self.wirevector_subset((Input, Const, Register)) cleared = set() remaining = self.logic.copy() try: while len(to_clear): wire_to_check = to_clear.pop() cleared.add(wire_to_check) if wire_to_check in dest_dict: for gate in dest_dict[wire_to_check]: # loop over logicnets not yet returned if all(arg in cleared for arg in gate.args): # if all args ready yield gate remaining.remove(gate) if gate.op != 'r': to_clear.update(gate.dests) except KeyError as e: raise PyrtlError("Cannot Iterate through malformed block") from e if len(remaining) != 0: from pyrtl.helperfuncs import find_and_print_loop find_and_print_loop(self) raise PyrtlError("Failure in Block Iterator due to non-register loops") def sanity_check(self): """ Check block and throw PyrtlError or PyrtlInternalError if there is an issue. Should not modify anything, only check data structures to make sure they have been built according to the assumptions stated in the Block comments. """ from .wire import Input, Const, Output from .helperfuncs import get_stack, get_stacks # check for valid LogicNets (and wires) for net in self.logic: self.sanity_check_net(net) for w in self.wirevector_subset(): if w.bitwidth is None: raise PyrtlError( 'error, missing bitwidth for WireVector "%s" \n\n %s' % (w.name, get_stack(w))) # check for unique names wirevector_names_set = set(x.name for x in self.wirevector_set) if len(self.wirevector_set) != len(wirevector_names_set): wirevector_names_list = [x.name for x in self.wirevector_set] for w in wirevector_names_set: wirevector_names_list.remove(w) raise PyrtlError('Duplicate wire names found for the following ' 'different signals: %s (make sure you are not using "tmp" ' 'or "const_" as a signal name because those are reserved for ' 'internal use)' % repr(wirevector_names_list)) # The following line also checks for duplicate wire drivers wire_src_dict, wire_dst_dict = self.net_connections() dest_set = set(wire_src_dict.keys()) arg_set = set(wire_dst_dict.keys()) full_set = dest_set | arg_set connected_minus_allwires = full_set.difference(self.wirevector_set) if len(connected_minus_allwires) > 0: bad_wire_names = '\n '.join(str(x) for x in connected_minus_allwires) raise PyrtlError('Unknown wires found in net:\n %s \n\n %s' % (bad_wire_names, get_stacks(*connected_minus_allwires))) all_input_and_consts = self.wirevector_subset((Input, Const)) # Check for wires that aren't connected to anything (inputs and consts can be unconnected) allwires_minus_connected = self.wirevector_set.difference(full_set) allwires_minus_connected = allwires_minus_connected.difference(all_input_and_consts) if len(allwires_minus_connected) > 0: bad_wire_names = '\n '.join(str(x) for x in allwires_minus_connected) raise PyrtlError('Wires declared but not connected:\n %s \n\n %s' % (bad_wire_names, get_stacks(*allwires_minus_connected))) # Check for wires that are inputs to a logicNet, but are not block inputs and are never # driven. ins = arg_set.difference(dest_set) undriven = ins.difference(all_input_and_consts) if len(undriven) > 0: raise PyrtlError('Wires used but never driven: %s \n\n %s' % ([w.name for w in undriven], get_stacks(*undriven))) # Check for async memories not specified as such self.sanity_check_memory_sync(wire_src_dict) # Check that all mappings in wirevector_by_name are consistent bad_wv_by_name = [w for n, w in self.wirevector_by_name.items() if n != w.name] if bad_wv_by_name: raise PyrtlInternalError('Wires with inconsistent entry in wirevector_by_name ' 'dict: %s' % [w.name for w in bad_wv_by_name]) # Check that all wires are in wirevector_by_name wv_by_name_set = set(self.wirevector_by_name.keys()) missing_wires = wirevector_names_set.difference(wv_by_name_set) if missing_wires: raise PyrtlInternalError('Missing entries in wirevector_by_name for the ' 'following wires: %s' % missing_wires) unknown_wires = wv_by_name_set.difference(wirevector_names_set) if unknown_wires: raise PyrtlInternalError('Unknown wires found in wirevector_by_name: %s' % unknown_wires) if debug_mode: # Check for wires that are destinations of a logicNet, but are not outputs and are never # used as args. outs = dest_set.difference(arg_set) unused = outs.difference(self.wirevector_subset(Output)) if len(unused) > 0: names = [w.name for w in unused] print('Warning: Wires driven but never used { %s } ' % names) print(get_stacks(*unused)) def sanity_check_memory_sync(self, wire_src_dict=None): """ Check that all memories are synchronous unless explicitly specified as async. While the semantics of 'm' memories reads is asynchronous, if you want your design to use a block ram (on an FPGA or otherwise) you want to make sure the index is available at the beginning of the clock edge. This check will walk the logic structure and throw an error on any memory if finds that has an index that is not ready at the beginning of the cycle. """ sync_mems = set(m for m in self.logic_subset('m') if not m.op_param[1].asynchronous) if not len(sync_mems): return # nothing to check here if wire_src_dict is None: wire_src_dict, wdd = self.net_connections() from .wire import Input, Const sync_src = 'r' sync_prop = 'wcs' for net in sync_mems: wires_to_check = list(net.args) while len(wires_to_check): wire = wires_to_check.pop() if isinstance(wire, (Input, Const)): continue src_net = wire_src_dict[wire] if src_net.op == sync_src: continue elif src_net.op in sync_prop: wires_to_check.extend(src_net.args) else: raise PyrtlError( 'memory "%s" is not specified as asynchronous but has an index ' '"%s" that is not ready at the start of the cycle due to net "%s"' % (net.op_param[1].name, net.args[0].name, str(src_net))) def sanity_check_wirevector(self, w): """ Check that w is a valid WireVector type. """ from .wire import WireVector if not isinstance(w, WireVector): raise PyrtlError( 'error attempting to pass an input of type "%s" ' 'instead of WireVector' % type(w)) def sanity_check_memblock(self, m): """ Check that m is a valid memblock type. """ from .memory import MemBlock if not isinstance(m, MemBlock): raise PyrtlError( 'error attempting to pass an input of type "%s" ' 'instead of MemBlock' % type(m)) def sanity_check_net(self, net): """ Check that net is a valid LogicNet. """ from .wire import Input, Output, Const, Register from .memory import MemBlock # general sanity checks that apply to all operations if not isinstance(net, LogicNet): raise PyrtlInternalError('error, net must be of type LogicNet') if not isinstance(net.args, tuple): raise PyrtlInternalError('error, LogicNet args must be tuple') if not isinstance(net.dests, tuple): raise PyrtlInternalError('error, LogicNet dests must be tuple') for w in net.args + net.dests: self.sanity_check_wirevector(w) if w._block is not self: raise PyrtlInternalError('error, net references different block') if w not in self.wirevector_set: raise PyrtlInternalError('error, net with unknown source "%s"' % w.name) # checks that input and output WireVectors are not misused bad_dests = set(filter(lambda w: isinstance(w, (Input, Const)), net.dests)) if bad_dests: raise PyrtlInternalError('error, Inputs, Consts cannot be destinations to a net (%s)' % ','.join(map(str, bad_dests))) bad_args = set(filter(lambda w: isinstance(w, (Output)), net.args)) if bad_args: raise PyrtlInternalError('error, Outputs cannot be arguments for a net (%s)' % ','.join(map(str, bad_args))) if net.op not in self.legal_ops: raise PyrtlInternalError('error, net op "%s" not from acceptable set %s' % (net.op, self.legal_ops)) # operation-specific checks on arguments if net.op in 'w~rsm' and len(net.args) != 1: raise PyrtlInternalError('error, op only allowed 1 argument') if net.op in '&|^n+-*<>=' and len(net.args) != 2: raise PyrtlInternalError('error, op only allowed 2 arguments') if net.op == 'x': if len(net.args) != 3: raise PyrtlInternalError('error, op only allowed 3 arguments') if net.args[1].bitwidth != net.args[2].bitwidth: raise PyrtlInternalError('error, args have mismatched bitwidths') if net.args[0].bitwidth != 1: raise PyrtlInternalError('error, mux select must be a single bit') if net.op == '@' and len(net.args) != 3: raise PyrtlInternalError('error, op only allowed 3 arguments') if net.op in '&|^n+-*<>=' and net.args[0].bitwidth != net.args[1].bitwidth: raise PyrtlInternalError('error, args have mismatched bitwidths') if net.op in 'm@' and net.args[0].bitwidth != net.op_param[1].addrwidth: raise PyrtlInternalError('error, mem addrwidth mismatch') if net.op == '@' and net.args[1].bitwidth != net.op_param[1].bitwidth: raise PyrtlInternalError('error, mem bitwidth mismatch') if net.op == '@' and net.args[2].bitwidth != 1: raise PyrtlInternalError('error, mem write enable must be 1 bit') # operation-specific checks on op_params if net.op in 'w~&|^n+-*<>=xcr' and net.op_param is not None: raise PyrtlInternalError('error, op_param should be None') if net.op == 's': if not isinstance(net.op_param, tuple): raise PyrtlInternalError('error, select op requires tuple op_param') for p in net.op_param: if not isinstance(p, int): raise PyrtlInternalError('error, select op_param requires ints') if p < 0 or p >= net.args[0].bitwidth: raise PyrtlInternalError('error, op_param out of bounds') if net.op in 'm@': if not isinstance(net.op_param, tuple): raise PyrtlInternalError('error, mem op requires tuple op_param') if len(net.op_param) != 2: raise PyrtlInternalError('error, mem op requires 2 op_params in tuple') if not isinstance(net.op_param[0], int): raise PyrtlInternalError('error, mem op requires first operand as int') if not isinstance(net.op_param[1], MemBlock): raise PyrtlInternalError('error, mem op requires second operand of a memory type') # operation-specific checks on destinations if net.op in 'w~&|^n+-*<>=xcsrm' and len(net.dests) != 1: raise PyrtlInternalError('error, op only allowed 1 destination') if net.op == '@' and net.dests != (): raise PyrtlInternalError('error, mem write dest should be empty tuple') if net.op == 'r' and not isinstance(net.dests[0], Register): raise PyrtlInternalError('error, dest of next op should be a Register') # check destination validity if net.op in 'w~&|^nr' and net.dests[0].bitwidth > net.args[0].bitwidth: raise PyrtlInternalError('error, upper bits of destination unassigned') if net.op in '<>=' and net.dests[0].bitwidth != 1: raise PyrtlInternalError('error, destination should be of bitwidth=1') if net.op in '+-' and net.dests[0].bitwidth > net.args[0].bitwidth + 1: raise PyrtlInternalError('error, upper bits of destination unassigned') if net.op == '*' and net.dests[0].bitwidth > 2 * net.args[0].bitwidth: raise PyrtlInternalError('error, upper bits of destination unassigned') if net.op == 'x' and net.dests[0].bitwidth > net.args[1].bitwidth: raise PyrtlInternalError('error, upper bits of mux output undefined') if net.op == 'c' and net.dests[0].bitwidth > sum(x.bitwidth for x in net.args): raise PyrtlInternalError('error, upper bits of concat output undefined') if net.op == 's' and net.dests[0].bitwidth > len(net.op_param): raise PyrtlInternalError('error, upper bits of select output undefined') if net.op == 'm' and net.dests[0].bitwidth != net.op_param[1].bitwidth: raise PyrtlInternalError('error, mem read dest bitwidth mismatch')
[docs] class PostSynthBlock(Block): """ This is a block with extra metadata required to maintain the pre-synthesis interface during post-synthesis. It currently holds the following instance attributes: `io_map`: a map from old IO WireVector to a list of new IO WireVectors it maps to; this is a list because for unmerged IO vectors, each old N-bit IO WireVector maps to N new 1-bit IO WireVectors. `reg_map`: a map from old register to a list of new registers; a list because post-synthesis, each N-bit register has been mapped to N 1-bit registers `mem_map`: a map from old memory block to the new memory block """ def __init__(self): super(PostSynthBlock, self).__init__() self.io_map = collections.defaultdict(list) self.reg_map = collections.defaultdict(list) self.mem_map = {}
# ----------------------------------------------------------------------- # __ __ __ __ __ __ # | | / \ |__) |__/ | |\ | / _` |__) | / \ / ` |__/ # |/\| \__/ | \ | \ | | \| \__> |__) |___ \__/ \__, | \ # # Right now we use singleton_block to store the one global # block, but in the future we should support multiple Blocks. # The argument "singleton_block" should never be passed. _singleton_block = Block() # settings help tweak the behavior of pyrtl as needed, especially # when there is a trade off between speed and debugability. These # are useful for developers to adjust behaviors in the different modes # but should not be set directly by users. debug_mode = False _setting_keep_wirevector_call_stack = False _setting_slower_but_more_descriptive_tmps = False def _get_debug_mode(): return debug_mode def _get_useful_callpoint_name(): """ Attempts to find the lowest user-level call into the PyRTL module. :return (string, int) or None: the file name and line number respectively This function walks back the current frame stack attempting to find the first frame that is not part of the pyrtl module. The filename (stripped of path and .py extention) and line number of that call are returned. This point should be the point where the user-level code is making the call to some pyrtl intrisic (for example, calling "mux"). If the attempt to find the callpoint fails for any reason, None is returned. """ if not _setting_slower_but_more_descriptive_tmps: return None import inspect loc = None frame_stack = inspect.stack() try: for frame in frame_stack: modname = inspect.getmodule(frame[0]).__name__ if not modname.startswith('pyrtl.'): full_filename = frame[0].f_code.co_filename filename = full_filename.split('/')[-1].rstrip('.py') lineno = frame[0].f_lineno loc = (filename, lineno) break except Exception: loc = None finally: del frame_stack return loc
[docs] def working_block(block=None): """ Convenience function for capturing the current working block. If a block is not passed, or if the block passed is None, then this will return the "current working block". However, if a block is passed in it will simply return that block instead. This feature is useful in allowing functions to "override" the current working block. """ if block is None: return _singleton_block elif not isinstance(block, Block): raise PyrtlError('error, expected instance of Block as block argument') else: return block
[docs] def reset_working_block(): """ Reset the working block to be empty. """ global _singleton_block _singleton_block = Block()
[docs] class set_working_block(object): """ Set the working block to be the block passed as argument. Compatible with the `with` statement. Sanity checks will only be run if the new block is different from the original block. """ @staticmethod def _set_working_block(block, no_sanity_check=False): global _singleton_block if not isinstance(block, Block): raise PyrtlError('error, expected instance of Block as block argument') if block is not _singleton_block: # don't update if the blocks are the same if not no_sanity_check: block.sanity_check() _singleton_block = block def __init__(self, block, no_sanity_check=False): self.old_block = working_block() # for with statement compatibility self._set_working_block(working_block(block), no_sanity_check) def __enter__(self): return self.old_block def __exit__(self, exc_type, exc_val, exc_tb): self._set_working_block(self.old_block, no_sanity_check=True)
[docs] def temp_working_block(): """ Set the working block to be new temporary block. If used with the `with` statement the block will be reset to the original value (at the time of call) at exit of the context. """ return set_working_block(Block())
[docs] def set_debug_mode(debug=True): """ Set the global debug mode. :param bool debug: Optional boolean paramter to which debug mode will be set This function will set the debug mode to the specified value. Debug mode is, by default, set to off to keep the performance of the system. With debug mode set to true, all temporary WireVectors created will be given a name based on the line of code on which they were created and a snapshot of the call-stack for those WireVectors will be kept as well. """ global debug_mode global _setting_keep_wirevector_call_stack global _setting_slower_but_more_descriptive_tmps debug_mode = debug _setting_keep_wirevector_call_stack = debug _setting_slower_but_more_descriptive_tmps = debug
_py_regex = r'^[^\d\W]\w*\Z' class _NameIndexer(object): """ Provides internal names that are based on a prefix and an index. """ def __init__(self, internal_prefix='_sani_temp'): self.internal_prefix = internal_prefix self.internal_index = 0 def make_valid_string(self): """ Build a valid string based on the prefix and internal index. """ return self.internal_prefix + str(self.next_index()) def next_index(self): index = self.internal_index self.internal_index += 1 return index class _NameSanitizer(_NameIndexer): """ Sanitizes the names so that names can be used in places that don't allow for arbitrary names while not mangling valid names. Put the values you want to validate into make_valid_string the first time you want to sanitize a particular string (or before the first time), and retrieve from the _NameSanitizer through indexing directly thereafter eg: sani["__&sfhs"] for retrieval after the first time """ def __init__(self, identifier_regex_str, internal_prefix='_sani_temp', map_valid_vals=True, extra_checks=lambda x: True, allow_duplicates=False): if identifier_regex_str[-1] != '$': identifier_regex_str += '$' self.identifier = re.compile(identifier_regex_str) self.val_map = {} self.map_valid = map_valid_vals self.extra_checks = extra_checks self.allow_dups = allow_duplicates super(_NameSanitizer, self).__init__(internal_prefix) def __getitem__(self, item): """ Get a value from the sanitizer """ if not self.map_valid and self.is_valid_str(item): return item return self.val_map[item] def is_valid_str(self, string): return self.identifier.match(string) and self.extra_checks(string) def make_valid_string(self, string=''): """ Inputting a value for the first time. """ if not self.is_valid_str(string): if string in self.val_map and not self.allow_dups: raise IndexError("Value {} has already been given to the sanitizer".format(string)) internal_name = super(_NameSanitizer, self).make_valid_string() self.val_map[string] = internal_name return internal_name else: if self.map_valid: self.val_map[string] = string return string class _PythonSanitizer(_NameSanitizer): """ Name Sanitizer specifically built for Python identifers. """ def __init__(self, internal_prefix='_sani_temp', map_valid_vals=True): super(_PythonSanitizer, self).__init__(_py_regex, internal_prefix, map_valid_vals) self.extra_checks = lambda s: not keyword.iskeyword(s)