Xylo-Imu RealTime Mode With Acceleration Input#

Xylo-IMU supports debugging with acceleration data in realtime mode. You could pause easily to debug like manual mode by controlling your input.

Here is an example that display typical usage of it, which bases on packages :

- samna                 0.33.1
- numpy                 1.24.3
import samna
import numpy as np

def initialize_board() :
    dk = samna.device.open_device("XyloImuTestBoard:0")
    buf = samna.graph.sink_from(dk.get_model_source_node())
    source = samna.graph.source_to(dk.get_model_sink_node())
    return dk, buf, source

dk, buf, source = initialize_board()

def build_event_type_filters(dk, graph):
    _, etf0, readout_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf0.set_desired_type('xyloImu::event::Readout')
    _, etf1, membrane_potential_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf1.set_desired_type('xyloImu::event::MembranePotential')
    _, etf2, synaptic_current_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf2.set_desired_type('xyloImu::event::SynapticCurrent')
    _, etf3, hidden_spike_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf3.set_desired_type('xyloImu::event::HiddenSpikeCount')

    return readout_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf

graph = samna.graph.EventFilterGraph()  # Please mind that this `graph` object can't be released in python while receiving events, otherwise no event will be received.
readout_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf = build_event_type_filters(dk, graph)
graph.start()       # Graph has to be started manually to work.

def get_current_timestep():
    # Current timestep is last processed timestep.
    source.write([samna.xyloImu.event.TriggerReadout()])
    evts = readout_buf.get_n_events(1, 3000) # Try to get 1 event in 3 seconds.
    assert(len(evts) == 1)
    return evts[0].timestep

def apply_configuration():
    dk.enable_manual_input_acceleration(True)   # Use manual input instead of sensor input

    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime

    input_count = 3
    hidden_count = 5
    output_count = 2
    xylo_config.input.weights = [[1] * hidden_count] * input_count
    xylo_config.hidden.weights = [[1] * hidden_count] * hidden_count
    hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron()] * hidden_count
    xylo_config.hidden.neurons = hidden_neurons
    output_neurons = [samna.xyloImu.configuration.OutputNeuron()] * output_count
    xylo_config.readout.neurons = output_neurons
    xylo_config.readout.weights = [[1] * output_count] * hidden_count

    # Preparation to run in realtime mode with input interface opened.
    xylo_config.debug.always_update_omp_stat = True
    xylo_config.imu_if_input_enable = True
    xylo_config.debug.imu_if_clk_enable = True
    xylo_config.time_resolution_wrap = 0x61a80
    xylo_config.debug.imu_if_clock_freq_div = 0x169

    # Open and config input interface of Xylo-IMU, you could customize the parameters.
    xylo_config.input_interface.enable = True
    xylo_config.input_interface.estimator_k_setting = 6
    xylo_config.input_interface.select_iaf_output = True
    xylo_config.input_interface.update_matrix_threshold = 255
    xylo_config.input_interface.delay_threshold = 1
    xylo_config.input_interface.bpf_bb_values = [6] * 15
    xylo_config.input_interface.bpf_bwf_values = [8] * 15
    xylo_config.input_interface.bpf_baf_values = [9] * 15
    xylo_config.input_interface.bpf_a1_values = [-64700, -64458, -64330, -64138, -63884, -63566, -63169, -62743, -62238, -61672, -61045, -60357, -59611, -58805, -57941]
    xylo_config.input_interface.bpf_a2_values = [31935] + [31754] * 14
    xylo_config.input_interface.scale_values = [8]*15
    xylo_config.input_interface.iaf_threshold_values = [0x000007d0] * 15

    dk.get_model().apply_configuration(xylo_config)
    return xylo_config, input_count, hidden_count, output_count
xylo_config, input_count, hidden_count, output_count = apply_configuration()

def decode_acceleration_events(acceleration_data: np.ndarray):
    # acceleration_data (np.ndarray): A raster ``(event count, 3)`` whose first dimension is the number of acceleration events and second dimension is fixed at 3 which indicates x, y, z.
    assert(acceleration_data.ndim == 2)
    assert(acceleration_data.shape[1] == 3)

    events = []
    for i in range(len(acceleration_data)):
        data = acceleration_data[i]
        events.append(samna.events.Acceleration(x = data[0], y = data[1], z = data[2]))
    return events

def batch_process(acceleration_data: np.ndarray):
    # Process specific count of acceleration data and return readouts.
    # One acceleration event corresponds to a single readout exactly.
    # Readouts of this function are read by FPGA automatically, only three attributes are available for the readouts returned by this function: `timestep`, `output_spike`, `output_v_mem`.
    # If you need all attributes of readout to be available, you can call `step_process` which read manually and run slower instead.
    start_timestep = get_current_timestep()
    print("*** Start timestep before batch processing: ", start_timestep)

    # Write acceleration events
    accel_events = decode_acceleration_events(acceleration_data)
    readout_buf.get_events()    # clear
    source.write(accel_events)
    accel_count = len(accel_events)

    # Trigger specific count to digest all input acceleration exactly.
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = start_timestep + accel_count + 1)])

    # Wait for all data to be processed and receive all readout.
    readouts = readout_buf.get_n_events(accel_count, 10000) # Try to get `accel_count` events in 10 seconds.
    assert(len(readouts) == accel_count)
    print("*** Readouts:")
    for readout in readouts:
        print(readout)

    print("End timestep after batch processing: ", get_current_timestep())
    print("\n")

