Xylo-Imu Accelerated Mode

Xylo-Imu Accelerated Mode#

Accelerated mode is a mode that mocks realtime mode of Xylo-IMU:

  • Input spikes are processed automatically.

  • Processing result named Readout is read by FPGA automatically.

  • It runs in a high speed comparable to realtime mode.

  • It stops automatically when all input spikes are processed.

  • You can do whatever you like as manual mode when it stops.

Here is an example that display typical usage of accelerated mode, which bases on packages :

- samna                 0.33.1
- numpy                 1.24.3
import samna
import numpy as np

def initialize_board() :
    dk = samna.device.open_device("XyloImuTestBoard:0")
    buf = samna.graph.sink_from(dk.get_model_source_node())
    source = samna.graph.source_to(dk.get_model_sink_node())
    return dk, buf, source

dk, buf, source = initialize_board()

graph = samna.graph.EventFilterGraph()  # Please mind that this `graph` object can't be released in python while receiving events, otherwise no event will be received.
_, etf, readout_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
etf.set_desired_type('xyloImu::event::Readout')
graph.start()       # Graph has to be started manually to work.

def get_current_timestep():
    # Current timestep is last processed timestep.
    source.write([samna.xyloImu.event.TriggerReadout()])
    evts = readout_buf.get_n_events(1, 3000) # Try to get 1 event in 3 seconds.
    assert(len(evts) == 1)
    return evts[0].timestep

def apply_configuration():
    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.AcceleratedTime

    input_count = 3
    hidden_count = 5
    output_count = 2
    xylo_config.input.weights = [[1] * hidden_count] * input_count
    xylo_config.hidden.weights = [[1] * hidden_count] * hidden_count
    hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron()] * hidden_count
    xylo_config.hidden.neurons = hidden_neurons
    output_neurons = [samna.xyloImu.configuration.OutputNeuron()] * output_count
    xylo_config.readout.neurons = output_neurons
    xylo_config.readout.weights = [[1] * output_count] * hidden_count

    # Configure neuron ranges to monitor which is necessary for hidden neurons in accelerated mode.
    xylo_config.debug.monitor_neuron_i_syn = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = hidden_count + output_count)
    xylo_config.debug.monitor_neuron_v_mem = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = hidden_count + output_count)
    xylo_config.debug.monitor_neuron_spike = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = hidden_count)

    dk.get_model().apply_configuration(xylo_config)
    return xylo_config
xylo_config = apply_configuration()

def evolve(input: np.ndarray):
    # Process all input spikes including spikes of last timestep.
    # input (np.ndarray): A raster ``(Timestep, NumberOfSpikesInChannels)`` specifying for each bin the number of input events sent to the corresponding input channel on Xylo, at the corresponding time point. Up to 15 input events can be sent per bin.
    start_timestep = get_current_timestep() + 1
    timestep_count = len(input)
    if not timestep_count: return
    final_timestamp = start_timestep + timestep_count - 1
    print("** To be processed timesteps: ", [timestep for timestep in range(start_timestep, final_timestamp + 1)])

    input_events_list = []

    # Locate input events
    spikes = np.argwhere(input)
    counts = input[np.nonzero(input)]

    # Generate input events
    for timestep, channel, count in zip(spikes[:, 0], spikes[:, 1], counts):
        for _ in range(count):
            event = samna.xyloImu.event.Spike()
            event.neuron_id = channel
            event.timestep = start_timestep + timestep
            input_events_list.append(event)

    # Add an extra event to ensure readout for entire input extent
    event = samna.xyloImu.event.TriggerProcessing(target_timestep = final_timestamp + 1)
    input_events_list.append(event)

    source.write(input_events_list)

    readouts = readout_buf.get_n_events(timestep_count, 10000)     # Wait for `timestep_count` of readout events in 10 seconds.
    assert(len(readouts) == timestep_count)

    print("** Readout events of evolve: ")
    for readout in readouts:
        print(readout)
    print("\n")
    return readouts

readouts = evolve(np.array([
    [0, 2, 1],      # Timestep 0. No spike on neuron 0, 2 spikes on neuron 1, 1 spike on neuron 2.
    [0, 0, 0],      # Timestep 1. No spike on all input neuron.
    [7, 8, 9],      # Timestep 2. 7 spikes on neuron 0, 8 spikes on neuron 1, 9 spikes on neuron 2.
    ]))

readouts = evolve(np.array([
    [2, 2, 2],      # Timestep 3
    ]))

readouts = evolve(np.array([
    [9, 9, 9],      # Timestep 4
    [0, 0, 0],      # Timestep 5
    [0, 0, 0],      # Timestep 6
    ]))

assert(get_current_timestep() == 6)     # Last processed timestep is 6
graph.stop()