Xylo-Imu TriggerProcessing#
AcceleratedTime mode and realtime mode now support the samna.xyloImu.event.TriggerProcessing
event, which keeps triggering processing until the target timestep is reached. It allows you to:
Process the spikes of the last input timestep in accelerated mode.
Trigger a specific number of timesteps, then pause and debug, in realtime mode.
Here are two examples that demonstrate how to use it in accelerated mode and realtime mode, respectively.
The examples are based on the following package versions:
- samna 0.33.1
1. Trigger processing in accelerated mode#
import samna
import time
def initialize_board():
    """Open the Xylo-Imu test board and create its event endpoints.

    Returns:
        A ``(dk, io, buf, source)`` tuple: the opened device, its IO module,
        a sink attached to the model's source node, and a source attached to
        the model's sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    io_module = board.get_io_module()
    # Events emitted by the chip are collected through this sink.
    event_sink = samna.graph.sink_from(board.get_model_source_node())
    # Events written to this source are delivered to the chip.
    event_source = samna.graph.source_to(board.get_model_sink_node())
    return board, io_module, event_sink, event_source
# Open the board; the handles are kept at module level for the rest of the script.
dk, io, buf, source = initialize_board()
# Filter graph: model source node -> output event type filter -> sink (`state_buf`).
graph = samna.graph.EventFilterGraph()
_, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
# Only `Readout` events are let through to `state_buf`.
etf.set_desired_type('xyloImu::event::Readout')
# Keep a reference to `graph` alive for as long as events are expected: if the
# Python object is released while receiving, no events will be received.
graph.start()
def get_current_timestep():
    """Return the timestep carried by a manually triggered readout.

    Writes a ``TriggerReadout`` event to the chip and waits up to 3 seconds
    for exactly one event to arrive in ``state_buf``.

    Raises:
        AssertionError: if exactly one event was not received in time.
    """
    trigger = samna.xyloImu.event.TriggerReadout()
    source.write([trigger])
    # Ask for 1 event with a 3000 ms timeout.
    readouts = state_buf.get_n_events(1, 3000)
    assert len(readouts) == 1
    return readouts[0].timestep
def send_spike(timestep):
    """Write one input ``Spike`` event stamped with ``timestep`` to the chip."""
    spike = samna.xyloImu.event.Spike(timestep=timestep)
    source.write([spike])
# Build a configuration that runs the chip in accelerated-time mode.
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.AcceleratedTime

# A tiny model is applied here: 1 input, 2 hidden neurons and 1 output neuron.
input_count = 1
rsn_count = 2
output_count = 1
xylo_config.input.weights = [[1, 1]]
xylo_config.hidden.weights = [[1, 1], [1, 1]]
hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron() for _ in range(rsn_count)]
xylo_config.hidden.neurons = hidden_neurons
output_neurons = [samna.xyloImu.configuration.OutputNeuron() for _ in range(output_count)]
xylo_config.readout.neurons = output_neurons
xylo_config.readout.weights = [[1], [1]]

# Configure the neuron ranges to monitor; this is necessary for hidden neurons
# in accelerated mode. Synaptic current and membrane potential cover hidden and
# output neurons, spikes cover the hidden neurons only.
monitored_state_count = rsn_count + output_count
xylo_config.debug.monitor_neuron_i_syn = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0, length=monitored_state_count
)
xylo_config.debug.monitor_neuron_v_mem = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0, length=monitored_state_count
)
xylo_config.debug.monitor_neuron_spike = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0, length=rsn_count
)

dk.get_model().apply_configuration(xylo_config)
# Evolve normally: send one spike per timestep, then trigger the last timestep.
init_timestep = get_current_timestep()
print("initial timestep for first run: ", init_timestep)  # which is -1 after reset
test_step_count = 10
for curr_timestep in range(init_timestep + 1, init_timestep + test_step_count + 1):
    print(f"send a spike with timestep {curr_timestep}")
    send_spike(curr_timestep)

# A TriggerProcessing event makes the Xylo core process up to a specific timestep.
# Targeting one past the last spike ensures that timestep
# `init_timestep + test_step_count` itself is triggered, which is what processes
# the spike of the last timestep.
last_spike_timestep = init_timestep + test_step_count
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep=last_spike_timestep + 1)])

# Blocking read for a specific number of events; if the timeout expires before
# all of them arrive, only the events received so far are returned.
readouts = state_buf.get_n_events(test_step_count, 3000)
assert len(readouts) == test_step_count  # All spikes are processed.
print(f"Get {len(readouts)} readout events: ", readouts)
# Evolve with missing spikes: only the first 5 of the 10 timesteps get input.
init_timestep = get_current_timestep()
print("initial timestep for second run: ", init_timestep)  # timestep 9
test_step_count = 10
spiking_step_count = 5
for curr_timestep in range(init_timestep + 1, init_timestep + spiking_step_count + 1):
    print(f"send a spike with timestep {curr_timestep}")
    send_spike(curr_timestep)

# No spike is sent for the last 5 timesteps, but they can still be processed by
# using a `TriggerProcessing` event: the line below ensures that every timestep
# up to and including `init_timestep + test_step_count` is triggered.
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep=init_timestep + test_step_count + 1)])

# Give enough time for all timesteps without spikes to be processed.
readouts = state_buf.get_n_events(test_step_count, 3000)
assert len(readouts) == test_step_count  # All timesteps are processed, with or without spikes.
print(f"Get {len(readouts)} readout events: ", readouts)
print("timestep after run: ", get_current_timestep())  # timestep 19
# In accelerated mode a `TriggerProcessing` event has no effect when it targets
# a previous timestep or uses the default timestep.
ineffective_triggers = (
    samna.xyloImu.event.TriggerProcessing(target_timestep=init_timestep + test_step_count),
    samna.xyloImu.event.TriggerProcessing(),
)
for trigger in ineffective_triggers:
    source.write([trigger])
    time.sleep(0.1)
    print("timestep after run: ", get_current_timestep())  # timestep 19

graph.stop()
2. Trigger processing in realtime mode#
import samna
import time
def initialize_board():
    """Open the Xylo-Imu test board and create its event endpoints.

    Returns:
        A ``(dk, buf, source)`` tuple: the opened device, a sink attached to
        the model's source node, and a source attached to the model's sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    # Events emitted by the chip are collected through this sink.
    event_sink = samna.graph.sink_from(board.get_model_source_node())
    # Events written to this source are delivered to the chip.
    event_source = samna.graph.source_to(board.get_model_sink_node())
    return board, event_sink, event_source
