Xylo-IMU Accelerated Mode
Accelerated mode is a mode that mimics the real-time mode of Xylo-IMU:
Input spikes are processed automatically.
The processing result, named `Readout`, is read by the FPGA automatically.
It runs at a high speed comparable to that of real-time mode.
It stops automatically when all input spikes have been processed.
Once it has stopped, you can do whatever you like, just as in manual mode.
Here is an example that demonstrates typical usage of accelerated mode. It is based on the following packages:
- samna 0.33.1
- numpy 1.24.3
import samna
import numpy as np
def initialize_board():
    """Open the Xylo-IMU test board and attach host-side I/O nodes to it.

    Returns:
        A tuple ``(board, model_output_sink, model_input_source)``:
        the opened device, a sink created from the board's model source
        node, and a source connected to the board's model sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    # Sink fed by the board's model source node.
    model_output_sink = samna.graph.sink_from(board.get_model_source_node())
    # Source whose written events go to the board's model sink node.
    model_input_source = samna.graph.source_to(board.get_model_sink_node())
    return board, model_output_sink, model_input_source
# Open the board; `source` is used below to write events to the chip.
dk, buf, source = initialize_board()

# Please mind that this `graph` object must not be released in Python while
# receiving events, otherwise no event will be received.
graph = samna.graph.EventFilterGraph()

# Route the board's model source node through an output-event type filter into
# a sink (`readout_buf`), which the functions below read readout events from.
_, etf, readout_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])

# Only let `Readout` events pass the filter.
etf.set_desired_type('xyloImu::event::Readout')

# The graph has to be started manually to work.
graph.start()
def get_current_timestep():
    """Return the current timestep, i.e. the last processed timestep.

    Writes a ``TriggerReadout`` event to the module-level ``source`` and waits
    for the resulting readout event on the module-level ``readout_buf``.

    Returns:
        The ``timestep`` attribute of the received readout event.

    Raises:
        TimeoutError: If no readout event is received within 3 seconds.
    """
    source.write([samna.xyloImu.event.TriggerReadout()])
    # Try to get 1 event in 3 seconds.
    evts = readout_buf.get_n_events(1, 3000)
    # Checked explicitly instead of with `assert`, so the check still runs
    # under `python -O` and reports a clear error instead of an IndexError.
    if len(evts) != 1:
        raise TimeoutError(
            "No readout event received within 3 seconds after TriggerReadout."
        )
    return evts[0].timestep
def apply_configuration():
    """Build a small demo network in accelerated-time mode and apply it to the board.

    The network has 3 input channels, 5 hidden neurons and 2 output neurons,
    with every weight set to 1 and default-constructed neurons.

    Returns:
        The ``XyloConfiguration`` object that was applied to the board.
    """
    cfg = samna.xyloImu.configuration
    n_input, n_hidden, n_output = 3, 5, 2

    config = cfg.XyloConfiguration()
    config.operation_mode = samna.xyloImu.OperationMode.AcceleratedTime

    # Input -> hidden and recurrent hidden weights, all ones.
    config.input.weights = [[1] * n_hidden] * n_input
    config.hidden.weights = [[1] * n_hidden] * n_hidden
    config.hidden.neurons = [cfg.HiddenNeuron()] * n_hidden

    # Readout layer: hidden -> output weights, all ones.
    config.readout.neurons = [cfg.OutputNeuron()] * n_output
    config.readout.weights = [[1] * n_output] * n_hidden

    # Configure the neuron ranges to monitor, which is necessary for hidden
    # neurons in accelerated mode. Synaptic current and membrane potential are
    # monitored for hidden and output neurons, spikes for hidden neurons only.
    n_monitored = n_hidden + n_output
    config.debug.monitor_neuron_i_syn = cfg.NeuronRange(neuron_id=0, length=n_monitored)
    config.debug.monitor_neuron_v_mem = cfg.NeuronRange(neuron_id=0, length=n_monitored)
    config.debug.monitor_neuron_spike = cfg.NeuronRange(neuron_id=0, length=n_hidden)

    dk.get_model().apply_configuration(config)
    return config
# Apply the example configuration to the board and keep the returned configuration object.
xylo_config = apply_configuration()
def evolve(input: np.ndarray):
    """Send a raster of input spikes to Xylo-IMU and collect one readout per timestep.

    Processes all input spikes, including the spikes of the last timestep.
    Note that the parameter name ``input`` shadows the builtin; it is kept so
    that existing callers keep working.

    Args:
        input (np.ndarray): A raster ``(Timestep, NumberOfSpikesInChannels)``
            specifying for each bin the number of input events sent to the
            corresponding input channel on Xylo, at the corresponding time
            point. Up to 15 input events can be sent per bin. Any array-like
            accepted by ``np.asarray`` may be passed.

    Returns:
        The list of readout events, one per timestep of ``input``, or ``None``
        if ``input`` contains no timesteps.

    Raises:
        TimeoutError: If fewer readout events than timesteps are received
            within 10 seconds.
    """
    raster = np.asarray(input)

    # The current timestep is the last processed one, so new input starts right after it.
    start_timestep = get_current_timestep() + 1
    timestep_count = len(raster)
    if not timestep_count:
        return None
    final_timestep = start_timestep + timestep_count - 1
    print("** To be processed timesteps: ", list(range(start_timestep, final_timestep + 1)))

    input_events_list = []

    # Locate input events: (timestep, channel) index pairs of non-zero bins
    # and the spike count stored in each of those bins.
    spikes = np.argwhere(raster)
    counts = raster[np.nonzero(raster)]

    # Generate input events: one Spike event per counted spike.
    for timestep, channel, count in zip(spikes[:, 0], spikes[:, 1], counts):
        for _ in range(count):
            event = samna.xyloImu.event.Spike()
            event.neuron_id = channel
            event.timestep = start_timestep + timestep
            input_events_list.append(event)

    # Add an extra event to ensure readout for the entire input extent.
    event = samna.xyloImu.event.TriggerProcessing(target_timestep=final_timestep + 1)
    input_events_list.append(event)

    source.write(input_events_list)

    # Wait for `timestep_count` readout events in 10 seconds.
    readouts = readout_buf.get_n_events(timestep_count, 10000)
    # Checked explicitly instead of with `assert`, so that a short result is
    # never returned silently under `python -O`.
    if len(readouts) != timestep_count:
        raise TimeoutError(
            f"Expected {timestep_count} readout events within 10 seconds, "
            f"got {len(readouts)}."
        )

    print("** Readout events of evolve: ")
    for readout in readouts:
        print(readout)
    print("\n")
    return readouts
# Feed three batches of input spikes. Each call continues from the timestep
# after the last processed one, so the timesteps run on across the calls.
readouts = evolve(np.array([
[0, 2, 1], # Timestep 0. No spikes on neuron 0, 2 spikes on neuron 1, 1 spike on neuron 2.
[0, 0, 0], # Timestep 1. No spikes on any input neuron.
[7, 8, 9], # Timestep 2. 7 spikes on neuron 0, 8 spikes on neuron 1, 9 spikes on neuron 2.
]))
readouts = evolve(np.array([
[2, 2, 2], # Timestep 3
]))
readouts = evolve(np.array([
[9, 9, 9], # Timestep 4
[0, 0, 0], # Timestep 5
[0, 0, 0], # Timestep 6
]))
# The last processed timestep is 6.
assert(get_current_timestep() == 6)
# Stop the filter graph that was started during setup.
graph.stop()