from __future__ import division
import numpy as np
import nengo.utils.numpy as npext
from nengo.exceptions import SignalError
from nengo.utils.compat import StringIO, is_integer
class Signal(object):
    """Represents data or views onto data within a Nengo simulation.

    Signals are tightly coupled to NumPy arrays, which is how live data is
    represented in a Nengo simulation. Signals provide a view onto the
    important metadata of the live NumPy array, and maintain the original
    value of the array in order to reset the simulation to the initial state.

    Parameters
    ----------
    initial_value : array_like
        The initial value of the signal. Much of the metadata tracked by the
        Signal is based on this array as well (e.g., dtype).
    name : str, optional (Default: None)
        Name of the signal. Primarily used for debugging.
        If None, the memory location of the Signal will be used.
    base : Signal, optional (Default: None)
        The base signal, if this signal is a view on another signal.
        Linking the two signals with the ``base`` argument is necessary
        to ensure that their live data is also linked.
    readonly : bool, optional (Default: False)
        Whether this signal and its related live data should be marked as
        readonly. Writing to these arrays will raise an exception.
    """

    # Set assert_named_signals True to raise an Exception
    # if model.signal is used to create a signal with no name.
    # This can help to identify code that's creating un-named signals,
    # if you are trying to track down mystery signals that are showing
    # up in a model.
    assert_named_signals = False

    def __init__(self, initial_value, name=None, base=None, readonly=False):
        # Hold a private, non-writeable view of the data. Flags are set on
        # the view object, so the caller's own array keeps its flags.
        self._initial_value = np.asarray(initial_value).view()
        self._initial_value.setflags(write=False)

        if base is not None:
            # Views may only be linked to a root signal, never to a view.
            assert isinstance(base, Signal) and not base.is_view
            # make sure initial_value uses the same data as base.initial_value
            assert (npext.array_base(initial_value) is
                    npext.array_base(base.initial_value))
        self._base = base

        if self.assert_named_signals:
            assert name
        self._name = name

        self._readonly = bool(readonly)

    def __getitem__(self, item):
        """Index or slice into the signal, returning a view Signal.

        Parameters
        ----------
        item : int, slice, or tuple of int and slice
            The index to apply to the initial value array.

        Returns
        -------
        Signal
            A new Signal wrapping the indexed array, linked to this
            signal's base.

        Raises
        ------
        SignalError
            If any element of ``item`` is neither an integer nor a slice.
        """
        if not isinstance(item, tuple):
            item = (item,)

        if not all(is_integer(i) or isinstance(i, slice) for i in item):
            raise SignalError("Can only index or slice into signals")

        if all(map(is_integer, item)):
            # turn one index into slice to get a view from numpy
            last = item[-1]
            # slice(-1, 0) is always empty, so the last element has to be
            # selected with an open-ended stop instead of ``last + 1``.
            stop = None if last == -1 else last + 1
            item = item[:-1] + (slice(last, stop),)

        return Signal(self._initial_value[item],
                      name="%s[%s]" % (self.name, item),
                      base=self.base)

    def __repr__(self):
        return "Signal(%s, shape=%s)" % (self._name, self.shape)

    @property
    def base(self):
        """(Signal) The base signal; the signal itself if it is not a view.

        Linking the two signals with the ``base`` argument is necessary
        to ensure that their live data is also linked.
        """
        return self if self._base is None else self._base

    @property
    def dtype(self):
        """(numpy.dtype) Data type of the signal (e.g., float64)."""
        return self.initial_value.dtype

    @property
    def elemoffset(self):
        """(int) Offset of data from base in elements."""
        return self.offset // self.itemsize

    @property
    def elemstrides(self):
        """(tuple) Strides of data in elements."""
        return tuple(s // self.itemsize for s in self.strides)

    @property
    def initial_value(self):
        """(numpy.ndarray) Initial value of the signal.

        Much of the metadata tracked by the Signal is based on this array
        as well (e.g., dtype).
        """
        return self._initial_value

    @initial_value.setter
    def initial_value(self, val):
        raise SignalError("Cannot change initial value after initialization")

    @property
    def is_view(self):
        """(bool) True if this Signal is a view on another Signal."""
        return self._base is not None

    @property
    def itemsize(self):
        """(int) Size of an array element in bytes."""
        return self.initial_value.itemsize

    @property
    def name(self):
        """(str) Name of the signal. Primarily used for debugging.

        Falls back to the hexadecimal ``id`` of the signal if no name is set.
        """
        return self._name if self._name is not None else ("0x%x" % id(self))

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def ndim(self):
        """(int) Number of array dimensions."""
        return self.initial_value.ndim

    @property
    def offset(self):
        """(int) Offset of data from base in bytes."""
        return npext.array_offset(self.initial_value)

    @property
    def readonly(self):
        """(bool) Whether associated live data can be changed."""
        return self._readonly

    @readonly.setter
    def readonly(self, readonly):
        self._readonly = bool(readonly)

    @property
    def shape(self):
        """(tuple) Tuple of array dimensions."""
        return self.initial_value.shape

    @property
    def size(self):
        """(int) Total number of elements."""
        return self.initial_value.size

    @property
    def strides(self):
        """(tuple) Strides of data in bytes."""
        return self.initial_value.strides

    def column(self):
        """Return a view on this signal with column vector shape."""
        return self.reshape((self.size, 1))

    def may_share_memory(self, other):
        """Determine if two signals might overlap in memory.

        This comparison is not exact and errs on the side of false positives.
        See `numpy.may_share_memory` for more details.

        Parameters
        ----------
        other : Signal
            The other signal we are investigating.
        """
        return np.may_share_memory(self.initial_value, other.initial_value)

    def reshape(self, *shape):
        """Return a view on this signal with a different shape.

        Note that ``reshape`` cannot change the overall size of the signal.
        See `numpy.reshape` for more details.

        Any number of integers can be passed to this method,
        describing the desired shape of the returned signal.
        """
        return Signal(self._initial_value.reshape(*shape),
                      name="%s.reshape(%s)" % (self.name, shape),
                      base=self.base)

    def row(self):
        """Return a view on this signal with row vector shape."""
        return self.reshape((1, self.size))
class SignalDict(dict):
    """Map from Signal -> ndarray

    Values stored here are never rebound by item assignment; assignment
    writes into the existing ndarray instead, so the arrays are never
    copied (which would waste time and space).

    Use ``init`` to set the ndarray initially.
    """

    def __getitem__(self, key):
        """Return the array for ``key``.

        If ``key`` has no entry of its own but is a Signal whose base is a
        different signal, a new ndarray over the base signal's array is
        built from the key's dtype, shape, offset and strides.
        Otherwise the original KeyError propagates.
        """
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            is_view_signal = isinstance(key, Signal) and key.base is not key
            if not is_view_signal:
                raise
            # return a view on the base signal
            base_array = dict.__getitem__(self, key.base)
            return np.ndarray(
                buffer=base_array, dtype=key.dtype, shape=key.shape,
                offset=key.offset, strides=key.strides)

    def __setitem__(self, key, val):
        """Write ``val`` into the array already stored for ``key``.

        The stored ndarray stays at the same place in memory. Unlike a
        normal dict, a new key cannot be added through ``__setitem__``.
        This is by design, to avoid silent typos when debugging Simulator.
        Every key must instead be explicitly initialized with
        SignalDict.init.
        """
        target = self[key]
        target[...] = val

    def __str__(self):
        """Pretty-print the signals and current values."""
        return "".join("%s %s\n" % (repr(sig), repr(self[sig]))
                       for sig in self)

    def init(self, signal):
        """Set up a permanent mapping from signal -> ndarray.

        A non-view signal is stored as a view of its initial value when it
        is readonly, and as a copy otherwise. A view signal is stored as an
        ndarray over its base signal's data; the base is initialized first
        if it has no entry yet.

        Raises
        ------
        SignalError
            If ``signal`` already has an entry.
        """
        if signal in self:
            raise SignalError("Cannot add signal twice")

        initial = signal.initial_value

        if not signal.is_view:
            data = initial.view() if signal.readonly else initial.copy()
            dict.__setitem__(self, signal, data)
            return

        if signal.base not in self:
            self.init(signal.base)

        # get a view onto the base data
        byte_offset = npext.array_offset(initial)
        base_buffer = self[signal.base].data
        live_view = np.ndarray(shape=initial.shape, strides=initial.strides,
                               offset=byte_offset, dtype=initial.dtype,
                               buffer=base_buffer)
        live_view.setflags(write=not signal.readonly)
        dict.__setitem__(self, signal, live_view)

    def reset(self, signal):
        """Reset ndarray to the base value of the signal that maps to it"""
        if signal.readonly:
            # readonly signals are left as they are
            return
        self[signal] = signal.initial_value