Xylo-Imu RealTime Mode With Acceleration Input#
Xylo-IMU supports debugging with acceleration data in realtime mode. You could pause easily to debug like manual mode by controlling your input.
Here is an example that display typical usage of it, which bases on packages :
- samna 0.33.1
- numpy 1.24.3
import samna
import numpy as np
def initialize_board() :
dk = samna.device.open_device("XyloImuTestBoard:0")
buf = samna.graph.sink_from(dk.get_model_source_node())
source = samna.graph.source_to(dk.get_model_sink_node())
return dk, buf, source
dk, buf, source = initialize_board()
def build_event_type_filters(dk, graph):
_, etf0, readout_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf0.set_desired_type('xyloImu::event::Readout')
_, etf1, membrane_potential_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf1.set_desired_type('xyloImu::event::MembranePotential')
_, etf2, synaptic_current_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf2.set_desired_type('xyloImu::event::SynapticCurrent')
_, etf3, hidden_spike_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf3.set_desired_type('xyloImu::event::HiddenSpikeCount')
return readout_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf
graph = samna.graph.EventFilterGraph() # Please mind that this `graph` object can't be released in python while receiving events, otherwise no event will be received.
readout_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf = build_event_type_filters(dk, graph)
graph.start() # Graph has to be started manually to work.
def get_current_timestep():
# Current timestep is last processed timestep.
source.write([samna.xyloImu.event.TriggerReadout()])
evts = readout_buf.get_n_events(1, 3000) # Try to get 1 event in 3 seconds.
assert(len(evts) == 1)
return evts[0].timestep
def apply_configuration():
dk.enable_manual_input_acceleration(True) # Use manual input instead of sensor input
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime
input_count = 3
hidden_count = 5
output_count = 2
xylo_config.input.weights = [[1] * hidden_count] * input_count
xylo_config.hidden.weights = [[1] * hidden_count] * hidden_count
hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron()] * hidden_count
xylo_config.hidden.neurons = hidden_neurons
output_neurons = [samna.xyloImu.configuration.OutputNeuron()] * output_count
xylo_config.readout.neurons = output_neurons
xylo_config.readout.weights = [[1] * output_count] * hidden_count
# Preparation to run in realtime mode with input interface opened.
xylo_config.debug.always_update_omp_stat = True
xylo_config.imu_if_input_enable = True
xylo_config.debug.imu_if_clk_enable = True
xylo_config.time_resolution_wrap = 0x61a80
xylo_config.debug.imu_if_clock_freq_div = 0x169
# Open and config input interface of Xylo-IMU, you could customize the parameters.
xylo_config.input_interface.enable = True
xylo_config.input_interface.estimator_k_setting = 6
xylo_config.input_interface.select_iaf_output = True
xylo_config.input_interface.update_matrix_threshold = 255
xylo_config.input_interface.delay_threshold = 1
xylo_config.input_interface.bpf_bb_values = [6] * 15
xylo_config.input_interface.bpf_bwf_values = [8] * 15
xylo_config.input_interface.bpf_baf_values = [9] * 15
xylo_config.input_interface.bpf_a1_values = [-64700, -64458, -64330, -64138, -63884, -63566, -63169, -62743, -62238, -61672, -61045, -60357, -59611, -58805, -57941]
xylo_config.input_interface.bpf_a2_values = [31935] + [31754] * 14
xylo_config.input_interface.scale_values = [8]*15
xylo_config.input_interface.iaf_threshold_values = [0x000007d0] * 15
dk.get_model().apply_configuration(xylo_config)
return xylo_config, input_count, hidden_count, output_count
xylo_config, input_count, hidden_count, output_count = apply_configuration()
def decode_acceleration_events(acceleration_data: np.ndarray):
# acceleration_data (np.ndarray): A raster ``(event count, 3)`` whose first dimension is the number of acceleration events and second dimension is fixed at 3 which indicates x, y, z.
assert(acceleration_data.ndim == 2)
assert(acceleration_data.shape[1] == 3)
events = []
for i in range(len(acceleration_data)):
data = acceleration_data[i]
events.append(samna.events.Acceleration(x = data[0], y = data[1], z = data[2]))
return events
def batch_process(acceleration_data: np.ndarray):
# Process specific count of acceleration data and return readouts.
# One acceleration event corresponds to a single readout exactly.
# Readouts of this function are read by FPGA automatically, only three attributes are available for the readouts returned by this function: `timestep`, `output_spike`, `output_v_mem`.
# If you need all attributes of readout to be available, you can call `step_process` which read manually and run slower instead.
start_timestep = get_current_timestep()
print("*** Start timestep before batch processing: ", start_timestep)
# Write acceleration events
accel_events = decode_acceleration_events(acceleration_data)
readout_buf.get_events() # clear
source.write(accel_events)
accel_count = len(accel_events)
# Trigger specific count to digest all input acceleration exactly.
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = start_timestep + accel_count + 1)])
# Wait for all data to be processed and receive all readout.
readouts = readout_buf.get_n_events(accel_count, 10000) # Try to get `accel_count` events in 10 seconds.
assert(len(readouts) == accel_count)
print("*** Readouts:")
for readout in readouts:
print(readout)
print("End timestep after batch processing: ", get_current_timestep())
print("\n")
def process_single(accel: samna.events.Acceleration, hidden_count: int, output_count: int):
# Process a single acceleration event and read full readout manually.
start_timestep = get_current_timestep()
readout_buf.get_events() # clear
source.write([accel])
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = start_timestep + 2)])
# receive readout
readouts = readout_buf.get_n_events(1, 2000) # Wait for 1 readout in 2 seconds
assert(len(readouts) == 1)
readout = readouts[0]
# Only three attributes are available for the readout: `timestep`, `output_spike`, `output_v_mem`.
# So we need to read other attributes manually in debug status.
# Read all membrane potentials
for _ in range(2): # Due to a bug on chip, you have to read memory twice to ensure it's correct.
source.write([samna.xyloImu.event.ReadMembranePotential(neuron_id = i) for i in range(hidden_count + output_count)])
membrane_potentials = membrane_potential_buf.get_n_events(hidden_count + output_count, 5000)
assert(len(membrane_potentials) == hidden_count + output_count)
readout.neuron_v_mems = [e.value for e in membrane_potentials]
# Read all synaptic current
for _ in range(2): # Due to a bug on chip, you have to read memory twice to ensure it's correct.
source.write([samna.xyloImu.event.ReadSynapticCurrent(neuron_id = i) for i in range(hidden_count + output_count)])
synaptic_currents = synaptic_current_buf.get_n_events(hidden_count + output_count, 5000)
assert(len(synaptic_currents) == hidden_count + output_count)
readout.neuron_i_syns = [e.value for e in synaptic_currents]
# Read all hidden spike count
source.write([samna.xyloImu.event.ReadHiddenSpikeCount(neuron_id = i) for i in range(hidden_count)])
hidden_spikes = hidden_spike_buf.get_n_events(hidden_count, 5000)
assert(len(hidden_spikes) == hidden_count)
readout.hidden_spikes = [e.count for e in hidden_spikes]
return readout
def step_process(acceleration_data: np.ndarray):
# Similar to `batch_process` except all attributes of readouts returned by this function is available.
# As most information of readouts are read manually in debug status, so it runs more slowly than `batch_process`.
start_timestep = get_current_timestep()
print("*** Start timestep before step processing: ", start_timestep)
# Decode acceleration events
accel_events = decode_acceleration_events(acceleration_data)
accel_count = len(accel_events)
readouts = []
for accel in accel_events:
readouts.append(process_single(accel, hidden_count, output_count))
end_timestep = get_current_timestep()
assert(end_timestep - start_timestep == accel_count)
print("*** Readouts:")
for readout in readouts:
print(readout)
print("End timestep after step processing: ", end_timestep)
print("\n")
# ------- bach process ------- #
# Run faster, but only timestep and information about output neurons are available in readout.
readouts = batch_process(np.array([
[50, 80, 90], # Acceleration data with x=50,y=80,z=90 which is to be processed at timestep 0.
[0, -40, -60], # Acceleration data with x=0,y=-40,z=-60 which is to be processed at timestep 1.
[0, 0, 0], # Acceleration data with x=0,y=0,z=0 which is to be processed at timestep 2.
]))
# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 2) # Last processed timestep is 2
readouts = batch_process(np.array([
[400, -100, -50],
[0, 0, 0],
[0, 0, 0],
[0, 0, 0],
]))
# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 6) # Last processed timestep is 6
readouts = batch_process(np.array([
[100, 200, 300],
[0, 0, 0],
[-200, -900, -1000],
]))
assert(get_current_timestep() == 9) # Last processed timestep is 9
# ------- step process ------- #
# Run slower and all attributes of readout are available.
readouts = step_process(np.array([
[50, 80, 90], # Acceleration data with x=50,y=80,z=90 which is to be processed at timestep 10.
[0, -40, -60], # Acceleration data with x=0,y=-40,z=-60 which is to be processed at timestep 11.
[0, 0, 0], # Acceleration data with x=0,y=0,z=0 which is to be processed at timestep 12.
]))
# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 12) # Last processed timestep is 12
readouts = step_process(np.array([
[400, -100, -50],
[0, 0, 0],
[0, 0, 0],
[0, 0, 0],
]))
# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 16) # Last processed timestep is 16
readouts = step_process(np.array([
[100, 200, 300],
[0, 0, 0],
[-200, -900, -1000],
]))
assert(get_current_timestep() == 19) # Last processed timestep is 19
graph.stop()