Xylo-Imu Readout#

The output event samna.xyloImu.event.Readout has been refactored to replace NeuronStateSinkNode. A Readout event is capable of monitoring:

  • The specific processed timestep which this readout belongs to. Please note the change: the timestep in a readout is now the last processed timestep, instead of the next timestep to be processed. As a result, the timestep typically starts from -1 before any trigger.

  • State of hidden neurons and output neurons in accelerated mode.

  • State of output neurons in realtime mode.

  • State of output neurons in manual mode.

Here are three examples of how to make use of it, one for each of the different modes.

The examples are based on the following package versions:

- samna                 0.33.1

1. Auto read readout in accelerated mode#

import samna
import time

def initialize_board():
    """Open the Xylo IMU test board and attach a sink and a source to its model.

    Returns:
        A tuple ``(dk, buf, source)``: the opened device, a sink created from
        the device's model source node, and a source connected to the device's
        model sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    event_sink = samna.graph.sink_from(board.get_model_source_node())
    event_source = samna.graph.source_to(board.get_model_sink_node())
    return board, event_sink, event_source

# Open the board, then chain the device's model source node through an output-event
# type filter into a JitSink. The filter's desired type is set to Readout, and
# `state_buf` is the sink that the examples below read readout events from.
dk, buf, source = initialize_board()
graph = samna.graph.EventFilterGraph()
_, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf.set_desired_type('xyloImu::event::Readout')
graph.start()   # Keep this `graph` object alive in Python for as long as events are being received; if it is released, no events will be received.

def get_current_timestep():
    """Trigger a readout and return the timestep carried by the resulting event.

    Raises:
        AssertionError: if exactly one event was not received within the timeout.
    """
    source.write([samna.xyloImu.event.TriggerReadout()])
    # Block until 1 event has arrived, giving up after 3 seconds.
    readout_events = state_buf.get_n_events(1, 3000)
    assert len(readout_events) == 1
    return readout_events[0].timestep

def send_spike(ts):
    """Write a single input spike event stamped with timestep ``ts``."""
    spike = samna.xyloImu.event.Spike(timestep=ts)
    source.write([spike])

# Build a configuration that runs the chip in accelerated-time mode.
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.AcceleratedTime

# A tiny model is applied here: 1 input, 2 hidden neurons, 1 output neuron.
input_count = 1
rsn_count = 2
output_count = 1
hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron() for _ in range(rsn_count)]
output_neurons = [samna.xyloImu.configuration.OutputNeuron() for _ in range(output_count)]

xylo_config.input.weights = [[1, 1]]
xylo_config.hidden.weights = [[1, 1], [1, 1]]
xylo_config.hidden.neurons = hidden_neurons
xylo_config.readout.neurons = output_neurons
xylo_config.readout.weights = [[1], [1]]

# Configure the neuron ranges to monitor; this is necessary for reading
# hidden-neuron state in accelerated mode.
xylo_config.debug.monitor_neuron_i_syn = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0,
    length=rsn_count + output_count,
)
xylo_config.debug.monitor_neuron_v_mem = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0,
    length=rsn_count + output_count,
)
xylo_config.debug.monitor_neuron_spike = samna.xyloImu.configuration.NeuronRange(
    neuron_id=0,
    length=rsn_count,
)
dk.get_model().apply_configuration(xylo_config)

# Evolve: query the starting timestep, send one spike per timestep, then collect the readouts.
init_timestep = get_current_timestep()
print("initial timestep: ", init_timestep)   # which is -1

test_step_count = 10
for curr_timestep in range(init_timestep + 1, init_timestep + test_step_count + 1):
    print(f"send a spike with timestep {curr_timestep}")
    send_spike(curr_timestep)

# Blocking read: wait for the requested number of events. If the timeout expires
# before all of them have been received, only the events received so far are returned.
readouts = state_buf.get_n_events(test_step_count, 3000)
assert len(readouts) == test_step_count - 1    # The last spike is not processed.
print(f"Get {len(readouts)} readout events: ", readouts)

print("timestep after run: ", get_current_timestep())   # which is 8
graph.stop()

2. Auto read readout in realtime mode#

import samna
import time

def initialize_board():
    """Open the Xylo IMU test board and connect a sink and a source to it.

    Returns:
        ``(dk, buf, source)``: the device handle, a sink built from the device's
        model source node, and a source wired to the device's model sink node.
    """
    device = samna.device.open_device("XyloImuTestBoard:0")
    sink = samna.graph.sink_from(device.get_model_source_node())
    writer = samna.graph.source_to(device.get_model_sink_node())
    return device, sink, writer

# Open the board, then chain the device's model source node through an output-event
# type filter into a JitSink. The filter's desired type is set to Readout, and
# `state_buf` is the sink that readout events are read from below.
dk, buf, source = initialize_board()
graph = samna.graph.EventFilterGraph()
_, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf.set_desired_type('xyloImu::event::Readout')
# Start the filter graph before any readout is requested.
graph.start()

def get_current_timestep():
    """Request one readout and return its ``timestep`` attribute.

    Raises:
        AssertionError: if exactly one event was not received within the timeout.
    """
    trigger = samna.xyloImu.event.TriggerReadout()
    source.write([trigger])
    # Try to get 1 event within 3 seconds.
    received = state_buf.get_n_events(1, 3000)
    assert len(received) == 1
    return received[0].timestep

# Build a configuration that runs the chip in real-time mode.
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime

# Use sensor input instead of manual input.
dk.enable_manual_input_acceleration(False)

# A tiny model is applied here: 1 input, 2 hidden neurons, 1 output neuron.
input_count = 1
rsn_count = 2
output_count = 1

# 0x7a120 == 500000: the resolution wrap is 10 ms when the main clock runs at
# 50 MHz, which is the default value.
xylo_config.time_resolution_wrap = 0x7a120

hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron() for _ in range(rsn_count)]
output_neurons = [samna.xyloImu.configuration.OutputNeuron() for _ in range(output_count)]
xylo_config.input.weights = [[1, 1]]
xylo_config.hidden.weights = [[1, 1], [1, 1]]
xylo_config.hidden.neurons = hidden_neurons
xylo_config.readout.neurons = output_neurons
xylo_config.readout.weights = [[1], [1]]

dk.get_model().apply_configuration(xylo_config)
# Read the timestep once before starting; no TriggerProcessing event has been sent yet.
timestep = get_current_timestep()
print("Initial timestep in realtime mode before first trigger: ", timestep)   # which is -1
# Feel free to do things like reading/writing in realtime mode before the first trigger.

source.write([samna.xyloImu.event.TriggerProcessing()]) # The manual trigger to start realtime mode.
# The timestep starts from 0 after the first trigger.

# Keep receiving readout events for `receive_duration` seconds.
receive_duration = 1
# Measure elapsed time with a monotonic clock so that system clock adjustments
# cannot shorten or extend the receive window.
begin = time.monotonic()
recv_count = 0
while time.monotonic() - begin < receive_duration:
    evts = state_buf.get_events()
    if evts:
        recv_count += len(evts)
        # Only three attributes of the `Readout` event are available in realtime mode: `timestep`, `output_spike`, `output_v_mem`.
        # NOTE(review): the manual-mode example spells the membrane attribute `output_v_mems`; confirm the exact attribute names against the samna API reference.
        print("Get readout events: ", evts)
    # Pause briefly between polls of the sink.
    time.sleep(0.01)

print(f"Finished, receiving {recv_count} readout events. Still running.")
graph.stop()

3. Read readout in manual mode manually#

import samna
import time

def initialize_board():
    """Open the Xylo IMU test board and collect the handles used by this example.

    Returns:
        ``(dk, io, buf, source)``: the device handle, its IO module, a sink built
        from the device's model source node, and a source wired to the device's
        model sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    io_module = board.get_io_module()
    readout_sink = samna.graph.sink_from(board.get_model_source_node())
    event_writer = samna.graph.source_to(board.get_model_sink_node())
    return board, io_module, readout_sink, event_writer

