Source code for nifpga.session

"""
Session, a convenient wrapper around the low-level _NiFpga class.

Copyright (c) 2017 National Instruments
"""

from .nifpga import (_SessionType, _IrqContextType, _NiFpga, DataType,
                     OPEN_ATTRIBUTE_NO_RUN, RUN_ATTRIBUTE_WAIT_UNTIL_DONE,
                     CLOSE_ATTRIBUTE_NO_RESET_IF_LAST_SESSION, FifoProperty,
                     _fifo_properties_to_types, FlowControl, DmaBufferType,
                     FpgaViState)
from .bitfile import Bitfile
from .status import InvalidSessionError
from collections import namedtuple
import ctypes
from builtins import bytes
from math import ceil
from future.utils import iteritems


[docs]class Session(object): """ Session, a convenient wrapper around the low-level _NiFpga class. The Session class uses regular python types, provides convenient default arguments to C API functions, and makes controls, indicators, and FIFOs available by name. If any NiFpga function return status is non-zero, the appropriate exception derived from either WarningStatus or ErrorStatus is raised. Example usage of FPGA configuration functions:: with Session(bitfile="myBitfilePath.lvbitx", resource="RIO0") as session: session.run() session.download() session.abort() session.reset() Note: It is always recommended that you use a Session with a context manager (with). Opening a Session without a context manager could cause you to leak the session if :meth:`Session.close` is not called. Controls and indicators are accessed directly via a _Register object obtained from the session:: my_control = session.registers["MyControl"] my_control.write(data=4) data = my_control.read() FIFOs are accessed directly via a _FIFO object obtained from the session:: myHostToFpgaFifo = session.fifos["MyHostToFpgaFifo"] myHostToFpgaFifo.stop() actual_depth = myHostToFpgaFifo.configure(requested_depth=4096) myHostToFpgaFifo.start() empty_elements_remaining = myHostToFpgaFifo.write(data=[1, 2, 3, 4], timeout_ms=2) myFpgaToHostFifo = session.fifos["MyHostToFpgaFifo"] read_values = myFpgaToHostFifo.read(number_of_elements=4, timeout_ms=0) print(read_values.data) """
[docs] def __init__(self, bitfile, resource, no_run=False, reset_if_last_session_on_exit=False, **kwargs): """Creates a session to the specified resource with the specified bitfile. Args: bitfile (str)(Bitfile): A bitfile.Bitfile() instance or a string filepath to a bitfile. resource (str): e.g. "RIO0", "PXI1Slot2", or "rio://hostname/RIO0" or an already open session no_run (bool): If true, don't run the bitfile, just open the session. reset_if_last_session_on_exit (bool): Passed into Close on exit. Unused if not using this session as a context guard. **kwargs: Additional arguments that edit the session. """ if not isinstance(bitfile, Bitfile): """ The bitfile we were passed is a path to an lvbitx.""" bitfile = Bitfile(bitfile) self._nifpga = _NiFpga() self._session = _SessionType() open_attribute = 0 for key, value in kwargs.items(): if key == '_open_attribute': open_attribute = value if no_run: open_attribute = open_attribute | OPEN_ATTRIBUTE_NO_RUN if isinstance(resource, _SessionType): self._session = resource else: bitfile_path = bytes(bitfile.filepath, 'ascii') bitfile_signature = bytes(bitfile.signature, 'ascii') resource = bytes(resource, 'ascii') self._nifpga.Open(bitfile_path, bitfile_signature, resource, open_attribute, self._session) self._reset_if_last_session_on_exit = reset_if_last_session_on_exit self._registers = {} self._internal_registers_dict = {} base_address_on_device = bitfile.base_address_on_device() for name, bitfile_register in iteritems(bitfile.registers): assert name not in self._registers, \ "One or more registers have the same name '%s', this is not supported" % name register = self._create_register(bitfile_register, base_address_on_device) if bitfile_register.is_internal(): self._internal_registers_dict[name] = register else: self._registers[name] = register self._fifos = {} for name, bitfile_fifo in iteritems(bitfile.fifos): assert name not in self._fifos, \ "One or more FIFOs have the same name '%s', this is not supported" % name self._fifos[name] = self._create_fifo(bitfile_fifo)
def __enter__(self): return self def __exit__(self, exception_type, exception_val, trace): try: self.close(reset_if_last_session=self._reset_if_last_session_on_exit) except InvalidSessionError: pass
[docs] def close(self, reset_if_last_session=False): """ Closes the FPGA Session. Args: reset_if_last_session (bool): If True, resets the FPGA on the last close. If true, does not reset the FPGA on the last session close. """ close_attr = CLOSE_ATTRIBUTE_NO_RESET_IF_LAST_SESSION if reset_if_last_session is False else 0 self._nifpga.Close(self._session, close_attr)
[docs] def run(self, wait_until_done=False): """ Runs the FPGA VI on the target. Args: wait_until_done (bool): If true, this functions blocks until the FPGA VI stops running """ run_attr = RUN_ATTRIBUTE_WAIT_UNTIL_DONE if wait_until_done else 0 self._nifpga.Run(self._session, run_attr)
[docs] def abort(self): """ Aborts the FPGA VI. """ self._nifpga.Abort(self._session)
[docs] def download(self): """ Re-downloads the FPGA bitstream to the target. """ self._nifpga.Download(self._session)
[docs] def reset(self): """ Resets the FPGA VI. """ self._nifpga.Reset(self._session)
@property def fpga_vi_state(self): """ Returns the current state of the FPGA VI. """ state = ctypes.c_uint32() self._nifpga.GetFpgaViState(self._session, state) return FpgaViState(state.value) def _irq_ordinals_to_bitmask(self, ordinals): bitmask = 0 for ordinal in ordinals: assert 0 <= ordinal and ordinal <= 31, "Valid IRQs are 0-31: %d is invalid" % ordinal bitmask |= (1 << ordinal) return bitmask WaitOnIrqsReturnValues = namedtuple('WaitOnIrqsReturnValues', ["irqs_asserted", "timed_out"])
[docs] def wait_on_irqs(self, irqs, timeout_ms): """ Stops the calling thread until the FPGA asserts any IRQ in the irqs parameter or until the function call times out. Args: irqs: A list of irq ordinals 0-31, e.g. [0, 6, 31]. timeout_ms: The timeout to wait in milliseconds. Returns: session_wait_on_irqs (namedtuple):: session_wait_on_irqs.irqs_asserted (list): is a list of the asserted IRQs. session_wait_on_irqs.timed_out (bool): Outputs whether or not the time out expired before all irqs were asserted. """ if type(irqs) != list: irqs = [irqs] irqs_bitmask = self._irq_ordinals_to_bitmask(irqs) context = _IrqContextType() self._nifpga.ReserveIrqContext(self._session, context) irqs_asserted_bitmask = ctypes.c_uint32(0) timed_out = DataType.Bool._return_ctype()() try: self._nifpga.WaitOnIrqs(self._session, context, irqs_bitmask, timeout_ms, irqs_asserted_bitmask, timed_out) finally: self._nifpga.UnreserveIrqContext(self._session, context) irqs_asserted = [i for i in range(32) if irqs_asserted_bitmask.value & (1 << i)] return self.WaitOnIrqsReturnValues(irqs_asserted=irqs_asserted, timed_out=bool(timed_out.value))
[docs] def acknowledge_irqs(self, irqs): """ Acknowledges an IRQ or set of IRQs. Args: irqs (list): A list of irq ordinals 0-31, e.g. [0, 6, 31]. """ self._nifpga.AcknowledgeIrqs(self._session, self._irq_ordinals_to_bitmask(irqs))
def _get_unique_register_or_fifo(self, name): assert not (name in self._registers and name in self._fifos), \ "Ambiguous: '%s' is both a register and a FIFO" % name assert name in self._registers or name in self._fifos, \ "Unknown register or FIFO '%s'" % name try: return self._registers[name] except KeyError: return self._fifos[name] @property def registers(self): """ This property returns a dictionary containing all registers that are associated with the bitfile opened with the session. A register can be accessed by its unique name. """ return self._registers @property def _internal_registers(self): """ This property contains internal registers""" return self._internal_registers_dict @property def fifos(self): """ This property returns a dictionary containing all FIFOs that are associated with the bitfile opened with the session. A FIFO can be accessed by its unique name. """ return self._fifos def _create_register(self, bitfile_register, base_address_on_device): # simple C type registers use the same entrypoint as the C API if bitfile_register.type.is_c_api_type: if bitfile_register.is_array(): return _ArrayRegister(self._session, self._nifpga, bitfile_register, base_address_on_device) else: return _Register(self._session, self._nifpga, bitfile_register, base_address_on_device) else: # register that handles conversion for more complex types return _DataConvertingRegister(self._session, self._nifpga, bitfile_register, base_address_on_device) def _create_fifo(self, bitfile_fifo): if bitfile_fifo.is_fxp(): return _FxpFIFO(self._session, self._nifpga, bitfile_fifo) elif bitfile_fifo.is_composite(): return _DataConvertingFifo(self._session, self._nifpga, bitfile_fifo) else: return _FIFO(self._session, self._nifpga, bitfile_fifo)
[docs]class _Register(object): """ _Register is a private class that is a wrapper of logic that is associated with controls and indicators. All Registers will exists in a sessions session.registers property. This means that all possible registers for a given session are created during session initialization; a user should never need to create a new instance of this class. """ def __init__(self, session, nifpga, bitfile_register, base_address_on_device, read_func=None, write_func=None): self._datatype = bitfile_register.datatype self._name = bitfile_register.name self._session = session if read_func is None: self._read_func = nifpga["Read%s" % self.datatype] else: self._read_func = read_func if write_func is None: self._write_func = nifpga["Write%s" % self.datatype] else: self._write_func = write_func self._ctype_type = self._datatype._return_ctype() self._type = bitfile_register.type self._resource = bitfile_register.offset + base_address_on_device if bitfile_register.access_may_timeout(): self._resource = self._resource | 0x80000000
[docs] def __len__(self): """ A single register will always have one and only one element. Returns: (int): Always a constant 1. """ return 1
[docs] def write(self, data): """ Writes the specified data to the control or indicator Args: data (DataType.value): The data to be written into the register """ self._write_func(self._session, self._resource, data)
[docs] def read(self): """ Reads a single element from the control or indicator Returns: data (DataType.value): The data inside the register. """ data = self._ctype_type() self._read_func(self._session, self._resource, data) if self._datatype is DataType.Bool: return bool(data.value) return data.value
@property def name(self): """ Property of a register that returns the name of the control or indicator. """ return self._name @property def datatype(self): """ Property of a register that returns the datatype of the control or indicator. """ return self._datatype
[docs]class _ArrayRegister(_Register): """ _ArryRegister is a private class that inherits from _Register with additional interfaces unique to the logic of array controls and indicators. """ def __init__(self, session, nifpga, bitfile_register, base_address_on_device): super(_ArrayRegister, self).__init__(session, nifpga, bitfile_register, base_address_on_device, read_func=nifpga["ReadArray%s" % bitfile_register.datatype], write_func=nifpga["WriteArray%s" % bitfile_register.datatype]) self._num_elements = len(bitfile_register) self._ctype_type = self._ctype_type * self._num_elements
[docs] def __len__(self): """ Returns the length of the array. Returns: (int): The number of elements in the array. """ return self._num_elements
[docs] def write(self, data): """ Writes the specified array of data to the control or indicator Args: data (list): The data "array" to be written into the registers wrapped into a python list. """ # if data is not iterable make it iterable try: iter(data) except TypeError: data = [data] assert len(data) == len(self), \ "Bad data length %d for register '%s', expected %s" \ % (len(data), self._name, len(self)) buf = self._ctype_type(*data) self._write_func(self._session, self._resource, buf, len(self))
[docs] def read(self): """ Reads the entire array from the control or indicator. Returns: (list): The data in the register in a python list. """ buf = self._ctype_type() self._read_func(self._session, self._resource, buf, len(self)) val = [bool(elem) if self._datatype is DataType.Bool else elem for elem in buf] return val
class _DataConvertingRegister(_Register): """ _DataConvertingRegister does all the work of converting the LabVIEW Cluster and fixed point types into something more native to python. As a user you will read and write to this register just as you would any other register. Given the nature of fixed point there are a few caveats. Just like the other registers, we do not support users creating their instances of _FxpRegister, but after opening a session to a valid bitfile the session.registers property will contain all registers fixed point included. Fixed point registers should be easily used from python with a few caveats: 1. Trying to write a value that does not conform to boundaries of the defined register, will be coerced just as it would in labVIEW. Input a value that needs to be coerced will result in a warning to the user. A value is to be coerced if it is not a multiple of the delta value, or if it exceeds the minimum or maximum values. """ def __init__(self, session, nifpga, bitfile_register, base_address_on_device): super(_DataConvertingRegister, self).__init__( session, nifpga, bitfile_register, base_address_on_device, read_func=nifpga["ReadArray%s" % DataType.U32], write_func=nifpga["WriteArray%s" % DataType.U32]) self._transfer_len = int(ceil(self._type.size_in_bits / 32.0)) self._ctype_type = self._ctype_type * self._transfer_len def read(self): """ Reads the value from the control or indicator Returns: data (value_type): The data inside the register. """ buf = self._ctype_type() self._read_func(self._session, self._resource, buf, self._transfer_len) read_array = [elem for elem in buf] fpga_representation = self._combine_array_of_u32_into_one_value(read_array) return self._type.unpack_data(fpga_representation) def _combine_array_of_u32_into_one_value(self, data): """ This method is a helper to convert the array read from hardware and return a single value removing any excessive bits. Whenever the array is longer than 1 element, the data is left justified, we need to shift the combined data to the right before doing the conversion. For example, if the register had a word length of 54, the 54 MSB would be the fixed point bits. The 10 LSB of the combinedData must be shifted off in order to not mess up further calculations. """ combinedData = 0 for index in range(0, self._transfer_len): combinedData = (combinedData << 32) + data[index] if self._transfer_len > 1: combinedData = combinedData >> (32 * self._transfer_len - self._type.size_in_bits) return combinedData def write(self, user_input): """ Writes the user's the users input into the register as a fixed point number. Any inputs outside the bounds of this fixed point register will be coerced and the user will be warned. The user input's supported include any python Number. Args: Number: user numerical input to be converted to fixed point. (bool, Number): Tuple with the members : Boolean for overflow, user numerical input to be converted to fixed point. """ fpga_representation = self._type.pack_data(user_input, 0) arrayData = self._convert_to_u32_array(fpga_representation) buf = self._ctype_type(*arrayData) self._write_func(self._session, self._resource, buf, self._transfer_len) def _convert_to_u32_array(self, data): if self._transfer_len > 1: data = data << (32 * self._transfer_len - self._type.size_in_bits) mask_32bit = (2**32) - 1 extracted_array = [] for index in range(0, self._transfer_len): extracted_array.append(data & mask_32bit) data = data >> 32 extracted_array.reverse() return extracted_array
[docs]class _FIFO(object): """ _FIFO is a private class that is a wrapper for the logic that associated with a FIFO. All FIFOs will exists in a sessions session.fifos property. This means that all possible FIFOs for a given session are created during session initialization; a user should never need to create a new instance of this class. """ def __init__(self, session, nifpga, bitfile_fifo, datatype=None): self._datatype = datatype if self._datatype is None: self._datatype = bitfile_fifo.datatype self._number = bitfile_fifo.number self._session = session self._write_func = nifpga["WriteFifo%s" % self._datatype] self._read_func = nifpga["ReadFifo%s" % self._datatype] self._acquire_read_func = nifpga["AcquireFifoReadElements%s" % self._datatype] self._acquire_write_func = nifpga["AcquireFifoWriteElements%s" % self._datatype] self._release_elements_func = nifpga["ReleaseFifoElements"] self._nifpga = nifpga self._ctype_type = self._datatype._return_ctype() self._name = bitfile_fifo.name
[docs] def configure(self, requested_depth): """ Specifies the depth of the host memory part of the DMA FIFO. Args: requested_depth (int): The depth of the host memory part of the DMA FIFO in number of elements. Returns: actual_depth (int): The actual number of elements in the host memory part of the DMA FIFO, which may be more than the requested number. """ actual_depth = ctypes.c_size_t() self._nifpga.ConfigureFifo2(self._session, self._number, requested_depth, actual_depth) return actual_depth.value
[docs] def start(self): """ Starts the FIFO. """ self._nifpga.StartFifo(self._session, self._number)
[docs] def stop(self): """ Stops the FIFO. """ self._nifpga.StopFifo(self._session, self._number)
[docs] def write(self, data, timeout_ms=0): """ Writes the specified data to the FIFO. NOTE: If the FIFO has not been started before calling :meth:`_FIFO.write()`, then it will automatically start and continue to work as expected. Args: data (list): Data to be written to the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: elements_remaining (int): The number of elements remaining in the host memory part of the DMA FIFO. """ # if data is not iterable make it iterable try: iter(data) except TypeError: data = [data] buf_type = self._ctype_type * len(data) buf = buf_type(*data) empty_elements_remaining = ctypes.c_size_t() self._write_func(self._session, self._number, buf, len(data), timeout_ms, empty_elements_remaining) return empty_elements_remaining.value
ReadValues = namedtuple("ReadValues", ["data", "elements_remaining"])
[docs] def read(self, number_of_elements, timeout_ms=0): """ Read the specified number of elements from the FIFO. NOTE: If the FIFO has not been started before calling :meth:`_FIFO.read()`, then it will automatically start and continue to work as expected. Args: number_of_elements (int): The number of elements to read from the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: ReadValues (namedtuple):: ReadValues.data (list): containing the data from the FIFO. ReadValues.elements_remaining (int): The amount of elements remaining in the FIFO. """ buf_type = self._ctype_type * number_of_elements buf = buf_type() elements_remaining = ctypes.c_size_t() self._read_func(self._session, self._number, buf, number_of_elements, timeout_ms, elements_remaining) if self._datatype is DataType.Bool: data = [bool(elem) for elem in buf] else: data = [elem for elem in buf] return self.ReadValues(data=data, elements_remaining=elements_remaining.value)
AcquireWriteValues = namedtuple("AcquireWriteValues", ["data", "elements_acquired", "elements_remaining"]) def _acquire_write(self, number_of_elements, timeout_ms=0): """ Write the specified number of elements from the FIFO. Args: number_of_elements (int): The number of elements to read from the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: AcquireWriteValues(namedtuple):: AcquireWriteValues.data (ctypes.pointer): Contains the data from the FIFO. AcquireWriteValues.elements_acquired (int): The number of elements that were actually acquired. AcquireWriteValues.elements_remaining (int): The amount of elements remaining in the FIFO. """ block_out = ctypes.POINTER(self._ctype_type)() elements_acquired = ctypes.c_size_t() elements_remaining = ctypes.c_size_t() self._acquire_write_func(self._session, self._number, block_out, number_of_elements, timeout_ms, elements_acquired, elements_remaining) return self.AcquireWriteValues(data=block_out, elements_acquired=elements_acquired.value, elements_remaining=elements_remaining.value) AcquireReadValues = namedtuple("AcquireReadValues", ["data", "elements_acquired", "elements_remaining"]) def _acquire_read(self, number_of_elements, timeout_ms=0): """ Read the specified number of elements from the FIFO. Args: number_of_elements (int): The number of elements to read from the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: AcquireWriteValues(namedtuple): has the following members:: AcquireWriteValues.data (ctypes.pointer): Contains the data from the FIFO. AcquireWriteValues.elements_acquired (int): The number of elements that were actually acquired. AcquireWriteValues.elements_remaining (int): The amount of elements remaining in the FIFO. """ buf = self._ctype_type() buf_ptr = ctypes.pointer(buf) elements_acquired = ctypes.c_size_t() elements_remaining = ctypes.c_size_t() self._acquire_read_func(self._session, self._number, buf_ptr, number_of_elements, timeout_ms, elements_acquired, elements_remaining) return self.AcquireReadValues(data=buf_ptr, elements_acquired=elements_acquired.value, elements_remaining=elements_remaining.value) def _release_elements(self, number_of_elements): """ Releases the FIFOs elements. """ self._release_elements_func(self._session, self._number, number_of_elements)
[docs] def get_peer_to_peer_endpoint(self): """ Gets an endpoint reference to a peer-to-peer FIFO. """ endpoint = ctypes.c_uint32(0) self._nifpga.GetPeerToPeerFifoEndpoint(self._session, self._number, endpoint) return endpoint.value
[docs] def commit_configuration(self): """ Resolves and Commits property changes made to the FIFO. """ self._nifpga.CommitFifoConfiguration(self._session, self._number)
@property def name(self): """ Property of a Fifo that contains its name. """ return self._name @property def datatype(self): """ Property of a Fifo that contains its datatype. """ return self._datatype def _get_fifo_property(self, prop): prop_type = _fifo_properties_to_types[prop] value = (prop_type._return_ctype())(0) value_pointer = ctypes.pointer(value) self._nifpga['GetFifoProperty%s' % prop_type](self._session, self._number, prop.value, value_pointer) return value.value def _set_fifo_property(self, prop, value): prop_type = _fifo_properties_to_types[prop] self._nifpga['SetFifoProperty%s' % prop_type](self._session, self._number, prop.value, value) @property def buffer_allocation_granularity(self): """ The allocation granularity of the host memory part of a DMA FIFO. By default this will usually be a page size, which is optimal for most devices. This property can be used to customize it. """ return self._get_fifo_property(FifoProperty.BufferAllocationGranularityElements) @buffer_allocation_granularity.setter def buffer_allocation_granularity(self, value): self._set_fifo_property(FifoProperty.BufferAllocationGranularityElements, value) @property def buffer_size(self): """ The size in elements of the Host Memory part of a DMA FIFO. """ return self._get_fifo_property(FifoProperty.BufferSizeElements) @buffer_size.setter def buffer_size(self, value): self._set_fifo_property(FifoProperty.BufferSizeElements, value) @property def _mirror_size(self): """ The amount of elements in the Host Memory part of the DMA FIFO that mirror elements at the beginning. The Host Memory part of a DMA FIFO is a circular buffer. This means that when we hit the end of the buffer we have to deal with the logic of wrapping around the buffer. Mirrored elements are elements at the beginning of the buffer that are mapped twice in memory to the end of the buffer. Settings this value can allow us to avoid wrap arounds. This is mostly useful when using our Zero Copy API. Its not yet supported in Python though, so this property is private. """ return self._get_fifo_property(FifoProperty.MirroredElements) @_mirror_size.setter def _mirror_size(self, value): self._set_fifo_property(FifoProperty.MirroredElements, value) @property def _dma_buffer_type(self): return self._get_fifo_property(FifoProperty.DmaBufferType) @_dma_buffer_type.setter def _dma_buffer_type(self, value): if not isinstance(value, DmaBufferType): raise TypeError("_dma_buffer_type must be set to a nifpga.DmaBufferType") self._set_fifo_property(FifoProperty.DmaBufferType, value.value) @property def _dma_buffer(self): return self._get_fifo_property(FifoProperty.DmaBuffer) @_dma_buffer.setter def _dma_buffer(self, value): self._set_fifo_property(FifoProperty.DmaBuffer, value) @property def flow_control(self): """ Controls whether the FPGA will wait for the host when using FIFOs. If flow control is disabled, the FPGA will have free reign to read or write elements before the host is ready. This means the FIFO no longer acts in a First In First Out manner. For Host To Target FIFOs, this feature is useful when you want to put something like a waveform in a FIFO and let the FPGA continue reading that waveform over and over without any involvement from the host. For Target To Host FIFOs, this feature is useful when you only care about the latest data and don't care about old data. """ return self._get_fifo_property(FifoProperty.FlowControl) @flow_control.setter def flow_control(self, value): if not isinstance(value, FlowControl): raise TypeError("flow_control must be set to an nifpga.FlowControl") self._set_fifo_property(FifoProperty.FlowControl, value.value)
class _FxpFIFO(_FIFO): """ FXP FIFOs are packed up to 64bits """ def __init__(self, session, nifpga, bitfile_fifo): super(_FxpFIFO, self).__init__(session, nifpga, bitfile_fifo, datatype=DataType.U64) self._fxp = bitfile_fifo.type @property def datatype(self): return DataType.Fxp def write(self, data, timeout_ms=0): """ Writes the specified data to the FIFO. NOTE: If the FIFO has not been started before calling :meth:`_FIFO.write()`, then it will automatically start and continue to work as expected. Args: data (list): Data to be written to the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: elements_remaining (int): The number of elements remaining in the host memory part of the DMA FIFO. """ # if data is not iterable make it iterable try: iter(data) except TypeError: data = [data] buf_type = self._ctype_type * len(data) buf = buf_type() for i, item in enumerate(data): buf[i] = self._fxp.pack_data(item, 0) empty_elements_remaining = ctypes.c_size_t() self._write_func(self._session, self._number, buf, len(data), timeout_ms, empty_elements_remaining) return empty_elements_remaining.value def read(self, number_of_elements, timeout_ms=0): """ Read the specified number of elements from the FIFO. NOTE: If the FIFO has not been started before calling :meth:`_FIFO.read()`, then it will automatically start and continue to work as expected. Args: number_of_elements (int): The number of elements to read from the FIFO. timeout_ms (int): The timeout to wait in milliseconds. Returns: ReadValues (namedtuple):: ReadValues.data (list): containing the data from the FIFO. ReadValues.elements_remaining (int): The amount of elements remaining in the FIFO. """ buf_type = self._ctype_type * number_of_elements buf = buf_type() elements_remaining = ctypes.c_size_t() self._read_func(self._session, self._number, buf, number_of_elements, timeout_ms, elements_remaining) data = [self._fxp.unpack_data(elem) for elem in buf] return self.ReadValues(data=data, elements_remaining=elements_remaining.value) class _DataConvertingFifo(_FIFO): """ Converts FIFOs that transfer data that is packed in the FPGA representation into Python types. In this case, it converts a FIFO of clusters into lists of OrderedDicts and back. """ def __init__(self, session, nifpga, bitfile_fifo): super(_DataConvertingFifo, self).__init__(session, nifpga, bitfile_fifo, datatype=DataType.U8) self._type = bitfile_fifo.type self._transfer_size_bytes = bitfile_fifo.transfer_size_bytes self._write_func = nifpga["WriteFifoComposite"] self._read_func = nifpga["ReadFifoComposite"] self._acquire_read_func = nifpga["AcquireFifoReadElementsComposite"] self._acquire_write_func = nifpga["AcquireFifoWriteElementsComposite"] @property def datatype(self): return DataType.Cluster def write(self, data, timeout_ms=0): # If data is a dict, then the customer passed us a single cluster # put it into a list before using it. if isinstance(data, dict): data = [data] buf_type = self._ctype_type * (self._transfer_size_bytes * len(data)) buf = buf_type() # for each element, pack the data, reverse the bytes, and swap the # endianness for index, item in enumerate(data): packed_element = self._type.pack_data(item, 0) element_index = index * self._transfer_size_bytes self._convert_to_u8_array(buf, element_index, packed_element) empty_elements_remaining = ctypes.c_size_t() self._write_func(self._session, self._number, buf, self._transfer_size_bytes, len(data), timeout_ms, empty_elements_remaining) return empty_elements_remaining.value def _convert_to_u8_array(self, u8_array, element_index, data): """ Converts the data into the format expected by the FPGA and inserts it into the given u8_array at the provided element_index. """ # if the element is > 4 bytes, shift the data over if self._transfer_size_bytes > 4: data = data << (8 * self._transfer_size_bytes - self._type.size_in_bits) if self._transfer_size_bytes == 1: u8_array[element_index] = data & 0xFF elif self._transfer_size_bytes == 2: u8_array[element_index] = data & 0xFF data = data >> 8 u8_array[element_index + 1] = data & 0xFF else: # >= 4 # Insert the data such that the Most Significant Words are at the lower # indexes while each word's endianness is swapped for index in reversed(range(0, self._transfer_size_bytes, 4)): local_index = element_index + index u8_array[local_index] = data & 0xFF data = data >> 8 u8_array[local_index + 1] = data & 0xFF data = data >> 8 u8_array[local_index + 2] = data & 0xFF data = data >> 8 u8_array[local_index + 3] = data & 0xFF data = data >> 8 def read(self, number_of_elements, timeout_ms=0): buf_type = self._ctype_type * (self._transfer_size_bytes * number_of_elements) buf = buf_type() elements_remaining = ctypes.c_size_t() self._read_func(self._session, self._number, buf, self._transfer_size_bytes, number_of_elements, timeout_ms, elements_remaining) data = [] for index in range(number_of_elements): element_index = index * self._transfer_size_bytes packed_data = self._combine_array_of_u8_into_one_value(buf, element_index) unpacked_data = self._type.unpack_data(packed_data) data.append(unpacked_data) return self.ReadValues(data=data, elements_remaining=elements_remaining.value) def _combine_array_of_u8_into_one_value(self, data, element_index): """ This method is a helper to convert the array read from hardware and return a single element removing any excessive bits. First we combine the data into a single number while swapping the endianness. Whenever the array is longer than 1 word, the data is left justified, we need to shift the combined data to the right before doing the conversion. For example, if the element had a size in bits of 54, the 54 MSB would be the data bits. The 10 LSB of the combinedData must be shifted off in order to not mess up further calculations. """ combinedData = 0 if self._transfer_size_bytes >= 4: index = element_index element_end = element_index + self._transfer_size_bytes while index < element_end: combinedData = (combinedData << 8) + data[index + 3] combinedData = (combinedData << 8) + data[index + 2] combinedData = (combinedData << 8) + data[index + 1] combinedData = (combinedData << 8) + data[index] index += 4 elif self._transfer_size_bytes == 2: combinedData = data[element_index + 1] combinedData = (combinedData << 8) + data[element_index] else: combinedData = data[element_index] if self._transfer_size_bytes > 4: combinedData = combinedData >> (8 * self._transfer_size_bytes - self._type.size_in_bits) return combinedData