Source code for pyripherals.peripherals.DDR3

from ..core import Endpoint
from ..utils import test_bit, gen_mask, custom_signed_to_int
import numpy as np
import time
import os
import h5py


[docs]class DDR3(): """ The DDR is divided into 2 buffers. Each buffer has an incoming and outgoing FIFO. **1st buffer:** * Function generator like data that provides a data-stream to the DACs * Write from the host when DAC_WRITE_ENABLE is set (clear dac read and clear adc read and write) * Organized into groups of 16 bits to 8 channels * Read to the DACs with set_dac_read * 6 AD5453 (14 bits each) * 2 DAC80508 (16 bits each) * DAC80508 channel data in the MSBs of the AD5453 stream * FIFO: OpalKelly in -> out to DDR [32bits x 1024words = 4096 bytes] * FIFO: DDR in -> DAC data out [out: 256bits x 256 = 8192 bytes] **2nd buffer:** * Buffering for ADC data (AD7961; ADS8686), DAC outgoing data, timestamps. * ADC data writes to the DDR set_adc_write() * ADC data in DDR can be read to the host with set_adc_read() * The DDR througput is sufficient for 1st buffer reading while reading and writing to the 2nd buffer * FIFO: OpalKelly in -> out to DDR [in: 32bits x 1024words = 4096 bytes] * FIFO: DDR in -> OpalKelly out [256bits x 128 = 4096 bytes] OpalKelly block size is half this. **Expected sequence of operations:** 1) At startup write pattern for DACs using write_channels() [set_dac_write(), clear_dac_write() are called within write_channels()] 2) set_dac_read() # starts DAC data output to DACs via SPI and then set_adc_write() ADC data captured into DDR 3) set_adc_read() # allows host to read PipeOut as PipeOut is continuously filled if emptied **ADC data ordering and saving:** * deswizzle() * save_data() **DDR configuration bits:** * DAC_WRITE_ENABLE * DAC_READ_ENABLE * ADC_WRITE_ENABLE * ADC_TRANSFER_ENABLE """ BLOCK_SIZE = 2048 # 1/2 the incoming FIFO depth in bytes (size of the BlockPipeIn) # number of channels that the DDR is striped between (for DACs) NUM_CHANNELS = 8 UPDATE_PERIOD = 400e-9 # 2.5 MHz -- requires SCLK ~ 50 MHZ PORT1_INDEX = 0x3_7f_ff_f8 NUM_ADC_CHANNELS = 8 # number of 2 byte chunks in DDR ADC_PERIOD = 200e-9 # the index is the DDR address that the circular buffer stops at. # need to write all the way up to this stoping point otherwise the SPI output will glitch SAMPLE_SIZE = int((PORT1_INDEX + 8)/4) def __init__(self, fpga, endpoints=None, data_version='TIMESTAMPS'): if endpoints is None: endpoints = Endpoint.get_chip_endpoints('DDR3') self.fpga = fpga self.endpoints = endpoints self.data_version = data_version # sets deswizzling mode self.data_arrays = [] for i in range(DDR3.NUM_CHANNELS): self.data_arrays.append(np.zeros( DDR3.SAMPLE_SIZE).astype(np.uint16)) self.clear_adc_debug()
[docs] def set_adc_debug(self): """Set the ADC debug bit. That bit multiplexes a counter to ADC channel 0 and bits 47:0 of the DAC data to ADC channels 1,2,3. Not supported in all versions of the FPGA design. """ self.fpga.set_wire_bit(self.endpoints['ADC_DEBUG'].address, self.endpoints['ADC_DEBUG'].bit_index_low)
[docs] def clear_adc_debug(self): """Clear the ADC debug bit. DDR ADC data will be from the ADC. """ self.fpga.clear_wire_bit(self.endpoints['ADC_DEBUG'].address, self.endpoints['ADC_DEBUG'].bit_index_low)
[docs] @staticmethod def make_flat_voltage(amplitude): """Return a constant unit16 array of value amplitude. Array length based on the sample_size parameter. The conversion from float or int voltage to int digital (binary) code should take place BEFORE this function. Parameters ---------- amplitude: int Digital (binary) value of the flat voltage Returns ------- amplitude : numpy.ndarray to be assigned to DDR data array """ amplitude = np.ones(DDR3.SAMPLE_SIZE)*amplitude amplitude = amplitude.astype(np.uint16) return amplitude
[docs] @staticmethod def closest_frequency(freq): """Determine closest frequency so the waveform evenly divides into the length of the DDR3 Parameters ---------- freq : float Desired frequency Returns ------- new_frequency : float The closest possible frequency """ samples_per_period = (1/freq) / DDR3.UPDATE_PERIOD if samples_per_period <= 2: print('Frequency is too high for the DDR update rate') return None total_periods = DDR3.SAMPLE_SIZE/samples_per_period # round and recalculate frequency round_total_periods = np.round(total_periods) round_samples_per_period = DDR3.SAMPLE_SIZE / \ round_total_periods new_frequency = 1 / \ (DDR3.UPDATE_PERIOD * round_samples_per_period) return new_frequency
[docs] @staticmethod def make_sine_wave(amplitude, frequency, offset=0x2000, actual_frequency=True): """Return a sine-wave array for writing to DDR. The conversion from float or int voltage to int digital (binary) code should take place BEFORE this function. Parameters ---------- amplitude : int Digital (binary) value of the sine wave. frequency : float Desired frequency in Hz. offset : int Digital (binary) value offset. actual_frequency : bool Decide whether closest frequency that fits an integer number of periods is used. Returns ------- ddr_seq : numpy.ndarray to be assigned to DDR data array frequency : float actual frequency after closest_frequency """ if (amplitude) > offset: print('Error: amplitude in sine-wave is too large') return -1 if actual_frequency: frequency = DDR3.closest_frequency(frequency) t = np.arange(0, DDR3.UPDATE_PERIOD*DDR3.SAMPLE_SIZE, DDR3.UPDATE_PERIOD) # print('length of time axis after creation ', len(t)) ddr_seq = (amplitude)*np.sin(t*frequency*2*np.pi) + offset if any(ddr_seq < 0) or any(ddr_seq > (2**16-1)): print('Error: Uint16 overflow in make sine wave') return -1 ddr_seq = ddr_seq.astype(np.uint16) return ddr_seq, frequency
[docs] @staticmethod def make_ramp(start, stop, step, actual_length=True): """Create a ramp signal to write to the DDR. The conversion from float or int voltage to int digital (binary) code should take place BEFORE this function. Parameters ---------- start : int Digital (binary) value to start the ramp at stop : int Digital (binary) value to stop the ramp at step : int Digital (binary) code to step by Returns ------- ddr_seq : numpy.ndarray to be assigned to DDR data array """ # change the stop value for integer number of cycles ramp_seq = np.arange(start, stop, step) len_ramp_seq = len(ramp_seq) if actual_length: length = int( DDR3.SAMPLE_SIZE/np.round(DDR3.SAMPLE_SIZE/len_ramp_seq)) stop = start + length*step ramp_seq = np.arange(start, stop, step) num_tiles = DDR3.SAMPLE_SIZE//len(ramp_seq) extras = DDR3.SAMPLE_SIZE % len(ramp_seq) ddr_seq = np.tile(ramp_seq, num_tiles) ddr_seq = np.hstack((ddr_seq, ramp_seq[0:extras])) ddr_seq = ddr_seq.astype(np.uint16) return ddr_seq
[docs] @staticmethod def make_step(low, high, length, actual_length=True, duty=50): """Return a step signal (square wave) to write to the DDR. The conversion from float or int voltage to int digital (binary) code should take place BEFORE this function. Parameters ---------- low : int Digital (binary) code for the low value of the step. high : int Digital (binary) code for the high value of the step. length : TODO add type for length TODO add description for length actual_length : bool TODO add description for actual_length duty : int or float Duty cycle percentage. Enter as a percentage [0.0, 100.0]. Returns ------- ddr_seq : numpy.ndarray to be assigned to DDR data array """ if actual_length: length = int( DDR3.SAMPLE_SIZE/np.round(DDR3.SAMPLE_SIZE/length)) l_first = int(length/100*duty) l_end = int(length/100*(100-duty)) ramp_seq = np.concatenate((np.ones(l_first)*low, np.ones(l_end)*high)) num_tiles = DDR3.SAMPLE_SIZE//len(ramp_seq) extras = DDR3.SAMPLE_SIZE % len(ramp_seq) ddr_seq = np.tile(ramp_seq, num_tiles) ddr_seq = np.hstack((ddr_seq, ramp_seq[0:extras])) ddr_seq = ddr_seq.astype(np.uint16) return ddr_seq
[docs] def write_channels(self, set_ddr_read=True): """Write the channels as striped data to the DDR.""" data = np.zeros( int(len(self.data_arrays[0])*DDR3.NUM_CHANNELS)) data = data.astype(np.uint16) for i in range(DDR3.NUM_CHANNELS): if i % 2 == 0: # extra order swap on the 32 bit wide pipe data[(7-i - 1)::8] = self.data_arrays[i] else: data[(7-i + 1)::8] = self.data_arrays[i] print('Length of data DDR data [2 byte words] = {}'.format(len(data))) return self.write_buf(bytearray(data), set_ddr_read=set_ddr_read)
[docs] def write_buf(self, buf, set_ddr_read=True): """Write a bytearray to the DDR3. Parameters ---------- buf : bytearray bytearray to write to the DDR Returns ------- block_pipe_return : int length of the buffer written to the DDR (or error code if unsuccessful) speed_MBs : float speed of the write in MB/s """ print('Length of buffer being written to DDR [bytes]: ', len(buf)) self.clear_dac_read() self.reset_fifo(name='DAC_IN') self.set_dac_write() print('Writing to DDR...') time1 = time.time() block_pipe_return = self.fpga.xem.WriteToBlockPipeIn(epAddr=self.endpoints['BLOCK_PIPE_IN'].address, blockSize=DDR3.BLOCK_SIZE, data=buf) print(f'The length of the DDR write was {block_pipe_return}') time2 = time.time() time3 = (time2-time1) speed_MBs = (int)(block_pipe_return/1024/1024/time3) print(f'The speed of the write was {speed_MBs} MB/s') # below prepares the HDL into read mode self.clear_dac_write() if set_ddr_read: self.set_dac_read() self.set_adc_read() return block_pipe_return, speed_MBs
[docs] def reset_mig_interface(self): """Reset user interface to the MIG (memory interface generator). Resets the DDR address pointers for read/write of both buffers (does not reset the MIG controller). """ self.fpga.send_trig(self.endpoints['UI_RESET'])
[docs] def fifo_status(self): """Check the empty, full, and count status of the DDR interfacing FIFOs Returns ------- fifo_status : dict dictionary of fifo status ('EMPTY', 'FULL', 'ADC_DATA_COUNT') for 'IN', 'OUT', and channels 1, 2 """ wire_status = self.fpga.read_wire( self.endpoints['INIT_CALIB_COMPLETE'].address) fifo_status = {} for fe in ['EMPTY', 'FULL']: for num in [1, 2]: for inout in ['IN', 'OUT']: ep_name = '{}{}_{}'.format(inout, num, fe) bit_pos = self.endpoints[ep_name].bit_index_low val = test_bit(wire_status, bit_pos) fifo_status[ep_name] = val print('{} = {}'.format(ep_name, val)) ep_name = 'ADC_DATA_COUNT' cnt_bit_low = self.endpoints[ep_name].bit_index_low cnt_bit_width = self.endpoints[ep_name].bit_width cnt_msk = gen_mask(np.arange(cnt_bit_low, cnt_bit_width)) fifo_status[ep_name] = (wire_status & cnt_msk) >> cnt_bit_low print('{} = {}'.format(ep_name, val)) # self.print_fifo_status(fifo_status) return fifo_status
[docs] def print_fifo_status(self, fifo_status): """Print the FIFO status dictionary. Parameters ---------- fifo_status : dict Dictionary of fifo status. """ for k in fifo_status: print('{} = {}'.format(k, fifo_status[k]))
[docs] def set_dac_read(self): """Set DDR / FIFOs read enable. Enables DDR data going to the DACs and ADC data into DDR """ self.fpga.set_wire_bit(self.endpoints['DAC_READ_ENABLE'].address, self.endpoints['DAC_READ_ENABLE'].bit_index_low)
[docs] def clear_dac_read(self): """Clear DDR / FIFOs read enable. Stops DDR data from going to the DACs and ADC data into DDR """ self.fpga.clear_wire_bit(self.endpoints['DAC_READ_ENABLE'].address, self.endpoints['DAC_READ_ENABLE'].bit_index_low)
[docs] def set_dac_write(self): """Set DDR / FIFOs write enable into DDR via Pipe. """ self.fpga.set_wire_bit(self.endpoints['DAC_WRITE_ENABLE'].address, self.endpoints['DAC_WRITE_ENABLE'].bit_index_low)
[docs] def clear_dac_write(self): """Clear DDR / FIFOs write enable into DDR via Pipe. """ self.fpga.clear_wire_bit(self.endpoints['DAC_WRITE_ENABLE'].address, self.endpoints['DAC_WRITE_ENABLE'].bit_index_low)
[docs] def set_adc_write(self): """Set DDR / FIFOs write enable ADC data into DDR. """ self.fpga.set_wire_bit(self.endpoints['ADC_WRITE_ENABLE'].address, self.endpoints['ADC_WRITE_ENABLE'].bit_index_low)
[docs] def set_adc_dac_simultaneous(self): """ Set DDR / FIFOs read enable. Enables DDR data going to the DACs and ADC data into DDR """ bit_addr = [self.endpoints['ADC_WRITE_ENABLE'].bit_index_low, self.endpoints['DAC_READ_ENABLE'].bit_index_low, self.endpoints['ADC_TRANSFER_ENABLE'].bit_index_low] bit_vals = [1, 1, 1] self.fpga.set_ep_simultaneous(self.endpoints['ADC_WRITE_ENABLE'].address, bit_addr, bit_vals)
[docs] def clear_adc_write(self): """Clear DDR / FIFOs write enable ADC data into DDR. """ self.fpga.clear_wire_bit(self.endpoints['ADC_WRITE_ENABLE'].address, self.endpoints['ADC_WRITE_ENABLE'].bit_index_low)
[docs] def set_adc_read(self): """Set DDR / FIFO read enable DDR data from the ADCs out via a PipeOut. """ self.fpga.set_wire_bit(self.endpoints['ADC_TRANSFER_ENABLE'].address, self.endpoints['ADC_TRANSFER_ENABLE'].bit_index_low)
[docs] def clear_adc_read(self): """Clear DDR / FIFO read enable DDR data from the ADCs out via a PipeOut. """ self.fpga.clear_wire_bit(self.endpoints['ADC_TRANSFER_ENABLE'].address, self.endpoints['ADC_TRANSFER_ENABLE'].bit_index_low)
[docs] def set_adcs_connected(self): """Set that AD7961s are connected to the FPGA so that DDR3 write enable comes from the AD7961.v module. """ self.fpga.set_wire_bit(self.endpoints['USE_ADC_READY'].address, self.endpoints['USE_ADC_READY'].bit_index_low)
[docs] def clear_adcs_connected(self): """ AD7961s are not connected to the FPGA so the DDR3 write enable signal is emulated by the global_timing module. """ self.fpga.clear_wire_bit(self.endpoints['USE_ADC_READY'].address, self.endpoints['USE_ADC_READY'].bit_index_low)
[docs] def reset_fifo(self, name): """Reset FIFO interfaces to DDR . """ fifo_reset_names = ['DAC_IN', 'DAC_READ', 'ADC_IN', 'ADC_TRANSFER'] if (name not in fifo_reset_names) and (not name == 'ALL'): print( f'DDR3 FIFO reset name of {name} is not in list of {fifo_reset_names}') else: if name == 'ALL': for name_tmp in fifo_reset_names: ep_name = 'FIFO_' + name_tmp + '_RST' self.fpga.set_wire_bit(self.endpoints[ep_name].address, self.endpoints[ep_name].bit_index_low) self.fpga.clear_wire_bit(self.endpoints[ep_name].address, self.endpoints[ep_name].bit_index_low) else: ep_name = 'FIFO_' + name + '_RST' self.fpga.set_wire_bit(self.endpoints[ep_name].address, self.endpoints[ep_name].bit_index_low) self.fpga.clear_wire_bit(self.endpoints[ep_name].address, self.endpoints[ep_name].bit_index_low)
[docs] def adc_single(self): """Set ADC read address to the ADC write address. Emulates an immediate "trigger" of an oscilloscope. """ self.fpga.set_wire_bit(self.endpoints['ADC_ADDR_SET'].address, self.endpoints['ADC_ADDR_SET'].bit_index_low) self.fpga.send_trig(self.endpoints['ADC_ADDR_RESET'])
[docs] def read_adc_block(self, sample_size=None, source='ADC', DEBUG_PRINT=False): """Read ADC (and other) DDR data. Block size must be a power of two from 16 to 16384 will automatically perform multiple transfers to complete the full LENGTH. The length must be an integer multiple of 16 for USB3.0 and the length must be an Integer multiple of Block Size. see https://docs.opalkelly.com/fpsdk/frontpanel-api/ section 3.3.1 Parameters ---------- sample_size : int Length of read in bytes. If none uses DDR parameter 'sample_size.' source : str FIFO output buffer to read. Either 'ADC' or 'FG'. 'FG' just reads back what is written for DACs (as function generator) so not so useful. Returns ------- data_buf : byearray adc data read as a bytearray read_cnt : int The count (or error code) read from the OpalKelly interface """ if sample_size is None: data = np.zeros((DDR3.SAMPLE_SIZE,), dtype=int) data_buf = bytearray(data) else: data_buf = bytearray(sample_size) block_size = DDR3.BLOCK_SIZE # check block size if block_size % 16 != 0: print('Error in read adc. Block size is not a multiple of 16') return -1, -1 if block_size > 16384: print('Error in read adc. Block size is greater than 16384') return -2, -2 if source == 'ADC': read_cnt = self.fpga.xem.ReadFromBlockPipeOut(epAddr=self.endpoints['BLOCK_PIPE_OUT'].address, blockSize=block_size, data=data_buf) elif source == 'FG': # TODO: test, don't expect this to work read_cnt = self.fpga.xem.ReadFromBlockPipeOut(epAddr=self.endpoints['BLOCK_PIPE_OUT_FG'].address, blockSize=block_size, data=data_buf) else: print('Incorrect source in read_adc') return -11, -11 if DEBUG_PRINT: print( f'The length [num of bytes] of the BlockPipeOut read is: {read_cnt}') return data_buf, read_cnt
[docs] def deswizzle(self, d, convert_twos=True): """Reorder DDR data to match the ADC channels. Shift MSBytes up by 8 and combine with LSBytes. Swap channels to match ADC channel numbering. Parameters ---------- d : array array of bytes. convert_twos : Boolean if true converts data to signed Returns ------- chan_data : dict dictionary of data arrays (keys are channel numbers) """ bits = 16 chan_data_swz = {} # this data is swizzled # first version of ADC data before DACs + timestamps are stored if self.parameters['data_version'] == 'ADC_NO_TIMESTAMPS': for i in range(4): chan_data_swz[i] = (d[(0 + i * 2):: 8] << 0) + \ (d[(1 + i * 2):: 8] << 8) chan_data = {} chan_data[0] = chan_data_swz[2] chan_data[1] = chan_data_swz[3] chan_data[2] = chan_data_swz[0] chan_data[3] = chan_data_swz[1] if convert_twos: for i in range(4): chan_data[i] = custom_signed_to_int(chan_data[i], bits) # first version of ADC data before DACs + timestamps are stored if self.parameters['data_version'] == 'TIMESTAMPS': for i in range(8): chan_data_swz[i] = (d[(0 + i * 2):: 16] << 0) + \ (d[(1 + i * 2):: 16] << 8) chan_data = {} chan_data[0] = chan_data_swz[6] chan_data[1] = chan_data_swz[7] chan_data[2] = chan_data_swz[5] chan_data[3] = chan_data_swz[4] chan_data[4] = chan_data_swz[2] chan_data[5] = chan_data_swz[3] chan_data[6] = chan_data_swz[0] chan_data[7] = chan_data_swz[1] return chan_data
[docs] def data_to_names(self, chan_data, bitfile_version=None): """ Put deswizzled data into dictionaries with names that match with the data sources. Complete twos complement conversion where necessary. Check timestamps for skips. Check the constant values for errors. This supports 2 versions of the FPGA code: 'ADC_NO_TIMESTAMPS': DDR data is only the fast ADC. AD7961 'TIMESTAMPS': DDR data is numerous. AD7961, AD5453 out, ADS8686, timestamps, readcheck Parameters ---------- chan_data : dict of np.arrays data from reading DDR (minimally processed into 2 byte containers) bitfile_version : int The bitfile version that the .h5 file which held chan_data was created with. Defaults to the bitfile version of the FPGA Returns ------- adc_data : dict fast adc data (double format @ 5 MSPS) timestamp : np.array timestamps dac_data : dict DAC output data ads : dict ADS8686 ADC data (double format @ 1 MSPS) ads_seq_cnt : dict two keys 0, 1. Each value is an array of sequence counts (counts from 0 to 23) the first key will have 0,2,4,6, ... , 22 (or similar) the second key will have 1,3,5, ..., 23 (or similar) error : boolean if True the constant read values were wrong or the timestamp steps are not all the same """ if bitfile_version is None: # Old code, hasn't been updated to pass bitfile_version from .h5 file header # Default to FPGA bitfile version, or version 1 if the FPGA bitfile version is None if self.fpga.bitfile_version is None: bitfile_version = 1 else: bitfile_version = self.fpga.bitfile_version # first version of ADC data before DACs + timestamps are stored if self.parameters['data_version'] == 'ADC_NO_TIMESTAMPS': adc_data = chan_data timestamp = np.nan read_check = np.nan dac_data = np.nan ads = np.nan # first version of ADC data before DACs + timestamps are stored if self.parameters['data_version'] == 'TIMESTAMPS': adc_data = {} for i in range(4): # adc_data[i] = custom_signed_to_int(chan_data[i], 16) adc_data[i] = chan_data[i] if bitfile_version < 2: # 00.00.02 -> 2 lsb = chan_data[6][0::5].astype(np.uint64) else: lsb = chan_data[7][1::5].astype(np.uint64) mid_b = ((chan_data[6][1::5].astype(np.uint64)) << 16) msb = ((chan_data[7][2::5].astype(np.uint64)) << 32) t_len = np.size(msb) timestamp = (lsb[0:(t_len-1)] + mid_b[0:(t_len-1)] + msb[0:(t_len-1)]) read_check = {} read_check[0] = chan_data[7][3::10] read_check[1] = chan_data[7][4::10] read_check[2] = chan_data[7][8::10] read_check[3] = chan_data[7][9::10] ads_seq_cnt = {} ads_seq_cnt[0] = (chan_data[7][4::10] & 0x001f) ads_seq_cnt[1] = (chan_data[7][9::10] & 0x001f) dac_data = {} dac_data[0] = chan_data[4][0::2] dac_data[1] = chan_data[4][1::2] dac_data[2] = chan_data[5][0::2] dac_data[3] = chan_data[5][1::2] # dac channels 4,5 are available but not every sample. skip for now. TODO: add channels 4,5 ads = {} ads['A'] = custom_signed_to_int(chan_data[7][0::5], 16) if bitfile_version < 2: # 00.00.02 -> 2 ads['B'] = custom_signed_to_int(chan_data[7][1::5], 16) else: ads['B'] = custom_signed_to_int(chan_data[6][0::5], 16) error = False # check that the constant values are constant constant_values = {0: 0xaa55, 1: (0x28b<<5), 2: 0x77bb, 3: (0x28c<<5)} constant_value_mask = {0: 0xffff, 1: 0xffe0, 2: 0xffff, 3: 0xffe0} for i in range(4): if not np.all( (read_check[i] & constant_value_mask[i]) == constant_values[i]): print(f'Error in constant value: {constant_values[i]} ') print( f'Number of errors: {np.sum(read_check[i] != constant_values[i])}') error = True # check the timestamps for a skip unq_time_intervals = np.unique(np.diff(timestamp)) if np.size(unq_time_intervals) > 1: print('Warning: Multiple time intervals') error = True return adc_data, timestamp, dac_data, ads, ads_seq_cnt, error
[docs] def save_data(self, data_dir, file_name, num_repeats=4, blk_multiples=40, append=False): """ read and save DDR data to an hdf file Parameters ---------- data_dir : string directory for data file_name : string filename to save data (does not append extension). num_repeats : int total data read is 2048 bytes * num_repeats * blk_multiples blk_multiples : int number of blocks read by adc_read. adc_read is a single OpalKelly API call Returns ------- new_data : np.ndarray The new data saved in the h5 file. """ # If the file doesn't already exist, write a new one full_data_name = os.path.join(data_dir, file_name) if append: if not os.path.exists(full_data_name): print(f'No existing file found at {full_data_name}, creating new file') append = False else: try: os.remove(full_data_name) except OSError: pass if append: file_mode = 'a' else: file_mode = 'w' chunk_size = int(DDR3.BLOCK_SIZE * blk_multiples / ( DDR3.NUM_ADC_CHANNELS*2)) # readings per ADC repeat = 0 adc_readings = chunk_size*num_repeats print(f'Anticipated chunk size (readings per channel) {chunk_size}') print( f'Reading {adc_readings*2/1024} kB per ADC channel for a total of {adc_readings*DDR3.ADC_PERIOD*1000} ms of data') self.set_adc_read() # enable data into the ADC reading FIFO time.sleep(adc_readings*DDR3.ADC_PERIOD) # Save ADC DDR data to a file with h5py.File(full_data_name, file_mode) as file: if append: data_set = file['adc'] new_data_index = data_set.shape[1] if data_set.attrs['bitfile_version'] != self.fpga.bitfile_version: raise Exception(f"File {os.path.join(data_dir, file_name)} bitfile version {data_set.attrs['bitfile_version']} does not match FPGA bitfile version {self.fpga.bitfile_version}") # Make space for first round of new data. Not needed when not appending because the data set is created with chunk_size space data_set.resize(data_set.shape[1] + chunk_size, axis=1) else: data_set = file.create_dataset("adc", (DDR3.NUM_ADC_CHANNELS, chunk_size), maxshape=( DDR3.NUM_ADC_CHANNELS, None)) data_set.attrs['bitfile_version'] = self.fpga.bitfile_version new_data_index = 0 while repeat < num_repeats: d, bytes_read_error = self.read_adc(blk_multiples) if self.parameters['data_version'] == 'ADC_NO_TIMESTAMPS': chan_data = self.deswizzle(d) elif self.parameters['data_version'] == 'TIMESTAMPS': chan_data = self.deswizzle(d) if DDR3.NUM_ADC_CHANNELS == 4: chan_stack = np.vstack( (chan_data[0], chan_data[1], chan_data[2], chan_data[3])) if DDR3.NUM_ADC_CHANNELS == 8: chan_stack = np.vstack((chan_data[0], chan_data[1], chan_data[2], chan_data[3], chan_data[4], chan_data[5], chan_data[6], chan_data[7])) repeat += 1 if repeat == 0: print(f'Chunk size by chan data size {chunk_size}') data_set[:] = chan_stack else: data_set[:, -chunk_size:] = chan_stack if repeat < num_repeats: data_set.resize(data_set.shape[1] + chunk_size, axis=1) new_data = data_set[:, new_data_index:] print(f'Done with DDR reading: saved as {full_data_name}') return new_data
[docs] def read_adc(self, blk_multiples=2048): """ read DDR data into a numpy buffer of bytes Parameters ---------- blk_multiples : int total size of the read is blk_multiples * block_size Returns ------- d : bytearray data as uint32 bytes_read_error : int bytes read or error code """ t, bytes_read_error = self.read_adc_block( # just reads from the block pipe out sample_size=DDR3.BLOCK_SIZE * blk_multiples ) d = np.frombuffer(t, dtype=np.uint8).astype(np.uint32) print(f'Bytes read: {bytes_read_error}') return d, bytes_read_error
[docs] def set_index(self, factor, factor2=None): """ No longer used. Index (the DDR address that wraps-around to 0). This is now fixed to improve timing performance. """ print('Set index is no longer used. Index is fixed to: {}'.format( DDR3.PORT1_INDEX))
[docs] def write_setup(self, data_driven_clock=True): """Set up DDR for writing.""" if data_driven_clock: self.set_adcs_connected() else: self.clear_adcs_connected() self.clear_dac_read() self.clear_adc_write() self.clear_adc_read() # Stop putting data in outgoing FIFO for Pipe read self.reset_fifo(name='ALL') self.reset_mig_interface()
[docs] def repeat_setup(self): """Setup for reading new data without writing to the DDR again.""" # stop access to the FIFOs so that after reset of the FIFO(s) no new data is added/extracted self.clear_adc_read() self.clear_adc_write() self.clear_dac_read() self.reset_fifo(name='ALL') # self.fpga.send_trig(self.endpoints['UI_RESET']) self.reset_mig_interface() # note that the MIG interface addresses are driven by the FIFOs so will idle # until the FIFOs are reenable with write_finish() self.write_finish() time.sleep(0.01)
def write_finish(self): # reenable both DACs self.set_adc_dac_simultaneous() # enable DAC playback and ADC writing to DDR