"""
Helper functions for reading and writing hardware files.
Each of the functions in inputoutput take a block and a file descriptor.
The functions provided either read the file and update the Block
accordingly, or write information from the Block out to the file.
"""
import re
import collections
import tempfile
import os
import subprocess
import sys
import functools
import operator
from .pyrtlexceptions import PyrtlError, PyrtlInternalError
from .core import working_block, _NameSanitizer
from .wire import WireVector, Input, Output, Const, Register, next_tempvar_name
from .corecircuits import concat_list, rtl_all, rtl_any, select
from .memory import RomBlock
from .passes import two_way_concat, one_bit_selects
def _natural_sort_key(key):
""" Convert the key into a form such that it will be sorted naturally,
e.g. such that "tmp4" appears before "tmp18".
"""
def convert(text):
return int(text) if text.isdigit() else text
return [convert(c) for c in re.split(r'(\d+)', key)]
def _net_sorted(logic, name_mapper=lambda w: w.name):
# Sort nets based on the name of the destination
# wire, unless it's a memory write net.
def natural_keys(n):
if n.op == '@':
# Sort based on the name of the wr_en wire, since
# this particular net is used within 'always begin ... end'
# blocks for memory update logic.
key = str(n.args[2])
else:
key = name_mapper(n.dests[0])
return _natural_sort_key(key)
return sorted(logic, key=natural_keys)
def _name_sorted(wires, name_mapper=lambda w: w.name):
return sorted(wires, key=lambda w: _natural_sort_key(name_mapper(w)))
# -----------------------------------------------------------------
# __ ___
# |__) | | |___
# |__) |___ | |
class Subcircuit:
"""
This is a way to create and track per-module-instance wire names, so there
are not name clashes when we instantiate a module more than once.
"""
def __init__(self, model, is_top=False, clk_set={'clk'}, block=None):
self.model = model
self.is_top = is_top
self.clk_set = clk_set
self.block = working_block(block)
self.inputs = {}
self.outputs = {}
self.wirevector_by_name = {}
def add_input(self, original_name, wire):
self.inputs[original_name] = wire
self.wirevector_by_name[original_name] = wire
def add_output(self, original_name, wire):
self.outputs[original_name] = wire
self.wirevector_by_name[original_name] = wire
def add_reg(self, original_name, wire):
self.wirevector_by_name[original_name] = wire
def add_clock(self, clock_name):
self.clk_set.add(clock_name)
def twire(self, x):
""" Find or make wire named x and return it. """
s = self.wirevector_by_name.get(x)
if s is None:
# Purposefully *not* setting its name to 'x', so we don't have name clashes
s = WireVector(bitwidth=1)
self.wirevector_by_name[x] = s
return s
# ----------------------------------------------------------------
# ___ __ __ __
# \ / |__ |__) | | / \ / _`
# \/ |___ | \ | |___ \__/ \__>
#
def input_from_verilog(verilog, clock_name='clk', toplevel=None, leave_in_dir=None, block=None):
""" Read an open Verilog file or string as input via Yosys conversion, updating the block.
:param verilog: An open Verilog file to read
:param clock_name: The name of the clock (defaults to 'clk')
:param toplevel: Name of top-level module to instantiate; if None, defaults to first model
defined in the Verilog file
:param bool leave_in_dir: If True, save the intermediate BLIF file created in
the given directory
:param block: The block where the logic will be added
Note: This function is essentially a wrapper for `input_from_blif()`, with the added convenience
of turning the Verilog into BLIF for import for you. This function passes a set of commands to
Yosys as a script that normally produces BLIF files that can be successuflly imported into
PyRTL via `input_from_blif()`. If the Yosys conversion fails here, we recommend you create your
own custom Yosys script to try and produce BLIF yourself. Then you can import BLIF directly via
`input_from_blif()`.
"""
# Dev Notes:
# 1) We pass in an open file or string to keep the same API as input_from_blif (even though
# we are essentially putting that Verilog code back in a file for Yosys to operate on).
# 2) The Yosys script does not have a call to `flatten`, since we now have support for
# multi-module BLIFs. `flatten` replaces the .subckt (i.e. the module inclusions) with their
# actual contents, so if things aren't working for some reason, add this command for help.
# 3) The Yosys script *does* have a call to `setundef -zero -undriven`, which we use to set
# undriven nets to 0; we do this so PyRTL doesn't complain about used but undriven wires.
try:
verilog_string = verilog.read()
except AttributeError:
if isinstance(verilog, str):
verilog_string = verilog
else:
raise PyrtlError('input_from_verilog expecting either open file or string')
block = working_block(block)
# Create a temporary Verilog file
temp_vd, tmp_verilog_path = tempfile.mkstemp(prefix='pyrtl_verilog', suffix='.v',
dir=leave_in_dir, text=True)
with open(tmp_verilog_path, 'w') as f:
f.write(verilog_string)
# Create a temporary BLIF file
temp_bd, tmp_blif_path = tempfile.mkstemp(prefix='pyrtl_blif', suffix='.blif',
dir=leave_in_dir, text=True)
yosys_arg_template = (
"-p "
"read_verilog %s; "
"synth %s; "
"setundef -zero -undriven; "
"opt; "
"write_blif %s; "
)
yosys_arg = yosys_arg_template % (tmp_verilog_path,
('-top ' + toplevel) if toplevel is not None else '-auto-top',
tmp_blif_path)
try:
os.close(temp_vd)
os.close(temp_bd)
# call yosys on the script, and grab the output
_yosys_output = subprocess.check_output(['yosys', yosys_arg])
with open(tmp_blif_path) as blif:
input_from_blif(blif, block=block, clock_name=clock_name, top_model=toplevel)
except (subprocess.CalledProcessError, ValueError) as e:
print('Error with call to yosys...', file=sys.stderr)
print('---------------------------------------------', file=sys.stderr)
print(str(e.output).replace('\\n', '\n'), file=sys.stderr)
print('---------------------------------------------', file=sys.stderr)
raise PyrtlError('Yosys callfailed')
except OSError as e:
print('Error with call to yosys...', file=sys.stderr)
raise PyrtlError('Call to yosys failed (not installed or on path?)')
finally:
os.remove(tmp_verilog_path)
if leave_in_dir is None:
os.remove(tmp_blif_path)
[docs]
def output_to_verilog(dest_file, add_reset=True, block=None):
""" A function to walk the block and output it in Verilog format to the open file.
:param dest_file: Open file where the Verilog output will be written
:param Union[bool, str] add_reset: If reset logic should be added. Allowable options are:
False (meaning no reset logic is added), True (default, for adding synchronous
reset logic), and `asynchronous` (for adding asynchronous reset logic).
:param block: Block to be walked and exported
The registers will be set to their `reset_value`, if specified, otherwise 0.
"""
if not isinstance(add_reset, bool):
if add_reset != 'asynchronous':
raise PyrtlError("Invalid add_reset option %s. Acceptable options are "
"False, True, and 'asynchronous'")
block = working_block(block)
file = dest_file
internal_names = _VerilogSanitizer('_ver_out_tmp_')
if add_reset: # True or 'asynchronous'
if block.get_wirevector_by_name('rst') is not None:
raise PyrtlError("Found a user-defined wire named 'rst'. Pass in "
"'add_reset=False' to use your existing reset logic.")
for wire in block.wirevector_set:
internal_names.make_valid_string(wire.name)
def varname(wire):
return internal_names[wire.name]
_to_verilog_header(file, block, varname, add_reset)
_to_verilog_combinational(file, block, varname)
_to_verilog_sequential(file, block, varname, add_reset)
_to_verilog_memories(file, block, varname)
_to_verilog_footer(file)
def OutputToVerilog(dest_file, block=None):
""" A deprecated function to output Verilog, use "output_to_verilog" instead. """
return output_to_verilog(dest_file, block)
class _VerilogSanitizer(_NameSanitizer):
_ver_regex = r'[_A-Za-z][_a-zA-Z0-9\$]*$'
_verilog_reserved = \
"""always and assign automatic begin buf bufif0 bufif1 case casex casez cell cmos
config deassign default defparam design disable edge else end endcase endconfig
endfunction endgenerate endmodule endprimitive endspecify endtable endtask
event for force forever fork function generate genvar highz0 highz1 if ifnone
incdir include initial inout input instance integer join large liblist library
localparam macromodule medium module nand negedge nmos nor noshowcancelledno
not notif0 notif1 or output parameter pmos posedge primitive pull0 pull1
pulldown pullup pulsestyle_oneventglitch pulsestyle_ondetectglitch remos real
realtime reg release repeat rnmos rpmos rtran rtranif0 rtranif1 scalared
showcancelled signed small specify specparam strong0 strong1 supply0 supply1
table task time tran tranif0 tranif1 tri tri0 tri1 triand trior trireg unsigned
use vectored wait wand weak0 weak1 while wire wor xnor xor
"""
def __init__(self, internal_prefix='_sani_temp', map_valid_vals=True):
self._verilog_reserved_set = frozenset(self._verilog_reserved.split())
super(_VerilogSanitizer, self).__init__(self._ver_regex, internal_prefix,
map_valid_vals, self._extra_checks)
def _extra_checks(self, str):
return (str not in self._verilog_reserved_set # is not a Verilog reserved keyword
and str != 'clk' # not the clock signal
and len(str) <= 1024) # not too long to be a Verilog id
def _verilog_vector_size_decl(n):
return '' if n == 1 else '[{:d}:0]'.format(n - 1)
def _verilog_vector_decl(w):
return _verilog_vector_size_decl(len(w))
def _verilog_block_parts(block):
inputs = block.wirevector_subset(Input)
outputs = block.wirevector_subset(Output)
registers = block.wirevector_subset(Register)
wires = block.wirevector_subset() - (inputs | outputs | registers)
memories = {n.op_param[1] for n in block.logic_subset('m@')}
return inputs, outputs, registers, wires, memories
def _to_verilog_header(file, block, varname, add_reset):
""" Print the header of the verilog implementation. """
def name_sorted(wires):
return _name_sorted(wires, name_mapper=varname)
def name_list(wires):
return [varname(w) for w in wires]
print('// Generated automatically via PyRTL', file=file)
print('// As one initial test of synthesis, map to FPGA with:', file=file)
print('// yosys -p "synth_xilinx -top toplevel" thisfile.v\n', file=file)
inputs, outputs, registers, wires, memories = _verilog_block_parts(block)
# module name
io_list = ['clk'] + name_list(name_sorted(inputs)) + name_list(name_sorted(outputs))
if add_reset:
io_list.insert(1, 'rst')
if any(w.startswith('tmp') for w in io_list):
raise PyrtlError('input or output with name starting with "tmp" indicates unnamed IO')
io_list_str = ', '.join(io_list)
print('module toplevel({:s});'.format(io_list_str), file=file)
# inputs and outputs
print(' input clk;', file=file)
if add_reset:
print(' input rst;', file=file)
for w in name_sorted(inputs):
print(' input{:s} {:s};'.format(_verilog_vector_decl(w), varname(w)), file=file)
for w in name_sorted(outputs):
print(' output{:s} {:s};'.format(_verilog_vector_decl(w), varname(w)), file=file)
print('', file=file)
# memories and registers
for m in sorted(memories, key=lambda m: m.id):
memwidth_str = _verilog_vector_size_decl(m.bitwidth)
memsize_str = _verilog_vector_size_decl(1 << m.addrwidth)
print(' reg{:s} mem_{}{:s}; //{}'.format(memwidth_str, m.id,
memsize_str, m.name), file=file)
for w in name_sorted(registers):
print(' reg{:s} {:s};'.format(_verilog_vector_decl(w), varname(w)), file=file)
if (memories or registers):
print('', file=file)
# wires
for w in name_sorted(wires):
print(' wire{:s} {:s};'.format(_verilog_vector_decl(w), varname(w)), file=file)
print('', file=file)
# Write the initial values for read-only memories.
# If we ever add support outside of simulation for initial values
# for MemBlocks, that would also go here.
roms = {m for m in memories if isinstance(m, RomBlock)}
for m in sorted(roms, key=lambda m: m.id):
print(' initial begin', file=file)
for i in range(1 << m.addrwidth):
mem_elem_str = 'mem_{}[{:d}]'.format(m.id, i)
mem_data_str = "{:d}'h{:x}".format(m.bitwidth, m._get_read_data(i))
print(' {:s}={:s};'.format(mem_elem_str, mem_data_str), file=file)
print(' end', file=file)
print('', file=file)
def _to_verilog_combinational(file, block, varname):
""" Print the combinational logic of the verilog implementation. """
def name_sorted(wires):
return _name_sorted(wires, name_mapper=varname)
print(' // Combinational', file=file)
# assign constants (these could be folded for readability later)
for const in name_sorted(block.wirevector_subset(Const)):
print(' assign {:s} = {:d};'.format(varname(const), const.val), file=file)
# walk the block and output combination logic
for net in _net_sorted(block.logic, varname):
if net.op in 'w~': # unary ops
opstr = '' if net.op == 'w' else net.op
t = (varname(net.dests[0]), opstr, varname(net.args[0]))
print(' assign %s = %s%s;' % t, file=file)
elif net.op in '&|^+-*<>': # binary ops
t = (varname(net.dests[0]), varname(net.args[0]),
net.op, varname(net.args[1]))
print(' assign %s = %s %s %s;' % t, file=file)
elif net.op == '=':
t = (varname(net.dests[0]), varname(net.args[0]),
varname(net.args[1]))
print(' assign %s = %s == %s;' % t, file=file)
elif net.op == 'x':
# note that the argument order for 'x' is backwards from the ternary operator
t = (varname(net.dests[0]), varname(net.args[0]),
varname(net.args[2]), varname(net.args[1]))
print(' assign %s = %s ? %s : %s;' % t, file=file)
elif net.op == 'c':
catlist = ', '.join([varname(w) for w in net.args])
t = (varname(net.dests[0]), catlist)
print(' assign %s = {%s};' % t, file=file)
elif net.op == 's':
# someone please check if we need this special handling for scalars
catlist = ', '.join([varname(net.args[0]) + '[%s]' % str(i)
if len(net.args[0]) > 1 else varname(net.args[0])
for i in reversed(net.op_param)])
t = (varname(net.dests[0]), catlist)
print(' assign %s = {%s};' % t, file=file)
elif net.op in 'rm@':
pass # do nothing for registers and memories
else:
raise PyrtlInternalError("nets with op '{}' not supported".format(net.op))
print('', file=file)
def _to_verilog_sequential(file, block, varname, add_reset):
""" Print the sequential logic of the verilog implementation. """
if not block.logic_subset(op='r'):
return
print(' // Registers', file=file)
if add_reset == 'asynchronous':
print(' always @(posedge clk or posedge rst)', file=file)
else:
print(' always @(posedge clk)', file=file)
print(' begin', file=file)
if add_reset:
print(' if (rst) begin', file=file)
for net in _net_sorted(block.logic, varname):
if net.op == 'r':
dest = varname(net.dests[0])
rval = net.dests[0].reset_value
if rval is None:
rval = 0
print(' {:s} <= {:d};'.format(dest, rval), file=file)
print(' end', file=file)
print(' else begin', file=file)
else:
print(' begin', file=file)
for net in _net_sorted(block.logic, varname):
if net.op == 'r':
dest, src = (varname(net.dests[0]), varname(net.args[0]))
print(' {:s} <= {:s};'.format(dest, src), file=file)
print(' end', file=file)
print(' end', file=file)
print('', file=file)
def _to_verilog_memories(file, block, varname):
""" Print the memories of the verilog implementation. """
memories = {n.op_param[1] for n in block.logic_subset('m@')}
for m in sorted(memories, key=lambda m: m.id):
print(' // Memory mem_{}: {}'.format(m.id, m.name), file=file)
writes = [net for net in _net_sorted(block.logic_subset('@'), varname)
if net.op_param[1] == m]
if writes:
print(' always @(posedge clk)', file=file)
print(' begin', file=file)
for net in writes:
t = (varname(net.args[2]), net.op_param[0],
varname(net.args[0]), varname(net.args[1]))
print((' if (%s) begin\n'
' mem_%s[%s] <= %s;\n'
' end') % t, file=file)
print(' end', file=file)
reads = [net for net in _net_sorted(block.logic_subset('m'), varname)
if net.op_param[1] == m]
for net in reads:
dest = varname(net.dests[0])
m_id = net.op_param[0]
index = varname(net.args[0])
print(' assign {:s} = mem_{}[{:s}];'.format(dest, m_id, index), file=file)
print('', file=file)
def _to_verilog_footer(file):
print('endmodule\n', file=file)
[docs]
def output_verilog_testbench(dest_file, simulation_trace=None, toplevel_include=None,
vcd="waveform.vcd", cmd=None, add_reset=True, block=None):
"""Output a Verilog testbench for the block/inputs used in the simulation
trace.
:param dest_file: an open file to which the test bench will be printed.
:param SimulationTrace simulation_trace: a simulation trace from which the
inputs will be extracted for inclusion in the test bench. The test
bench generated will just replay the inputs played to the simulation
cycle by cycle. The default values for all registers and memories will
be based on the trace, otherwise they will be initialized to 0.
:param str toplevel_include: name of the file containing the toplevel
module this testbench is testing. If not None, an `include` directive
will be added to the top.
:param str vcd: By default the testbench generator will include a command
in the testbench to write the output of the testbench execution to a
.vcd file (via `$dumpfile`), and this parameter is the string of the
name of the file to use. If None is specified instead, then no
`dumpfile` will be used.
:param str cmd: The string passed as cmd will be copied verbatim into the
testbench just before the end of each cycle. This is useful for doing
things like printing specific values out during testbench evaluation
(e.g. ``cmd='$display("%d", out);'`` will instruct the testbench to
print the value of `out` every cycle which can then be compared easy
with a reference).
:param Union[bool, str] add_reset: If reset logic should be
added. Allowable options are: False (meaning no reset logic is added),
True (default, for adding synchronous reset logic), and `asynchronous`
(for adding asynchronous reset logic). The value passed in here should
match the argument passed to :func:`.output_to_verilog`.
:param Block block: Block containing design to test.
If `add_reset` is not False, a `rst` wire is added and will passed as an
input to the instantiated toplevel module. The `rst` wire will be held low
in the testbench, because initialization here occurs via the `initial`
block. It is provided for consistency with :func:`.output_to_verilog`.
The test bench does not return any values.
Example 1 (writing testbench to a string)::
with io.StringIO() as tbfile:
pyrtl.output_verilog_testbench(dest_file=tbfile, simulation_trace=sim_trace)
Example 2 (testbench in same file as verilog)::
with open('hardware.v', 'w') as fp:
output_to_verilog(fp)
output_verilog_testbench(fp, sim.tracer, vcd=None, cmd='$display("%d", out);')
"""
if not isinstance(add_reset, bool):
if add_reset != 'asynchronous':
raise PyrtlError("Invalid add_reset option %s. Acceptable options are "
"False, True, and 'asynchronous'")
block = working_block(block)
if add_reset: # True or 'asynchronous'
if block.get_wirevector_by_name('rst') is not None:
raise PyrtlError("Found a user-defined wire named 'rst'. Pass in "
"'add_reset=False' to use your existing reset logic.")
inputs, outputs, registers, wires, memories = _verilog_block_parts(block)
ver_name = _VerilogSanitizer('_ver_out_tmp_')
for wire in block.wirevector_set:
ver_name.make_valid_string(wire.name)
def name_sorted(wires):
return _name_sorted(wires, name_mapper=lambda w: ver_name[w.name])
def name_list(wires):
return [ver_name[w.name] for w in wires]
def init_regvalue(r):
if simulation_trace:
rval = simulation_trace.init_regvalue.get(r)
# Currently, the simulation stores the initial value for all registers
# in init_regvalue, so rval should not be None at this point.
# For the strange case where the trace was made by hand/other special use
# cases, check it against None anyway.
if rval is None:
rval = r.reset_value
if rval is None:
rval = simulation_trace.default_value
return rval
else:
return 0
def init_memvalue(m, ix):
# Return None if not present, or if already equal to default value, so we know not to
# emit any additional Verilog initing this mem address.
if simulation_trace:
if m not in simulation_trace.init_memvalue:
return None
v = simulation_trace.init_memvalue[m].get(ix, simulation_trace.default_value)
return None if v == simulation_trace.default_value else v
else:
return None
def default_value():
return simulation_trace.default_value if simulation_trace else 0
# Output an include, if given
if toplevel_include:
print('`include "{:s}"'.format(toplevel_include), file=dest_file)
print('', file=dest_file)
# Output header
print('module tb();', file=dest_file)
# Declare all block inputs as reg
print(' reg clk;', file=dest_file)
if add_reset:
print(' reg rst;', file=dest_file)
for w in name_sorted(inputs):
print(' reg{:s} {:s};'.format(_verilog_vector_decl(w), ver_name[w.name]),
file=dest_file)
# Declare all block outputs as wires
for w in name_sorted(outputs):
print(' wire{:s} {:s};'.format(_verilog_vector_decl(w), ver_name[w.name]),
file=dest_file)
print('', file=dest_file)
# Declare an integer used for init of memories
print(' integer tb_iter;', file=dest_file)
# Instantiate logic block
io_list = ['clk'] + name_list(name_sorted(inputs)) + name_list(name_sorted(outputs))
if add_reset:
io_list.insert(1, 'rst')
io_list_str = ['.{0:s}({0:s})'.format(w) for w in io_list]
print(' toplevel block({:s});\n'.format(', '.join(io_list_str)), file=dest_file)
# Generate clock signal
print(' always', file=dest_file)
print(' #5 clk = ~clk;\n', file=dest_file)
# Move through all steps of trace, writing out input assignments per cycle
print(' initial begin', file=dest_file)
# If a VCD output is requested, set that up
if vcd:
print(' $dumpfile ("%s");' % vcd, file=dest_file)
print(' $dumpvars;\n', file=dest_file)
# Initialize clk, and all the registers and memories
print(' clk = 0;', file=dest_file)
if add_reset:
print(' rst = 0;', file=dest_file)
for r in name_sorted(registers):
print(' block.%s = %d;' % (ver_name[r.name], init_regvalue(r)), file=dest_file)
for m in sorted(memories, key=lambda m: m.id):
max_iter = 1 << m.addrwidth
print(' for (tb_iter = 0; tb_iter < %d; tb_iter++) '
'begin block.mem_%s[tb_iter] = %d; end' % (max_iter, m.id, default_value()),
file=dest_file)
for ix in range(max_iter):
# Now just individually update the memory values that aren't the default
val = init_memvalue(m.id, ix)
if val is not None:
print(' block.mem_%s[%d] = %d;' % (m.id, ix, val), file=dest_file)
if simulation_trace:
tracelen = max(len(t) for t in simulation_trace.trace.values())
for i in range(tracelen):
for w in name_sorted(inputs):
print(' {:s} = {:s}{:d};'.format(
ver_name[w.name],
"{:d}'d".format(len(w)),
simulation_trace.trace[w.name][i]), file=dest_file)
if cmd:
print(' %s' % cmd, file=dest_file)
print('\n #10', file=dest_file)
# Footer
print(' $finish;', file=dest_file)
print(' end', file=dest_file)
print('endmodule', file=dest_file)
# ----------------------------------------------------------------
# ___ __ __ ___
# |___ | |__) |__) | |
# | | | \ | \ | |___
#
[docs]
def output_to_firrtl(open_file, rom_blocks=None, block=None):
"""Output the block as FIRRTL code to the output file.
:param open_file: File to write to
:param rom_blocks: List of ROM blocks to be initialized
:param block: Block to use (defaults to working block)
If ROM is initialized in PyRTL code, you can pass in the `rom_blocks` as a
list `[rom1, rom2, ...]`.
"""
block = working_block(block)
# FIRRTL only allows 'bits' operations to have two parameters: a high and low
# index representing the inclusive bounds of a contiguous range. PyRTL uses
# slice syntax, which aren't always contiguous, so we need to convert them.
one_bit_selects(block=block) # pylint: disable=no-value-for-parameter,unexpected-keyword-arg
# FIRRTL only allows 'concatenate' operations to have two arguments,
# but PyRTL's 'c' op allows an arbitrary number of wires. We need to convert
# these n-way concats to series of two-way concats accordingly.
two_way_concat(block=block) # pylint: disable=no-value-for-parameter,unexpected-keyword-arg
f = open_file
# write out all the implicit stuff
f.write("circuit Example :\n")
f.write(" module Example :\n")
f.write(" input clock : Clock\n input reset : UInt<1>\n")
# write out IO signals, wires and registers
for wire in _name_sorted(block.wirevector_subset(Input)):
f.write(" input %s : UInt<%d>\n" % (wire.name, wire.bitwidth))
for wire in _name_sorted(block.wirevector_subset(Output)):
f.write(" output %s : UInt<%d>\n" % (wire.name, wire.bitwidth))
for wire in _name_sorted(block.wirevector_subset(exclude=(Input, Output, Register, Const))):
f.write(" wire %s : UInt<%d>\n" % (wire.name, wire.bitwidth))
for wire in _name_sorted(block.wirevector_subset(Register)):
f.write(" reg %s : UInt<%d>, clock\n" % (wire.name, wire.bitwidth))
for wire in _name_sorted(block.wirevector_subset(Const)):
# some const is in the form like const_0_1'b1, is this legal operation?
wire.name = wire.name.split("'").pop(0)
f.write(" node %s = UInt<%d>(%d)\n" % (wire.name, wire.bitwidth, wire.val))
f.write("\n")
# write "Main"
node_cntr = 0
initializedMem = []
for log_net in _net_sorted(block.logic_subset()):
if log_net.op == '&':
f.write(" %s <= and(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '|':
f.write(" %s <= or(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '^':
f.write(" %s <= xor(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == 'n':
f.write(" node T_%d = and(%s, %s)\n" % (node_cntr, log_net.args[0].name,
log_net.args[1].name))
f.write(" %s <= not(T_%d)\n" % (log_net.dests[0].name, node_cntr))
node_cntr += 1
elif log_net.op == '~':
f.write(" %s <= not(%s)\n" % (log_net.dests[0].name, log_net.args[0].name))
elif log_net.op == '+':
f.write(" %s <= add(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '-':
f.write(" %s <= sub(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '*':
f.write(" %s <= mul(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '=':
f.write(" %s <= eq(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '<':
f.write(" %s <= lt(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == '>':
f.write(" %s <= gt(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == 'w':
f.write(" %s <= %s\n" % (log_net.dests[0].name, log_net.args[0].name))
elif log_net.op == 'x':
f.write(" %s <= mux(%s, %s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[2].name, log_net.args[1].name))
elif log_net.op == 'c':
if len(log_net.args) != 2:
raise PyrtlInternalError(
"Expected concat net to have only two "
"argument wires; has %d" % len(log_net.args)
)
f.write(" %s <= cat(%s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.args[1].name))
elif log_net.op == 's':
if len(log_net.op_param) != 1:
raise PyrtlInternalError(
"Expected select net to have single "
"select bit; has %d" % len(log_net.op_param)
)
f.write(" %s <= bits(%s, %s, %s)\n" % (log_net.dests[0].name, log_net.args[0].name,
log_net.op_param[0], log_net.op_param[0]))
elif log_net.op == 'r':
f.write(" %s <= mux(reset, UInt<%s>(0), %s)\n" %
(log_net.dests[0].name, log_net.dests[0].bitwidth, log_net.args[0].name))
elif log_net.op == 'm':
# if there are rom blocks, need to be initialized
if rom_blocks is not None:
if not log_net.op_param[0] in initializedMem:
initializedMem.append(log_net.op_param[0])
# find corresponding rom block according to memid
curr_rom = next((x for x in rom_blocks if x.id == log_net.op_param[0]), None)
f.write(" wire %s : UInt<%s>[%s]\n" %
(log_net.op_param[1].name, log_net.op_param[1].bitwidth,
2**log_net.op_param[1].addrwidth))
# if rom data is a function, calculate the data first
if callable(curr_rom.data):
romdata = [curr_rom.data(i) for i in range(2**curr_rom.addrwidth)]
curr_rom.data = romdata
# write rom block initialization data
for i in range(len(curr_rom.data)):
f.write(" %s[%s] <= UInt<%s>(%s)\n" %
(log_net.op_param[1].name, i, log_net.op_param[1].bitwidth,
curr_rom.data[i]))
# write the connection
f.write(" %s <= %s[%s]\n" % (log_net.dests[0].name, log_net.op_param[1].name,
log_net.args[0].name))
else:
if not log_net.op_param[0] in initializedMem:
initializedMem.append(log_net.op_param[0])
f.write(" cmem %s_%s : UInt<%s>[%s]\n" %
(log_net.op_param[1].name, log_net.op_param[0],
log_net.op_param[1].bitwidth, 2**log_net.op_param[1].addrwidth))
f.write(" infer mport T_%d = %s_%s[%s], clock\n" %
(node_cntr, log_net.op_param[1].name, log_net.op_param[0],
log_net.args[0].name))
f.write(" %s <= T_%d\n" % (log_net.dests[0].name, node_cntr))
node_cntr += 1
elif log_net.op == '@':
if not log_net.op_param[0] in initializedMem:
initializedMem.append(log_net.op_param[0])
f.write(" cmem %s_%s : UInt<%s>[%s]\n" %
(log_net.op_param[1].name, log_net.op_param[0],
log_net.op_param[1].bitwidth, 2**log_net.op_param[1].addrwidth))
f.write(" when %s :\n" % log_net.args[2].name)
f.write(" infer mport T_%d = %s_%s[%s], clock\n" %
(node_cntr, log_net.op_param[1].name, log_net.op_param[0],
log_net.args[0].name))
f.write(" T_%d <= %s\n" % (node_cntr, log_net.args[1].name))
f.write(" skip\n")
node_cntr += 1
else:
pass
return 0
# -----------------------------------------------------------------
# __ __ __ __ ___ __
# | /__` / /\ /__` |__) |__ |\ | / |__|
# | .__/ \__ /~~\ .__/ |__) |___ | \| \__ | |
def input_from_iscas_bench(bench, block=None):
''' Import an ISCAS .bench file
:param file bench: an open ISCAS .bench file to read
:param block: block to add the imported logic (defaults to current working block)
'''
import pyparsing
from pyparsing import (Word, Literal, OneOrMore, ZeroOrMore,
Suppress, Group, Keyword, one_of)
block = working_block(block)
try:
bench_string = bench.read()
except AttributeError:
if isinstance(bench, str):
bench_string = bench
else:
raise PyrtlError('input_from_bench expecting either open file or string')
def SKeyword(x):
return Suppress(Keyword(x))
def SLiteral(x):
return Suppress(Literal(x))
# NOTE: The acceptable signal characters are based on viewing the I/O names
# in the available ISCAS benchmark files, and may not be complete.
signal_start = pyparsing.alphas + pyparsing.nums + '$:[]_<>\\/?'
signal_middle = pyparsing.alphas + pyparsing.nums + '$:[]_<>\\/.?-'
signal_id = Word(signal_start, signal_middle)
gate_names = "AND OR NAND NOR XOR NOT BUFF DFF"
src_list = Group(signal_id + ZeroOrMore(SLiteral(",") + signal_id))("src_list")
net_def = Group(signal_id("dst") + SLiteral("=") + one_of(gate_names)("gate")
+ SLiteral("(") + src_list + SLiteral(")"))("net_def")
input_def = Group(SKeyword("INPUT") + SLiteral("(")
+ signal_id + SLiteral(")"))("input_def")
output_def = Group(SKeyword("OUTPUT") + SLiteral("(")
+ signal_id + SLiteral(")"))("output_def")
command_def = input_def | output_def | net_def
commands = OneOrMore(command_def)("command_list")
parser = commands.ignore(pyparsing.pythonStyleComment)
# Begin actually reading and parsing the BENCH file
result = parser.parseString(bench_string, parseAll=True)
output_to_internal = {} # dict: name -> wire
def twire(name):
""" Find or make wire named 'name' and return it. """
w = output_to_internal.get(name)
if w is None:
w = block.wirevector_by_name.get(name)
if w is None:
w = WireVector(bitwidth=1, name=name)
return w
for cmd in result["command_list"]:
if cmd.getName() == "input_def":
_wire_in = Input(bitwidth=1, name=str(cmd[0]), block=block)
elif cmd.getName() == "output_def":
# Create internal wire for indirection, since Outputs can't be inputs to nets in PyRTL
wire_internal = WireVector(bitwidth=1, block=block)
wire_out = Output(bitwidth=1, name=str(cmd[0]), block=block)
wire_out <<= wire_internal
output_to_internal[cmd[0]] = wire_internal
elif cmd.getName() == "net_def":
srcs = cmd["src_list"]
if cmd["gate"] == "AND":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) & twire(srcs[1])
elif cmd["gate"] == "OR":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) | twire(srcs[1])
elif cmd["gate"] == "NAND":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]).nand(twire(srcs[1]))
elif cmd["gate"] == "NOR":
dst_wire = twire(cmd["dst"])
dst_wire <<= ~(twire(srcs[0]) | twire(srcs[1]))
elif cmd["gate"] == "XOR":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) ^ twire(srcs[1])
elif cmd["gate"] == "NOT":
dst_wire = twire(cmd["dst"])
dst_wire <<= ~twire(srcs[0])
elif cmd["gate"] == "BUFF":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0])
elif cmd["gate"] == "DFF":
dst_wire = twire(cmd["dst"])
reg = Register(bitwidth=1)
reg.next <<= twire(srcs[0])
dst_wire <<= reg
else:
raise PyrtlError("Unexpected gate {%s}" % cmd["gate"])
# Benchmarks like c1196, b18, etc. have inputs and outputs by the
# same name, that are therefore directly connected. This pass will
# rename the outputs so that this is still okay.
for o in block.wirevector_subset(Output):
inputs = [i for i in block.wirevector_subset(Input) if i.name == o.name]
if inputs:
if len(inputs) > 1:
raise PyrtlError("More than one input found with the name %s" % inputs[0].name)
i = inputs[0]
o_internal = twire(o.name)
o_internal <<= i
o.name = next_tempvar_name()
# Ensure the input is the one mapped by the original name
block.wirevector_by_name[i.name] = i
print("Found input and output wires with the same name. "
"Output '%s' has now been renamed to '%s'." % (i.name, o.name))