Xylo-Imu Convert Acceleration to Spike
Convert acceleration events into input spikes with IMU_IF.
Acceleration events are fed to IMU_IF one at a time, so that the spike counts of the hidden neurons can be read manually after each event. samna.xyloImu.event.TriggerProcessing
is used to stop processing after each single trigger.
Due to a current limitation of the FPGA, don't send too many acceleration events (more than 1000) at once before the previous acceleration events have been consumed!
You should set the target timestep in samna.xyloImu.event.TriggerProcessing
correctly, so that all input acceleration events are consumed exactly, and only then send the following acceleration events. For an example, see the implementation of the batch_process
function in Xylo-Imu RealTime Mode With Acceleration Input.
- samna 0.33.1
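Files needed (both are opened by relative path, so they must be in the current working directory): sample_003_device01.csv (the acceleration samples) and expected_spikes.txt (the expected spikes, one "neuron_id,timestep" pair per line).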
import samna
import numpy as np
def initialize_board():
    """Open the Xylo-IMU test board and create its event endpoints.

    Returns:
        A ``(device, sink, source)`` tuple: the device opened as
        ``"XyloImuTestBoard:0"``, a sink created from the device's model
        source node, and a source created towards the device's model sink
        node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")

    # Endpoint that collects what the model emits.
    output_node = board.get_model_source_node()
    event_sink = samna.graph.sink_from(output_node)

    # Endpoint used to write events into the model.
    input_node = board.get_model_sink_node()
    event_source = samna.graph.source_to(input_node)

    return board, event_sink, event_source
# Open the board once at module level; the functions below use `dk` and
# `source` as module-level globals.
dk, buf, source = initialize_board()
def build_event_type_filters(dk, graph):
    """Build one filtered sink per output event type.

    For every event type a chain "model source node ->
    XyloImuOutputEventTypeFilter -> JitSink" is added to ``graph`` and the
    filter node is given that type through ``set_desired_type``.

    Args:
        dk: Opened device; its model source node is the head of every chain.
        graph: Filter graph whose ``sequential`` method builds each chain.

    Returns:
        A tuple of six sinks, in this order: register value, readout,
        interrupt, membrane potential, synaptic current, hidden spike count.
    """
    # The order here fixes the order of the returned sinks.
    event_types = (
        'xyloImu::event::RegisterValue',
        'xyloImu::event::Readout',
        'xyloImu::event::Interrupt',
        'xyloImu::event::MembranePotential',
        'xyloImu::event::SynapticCurrent',
        'xyloImu::event::HiddenSpikeCount',
    )

    sinks = []
    for event_type in event_types:
        _, type_filter, sink = graph.sequential(
            [dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()]
        )
        type_filter.set_desired_type(event_type)
        sinks.append(sink)
    return tuple(sinks)
# NOTE: this `graph` object must not be released by Python while events are
# being received, otherwise no event will be received.
graph = samna.graph.EventFilterGraph()
# One sink per output event type; see build_event_type_filters for the order.
register_value_buf, readout_buf, interrupt_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf = build_event_type_filters(dk, graph)
graph.start() # The graph has to be started manually to work.
def decode_acceleration_events():
    """Load the sample CSV and turn every row into an acceleration event.

    Reads ``sample_003_device01.csv`` (relative path) and takes the columns
    at positions 1, 2 and 3 as x, y and z. Each value is multiplied by
    ``2 ** 12`` and rounded to an integer before the event is built.
    NOTE(review): column 0 is skipped; presumably a timestamp or index,
    confirm against the CSV.

    Returns:
        list of ``samna.events.Acceleration``, one per CSV row, in file order.
    """
    import pandas as pd

    scale_factor = 2 ** 12  # loop-invariant, so computed once
    data = pd.read_csv("sample_003_device01.csv")

    # Select the three columns purely by position with `.iloc`. Indexing a
    # row Series with a bare integer (`row[1]`) relies on the positional
    # fallback that pandas has deprecated for label-indexed Series.
    xyz_columns = data.iloc[:, [1, 2, 3]]

    events = []
    for x, y, z in xyz_columns.itertuples(index=False, name=None):
        events.append(
            samna.events.Acceleration(
                x=round(x * scale_factor),
                y=round(y * scale_factor),
                z=round(z * scale_factor),
            )
        )
    return events
def get_current_timestep():
    """Trigger a readout and return the timestep it reports.

    The current timestep is the last processed timestep.

    Returns:
        The ``timestep`` attribute of the single readout event received.

    Raises:
        AssertionError: If exactly one readout event was not received.
    """
    trigger = samna.xyloImu.event.TriggerReadout()
    source.write([trigger])

    # Try to get 1 event within 3 seconds.
    readout_events = readout_buf.get_n_events(1, 3000)
    assert len(readout_events) == 1

    latest = readout_events[0]
    return latest.timestep
def apply_configuration():
    """Build the Xylo-IMU configuration and apply it to the device.

    Enables manual acceleration input on ``dk``, selects real-time operation
    mode, sets up a small custom network which reflects input spikes to
    hidden spikes, enables the IMU input interface with its filter
    parameters, and applies the result through
    ``dk.get_model().apply_configuration``.

    Returns:
        A ``(xylo_config, input_count, hidden_count, output_count)`` tuple.
    """

    def _unit_neuron(neuron_type):
        # Build one fresh neuron with threshold and both decays set to 1.
        neuron = neuron_type()
        neuron.threshold = 1
        neuron.v_mem_decay = 1
        neuron.i_syn_decay = 1
        return neuron

    dk.enable_manual_input_acceleration(True)  # Use manual input instead of sensor input

    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime

    # Configure a custom network which reflects input spikes to hidden spikes
    input_count = 15
    hidden_count = 15
    output_count = 15

    # Identity-shaped weights sized from the counts above instead of a
    # repeated literal 15. With all three counts equal these are the same
    # 15x15 matrices as before.
    # NOTE(review): (rows, cols) are taken as (pre, post) population sizes;
    # confirm the orientation before making the counts differ.
    xylo_config.input.weights = np.eye(input_count, hidden_count, dtype='int8') * 2
    xylo_config.hidden.weights = np.eye(hidden_count, dtype='int8') * -1

    # One distinct object per neuron. `[Neuron()] * n` would put n references
    # to a single shared object in the list.
    xylo_config.hidden.neurons = [
        _unit_neuron(samna.xyloImu.configuration.HiddenNeuron)
        for _ in range(hidden_count)
    ]
    xylo_config.readout.neurons = [
        _unit_neuron(samna.xyloImu.configuration.OutputNeuron)
        for _ in range(output_count)
    ]
    xylo_config.readout.weights = np.eye(hidden_count, output_count, dtype='int8') * 2

    # Preparation to run in realtime mode with input interface opened.
    xylo_config.debug.always_update_omp_stat = True
    xylo_config.imu_if_input_enable = True
    xylo_config.debug.imu_if_clk_enable = True
    xylo_config.time_resolution_wrap = 0x61a80  # 400000 decimal
    xylo_config.debug.imu_if_clock_freq_div = 0x169  # 361 decimal

    # Open and config input interface of Xylo-IMU, you could customize the parameters.
    # Every per-channel list below has 15 entries.
    xylo_config.input_interface.enable = True
    xylo_config.input_interface.estimator_k_setting = 6
    xylo_config.input_interface.select_iaf_output = True
    xylo_config.input_interface.update_matrix_threshold = 255
    xylo_config.input_interface.delay_threshold = 1
    xylo_config.input_interface.bpf_bb_values = [6] * 15
    xylo_config.input_interface.bpf_bwf_values = [8] * 15
    xylo_config.input_interface.bpf_baf_values = [9] * 15
    xylo_config.input_interface.bpf_a1_values = [-64700, -64458, -64330, -64138, -63884, -63566, -63169, -62743, -62238, -61672, -61045, -60357, -59611, -58805, -57941]
    xylo_config.input_interface.bpf_a2_values = [31935] + [31754] * 14
    xylo_config.input_interface.scale_values = [8]*15
    xylo_config.input_interface.iaf_threshold_values = [0x000007d0] * 15  # 2000 decimal

    dk.get_model().apply_configuration(xylo_config)
    return xylo_config, input_count, hidden_count, output_count
# Configure the device once; step_process reads `hidden_count` and
# `output_count` as module-level globals.
xylo_config, input_count, hidden_count, output_count = apply_configuration()
def get_readout(hidden_count, output_count):
    """Fetch the pending readout and attach hidden spike counts to it.

    Only two attributes of the `Readout` event are available in manual mode:
    `timestep` and `output_v_mems`. Everything else has to be read manually,
    so the hidden spike counts are requested neuron by neuron and stored on
    the readout as ``hidden_spikes``.

    Args:
        hidden_count: Number of hidden neurons whose spike count is read.
        output_count: Not used by this function.

    Returns:
        The readout event, with ``hidden_spikes`` set to a list of counts.

    Raises:
        AssertionError: If the readout or the expected number of hidden
            spike count events does not arrive.
    """
    received = readout_buf.get_n_events(1, 2000)
    assert len(received) == 1
    readout = received[0]

    # Read all hidden spike counts. The read is done twice and the result of
    # the second pass is the one that stays on the readout.
    # NOTE(review): the reason for the double read is not visible here;
    # presumably the first pass can return stale values, confirm.
    for _ in range(2):
        requests = [
            samna.xyloImu.event.ReadHiddenSpikeCount(neuron_id=neuron_id)
            for neuron_id in range(hidden_count)
        ]
        source.write(requests)
        count_events = hidden_spike_buf.get_n_events(hidden_count, 5000)
        assert len(count_events) == hidden_count
        readout.hidden_spikes = [event.count for event in count_events]

    return readout
def process_single(accel: samna.events.Acceleration, hidden_count: int, output_count: int):
    """Process a single acceleration event and read the full readout manually.

    Args:
        accel: The acceleration event to send.
        hidden_count: Passed through to ``get_readout``.
        output_count: Passed through to ``get_readout``.

    Returns:
        The readout returned by ``get_readout``.

    Raises:
        Exception: If no interrupt event arrives within 2 seconds.
    """
    # Clear the readout buffer; the returned events are discarded.
    readout_buf.get_events()

    timestep_before = get_current_timestep()

    source.write([accel])
    trigger = samna.xyloImu.event.TriggerProcessing(target_timestep=timestep_before + 2)
    source.write([trigger])

    # By default there is an interrupt after processing is done.
    # Try to get 1 event within 2 seconds.
    if not interrupt_buf.get_n_events(1, 2000):
        raise Exception("No interrupt occurs after processing done!")

    # Receive the readout.
    return get_readout(hidden_count, output_count)
def step_process(accel_events):
    """Process acceleration events one by one and collect the hidden spikes.

    Each event goes through ``process_single``. For every hidden neuron, one
    ``Spike`` is created per counted spike, carrying the neuron's index and
    the readout's timestep.

    Args:
        accel_events: Sized collection of acceleration events.

    Returns:
        list of ``samna.xyloImu.event.Spike`` in the order they were read.

    Raises:
        AssertionError: If the timestep did not advance by exactly one per
            acceleration event.
    """
    first_timestep = get_current_timestep()
    print("*** Start timestep before step processing: ", first_timestep)

    accel_count = len(accel_events)
    collected = []
    for accel in accel_events:
        readout = process_single(accel, hidden_count, output_count)
        # The position in `hidden_spikes` is the neuron id.
        for neuron_id, spike_count in enumerate(readout.hidden_spikes):
            for _ in range(spike_count):
                spike = samna.xyloImu.event.Spike(neuron_id=neuron_id, timestep=readout.timestep)
                print("Get spike: ", spike)
                collected.append(spike)

    last_timestep = get_current_timestep()
    assert last_timestep - first_timestep == accel_count

    print("Readout count: ", accel_count)
    print("End timestep after step processing: ", last_timestep)
    print("\n")
    return collected
def get_expected_spikes():
    """Load the reference spikes from ``expected_spikes.txt``.

    Each non-blank line holds comma-separated integers; the first is used as
    the neuron id and the second as the timestep. Blank lines are skipped, so
    a trailing empty line no longer fails integer parsing.

    Returns:
        list of ``samna.xyloImu.event.Spike`` in file order.

    Raises:
        ValueError: If a field on a non-blank line is not an integer.
        IndexError: If a non-blank line has fewer than two fields.
    """
    spikes = []
    with open("expected_spikes.txt", "r") as f:
        # Stream the file instead of loading all lines up front.
        for line in f:
            line = line.strip()
            if not line:
                continue
            items = [int(field) for field in line.split(",")]
            spikes.append(samna.xyloImu.event.Spike(neuron_id=items[0], timestep=items[1]))
    return spikes
# Run the demo: load the acceleration samples, process them one by one, and
# compare the collected spikes with the reference file.
accel_events = decode_acceleration_events()
spikes = step_process(accel_events)
print("Get spike count: ", len(spikes))
assert(spikes == get_expected_spikes())
# Stop the filter graph that was started manually above.
graph.stop()