# Open the board; `buf` receives events from the device and `source` sends events to it.
dk, io, buf, source = initialize_board()

def request_readout():
    """Trigger a readout manually and return the single `Readout` event received.

    Only two attributes of the `Readout` event are available in manual mode:
    `timestep` and `output_v_mems`.

    Raises:
        AssertionError: if the sink does not hold exactly one `Readout` event
            after the wait.
    """
    source.write([samna.xyloImu.event.TriggerReadout()])
    # Wait a fixed 0.2 s before draining the sink.
    time.sleep(0.2)
    readouts = [
        evt
        for evt in buf.get_events()
        if isinstance(evt, samna.xyloImu.event.Readout)
    ]
    assert len(readouts) == 1
    return readouts[0]

# Apply an otherwise default configuration with the operation mode set to Manual.
xylo_config = samna.xyloImu.configuration.XyloConfiguration()
xylo_config.operation_mode = samna.xyloImu.OperationMode.Manual
dk.get_model().apply_configuration(xylo_config)

# Request manually
readout = request_readout()  # timestep starts from -1 after reset
print("Get readout: ", readout)

# Trigger processing three times, requesting a readout after each trigger;
# the readouts report timesteps 0, 1 and 2 in turn.
for _ in range(3):
    source.write([samna.xyloImu.event.TriggerProcessing()])
    time.sleep(0.1)  # wait for processing to finish, which is necessary in manual mode

    readout = request_readout()
    print("Get readout: ", readout)