def process_single(accel: samna.events.Acceleration, hidden_count: int, output_count: int):
    # Process a single acceleration event and read full readout manually.
    start_timestep = get_current_timestep()

    readout_buf.get_events()    # clear
    source.write([accel])
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = start_timestep + 2)])

    # receive readout
    readouts = readout_buf.get_n_events(1, 2000)    # Wait for 1 readout in 2 seconds
    assert(len(readouts) == 1)
    readout = readouts[0]
    # Only three attributes are available for the readout: `timestep`, `output_spike`, `output_v_mem`.
    # So we need to read other attributes manually in debug status.

    # Read all membrane potentials
    for _ in range(2):      # Due to a bug on chip, you have to read memory twice to ensure it's correct.
        source.write([samna.xyloImu.event.ReadMembranePotential(neuron_id = i) for i in range(hidden_count + output_count)])
        membrane_potentials = membrane_potential_buf.get_n_events(hidden_count + output_count, 5000)
        assert(len(membrane_potentials) == hidden_count + output_count)
        readout.neuron_v_mems = [e.value for e in membrane_potentials]

    # Read all synaptic current
    for _ in range(2):      # Due to a bug on chip, you have to read memory twice to ensure it's correct.
        source.write([samna.xyloImu.event.ReadSynapticCurrent(neuron_id = i) for i in range(hidden_count + output_count)])
        synaptic_currents = synaptic_current_buf.get_n_events(hidden_count + output_count, 5000)
        assert(len(synaptic_currents) == hidden_count + output_count)
        readout.neuron_i_syns = [e.value for e in synaptic_currents]

    # Read all hidden spike count
    source.write([samna.xyloImu.event.ReadHiddenSpikeCount(neuron_id = i) for i in range(hidden_count)])
    hidden_spikes = hidden_spike_buf.get_n_events(hidden_count, 5000)
    assert(len(hidden_spikes) == hidden_count)
    readout.hidden_spikes = [e.count for e in hidden_spikes]

    return readout


def step_process(acceleration_data: np.ndarray):
    # Similar to `batch_process` except all attributes of readouts returned by this function is available.
    # As most information of readouts are read manually in debug status, so it runs more slowly than `batch_process`.
    start_timestep = get_current_timestep()
    print("*** Start timestep before step processing: ", start_timestep)

    # Decode acceleration events
    accel_events = decode_acceleration_events(acceleration_data)
    accel_count = len(accel_events)
    readouts = []
    for accel in accel_events:
        readouts.append(process_single(accel, hidden_count, output_count))

    end_timestep = get_current_timestep()
    assert(end_timestep - start_timestep == accel_count)
    print("*** Readouts:")
    for readout in readouts:
        print(readout)

    print("End timestep after step processing: ", end_timestep)
    print("\n")

# ------- bach process ------- #
# Run faster, but only timestep and information about output neurons are available in readout.
readouts = batch_process(np.array([
    [50, 80, 90],       # Acceleration data with x=50,y=80,z=90 which is to be processed at timestep 0.
    [0, -40, -60],      # Acceleration data with x=0,y=-40,z=-60 which is to be processed at timestep 1.
    [0, 0, 0],          # Acceleration data with x=0,y=0,z=0 which is to be processed at timestep 2.
    ]))

# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 2)     # Last processed timestep is 2

readouts = batch_process(np.array([
    [400, -100, -50],
    [0, 0, 0],
    [0, 0, 0],
    [0, 0, 0],
    ]))

# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 6)     # Last processed timestep is 6

readouts = batch_process(np.array([
    [100, 200, 300],
    [0, 0, 0],
    [-200, -900, -1000],
    ]))
assert(get_current_timestep() == 9)     # Last processed timestep is 9


# ------- step process ------- #
# Run slower and all attributes of readout are available.
readouts = step_process(np.array([
    [50, 80, 90],       # Acceleration data with x=50,y=80,z=90 which is to be processed at timestep 10.
    [0, -40, -60],      # Acceleration data with x=0,y=-40,z=-60 which is to be processed at timestep 11.
    [0, 0, 0],          # Acceleration data with x=0,y=0,z=0 which is to be processed at timestep 12.
    ]))

# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 12)     # Last processed timestep is 12

readouts = step_process(np.array([
    [400, -100, -50],
    [0, 0, 0],
    [0, 0, 0],
    [0, 0, 0],
    ]))

# Chip is in debug state after processing, you can do anything you like as manual mode now.
assert(get_current_timestep() == 16)     # Last processed timestep is 16

readouts = step_process(np.array([
    [100, 200, 300],
    [0, 0, 0],
    [-200, -900, -1000],
    ]))
assert(get_current_timestep() == 19)     # Last processed timestep is 19

graph.stop()