# Open the board; the handles are kept at module level for the rest of the script.
dk, buf, source = initialize_board()
# Filter graph: model source node -> output event type filter -> sink (`state_buf`).
graph = samna.graph.EventFilterGraph()
_, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
# Only `Readout` events are let through to `state_buf`.
etf.set_desired_type('xyloImu::event::Readout')
# Start delivering events through the filter graph.
graph.start()
def get_current_timestep():
    """Return the timestep carried by a manually triggered readout.

    Writes a ``TriggerReadout`` event to the chip and waits up to 3 seconds
    for exactly one event to arrive in ``state_buf``.

    Raises:
        AssertionError: if exactly one event was not received in time.
    """
    trigger = samna.xyloImu.event.TriggerReadout()
    source.write([trigger])
    # Ask for 1 event with a 3000 ms timeout.
    readouts = state_buf.get_n_events(1, 3000)
    assert len(readouts) == 1
    return readouts[0].timestep
# Build a configuration that runs the chip in realtime mode.
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime
# Use sensor input instead of manual input.
dk.enable_manual_input_acceleration(False)

# A tiny model is applied here: 1 input, 2 hidden neurons and 1 output neuron.
input_count = 1
rsn_count = 2
output_count = 1
# 0x7a120 == 500000: the resolution wrap is 10 ms when the main clock rate is
# 50 MHz, which is the default value.
xylo_config.time_resolution_wrap = 0x7a120
xylo_config.input.weights = [[1, 1]]
xylo_config.hidden.weights = [[1, 1], [1, 1]]
hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron() for _ in range(rsn_count)]
xylo_config.hidden.neurons = hidden_neurons
output_neurons = [samna.xyloImu.configuration.OutputNeuron() for _ in range(output_count)]
xylo_config.readout.neurons = output_neurons
xylo_config.readout.weights = [[1], [1]]

dk.get_model().apply_configuration(xylo_config)
timestep = get_current_timestep()
print("Initial timestep in realtime mode before first trigger: ", timestep)  # which is -1 after reset

# Before the first trigger you are free to do things like reading/writing in
# realtime mode; this period is called the debug time of realtime mode.
run_trigger_count = 10
target_timestep = timestep + run_trigger_count + 1
# Run timestep 0,1,2,...,9 (the timestep starts from 0 after the first trigger).
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep=target_timestep)])

# Try to get 10 readouts in 3 seconds. Make sure to give enough time for all
# triggers to complete, otherwise fewer readouts will be received.
readouts = state_buf.get_n_events(run_trigger_count, 3000)
assert len(readouts) == run_trigger_count
for readout in readouts:
    print(readout)
# Now it's debug time of realtime mode again. You can do anything you like
# (including switching to another mode) to debug.
print("Enter debug time of realtime mode, current timestep: ", get_current_timestep())  # timestep 9

# A `TriggerProcessing` event that targets a previous timestep has no effect.
stale_trigger = samna.xyloImu.event.TriggerProcessing(target_timestep=timestep + run_trigger_count)
source.write([stale_trigger])
time.sleep(0.1)
print("Current timestep after TriggerProcessing of previous timestep is sent: ", get_current_timestep())  # timestep 9
# Start processing again after debugging, counting from the current timestep.
timestep = get_current_timestep()
next_target_timestep = timestep + run_trigger_count + 1
# Run timestep 10,11,12,...,19
source.write([samna.xyloImu.event.TriggerProcessing(target_timestep=next_target_timestep)])
readouts = state_buf.get_n_events(run_trigger_count, 3000)
assert len(readouts) == run_trigger_count
for readout in readouts:
    print(readout)
print("Enter debug time of realtime mode again, current timestep: ", get_current_timestep())  # timestep 19
# Run infinitely with a `TriggerProcessing` event that uses the default timestep.
print("Start to trigger infinitely...")
source.write([samna.xyloImu.event.TriggerProcessing()])

# Count the readout events that arrive during a fixed window. The window is
# measured with the monotonic clock so that a wall-clock adjustment during the
# run cannot shorten or lengthen it.
receiving_duration = 3
begin = time.monotonic()
recv_count = 0
while time.monotonic() - begin < receiving_duration:
    evts = state_buf.get_events()
    if evts:
        recv_count += len(evts)
    # Poll at roughly 10 ms intervals instead of busy-waiting.
    time.sleep(0.01)
print(f"Running infinitely in realtime mode. Receiving {recv_count} readout events in {receiving_duration} seconds. Still running.")

graph.stop()