Xylo-Imu Convert Acceleration to Spike

Xylo-Imu Convert Acceleration to Spike

Convert acceleration events to input spikes by IMU_IF.

Acceleration events are fed to the IMU_IF one at a time so that the spike counts of the hidden neurons can be read back manually after each one. `samna.xyloImu.event.TriggerProcessing` is used so that processing stops after each single trigger.

Due to a current limitation of the FPGA, do not send too many acceleration events (more than 1000) at once before the previously sent acceleration events have been consumed! Set the target timestep in `samna.xyloImu.event.TriggerProcessing` correctly so that all input acceleration events are consumed precisely, and only then send the following acceleration events. See the implementation of the `batch_process` function in "Xylo-Imu RealTime Mode With Acceleration Input" for an example.

- samna                 0.33.1

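Files needed: `sample_003_device01.csv` (the acceleration samples) and `expected_spikes.txt` (the expected spikes), both opened by relative path in the code below.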

import samna
import numpy as np

def initialize_board():
    """Open the Xylo-IMU test board and attach a sink and a source to it.

    Returns:
        A ``(dk, buf, source)`` tuple: the device opened as
        ``"XyloImuTestBoard:0"``, a sink created from the device's model
        source node, and a source connected to the device's model sink node.
    """
    board = samna.device.open_device("XyloImuTestBoard:0")
    event_sink = samna.graph.sink_from(board.get_model_source_node())
    event_source = samna.graph.source_to(board.get_model_sink_node())
    return board, event_sink, event_source

# Open the board once at import time. `dk` and `source` are module-level
# handles that the functions below read as globals; `buf` is not referenced
# again in the code shown here.
dk, buf, source = initialize_board()

def build_event_type_filters(dk, graph):
    """Attach one type-filtered sink per output event type to the device.

    For each event type a ``source node -> XyloImuOutputEventTypeFilter ->
    JitSink`` chain is added to ``graph`` and the filter is set to pass only
    that type.

    Args:
        dk: Device whose model source node feeds every chain.
        graph: Filter graph to which the chains are added via ``sequential``.

    Returns:
        A 6-tuple of sinks, in this order: register value, readout,
        interrupt, membrane potential, synaptic current, hidden spike count.
    """
    # Order matters: it defines the order of the returned sinks.
    desired_types = (
        'xyloImu::event::RegisterValue',
        'xyloImu::event::Readout',
        'xyloImu::event::Interrupt',
        'xyloImu::event::MembranePotential',
        'xyloImu::event::SynapticCurrent',
        'xyloImu::event::HiddenSpikeCount',
    )

    sinks = []
    for desired_type in desired_types:
        chain = [dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()]
        _, type_filter, sink = graph.sequential(chain)
        type_filter.set_desired_type(desired_type)
        sinks.append(sink)

    return tuple(sinks)

# Keep `graph` alive at module level: this object must not be released in
# Python while receiving events, otherwise no event will be received.
graph = samna.graph.EventFilterGraph()
# One filtered sink per output event type, all fed from the device `dk`.
register_value_buf, readout_buf, interrupt_buf, membrane_potential_buf, synaptic_current_buf, hidden_spike_buf = build_event_type_filters(dk, graph)
# The graph has to be started manually to work.
graph.start()

def decode_acceleration_events(csv_path="sample_003_device01.csv", scale_factor=2 ** 12):
    """Load acceleration samples from a CSV file as samna Acceleration events.

    The columns at positions 1, 2 and 3 of the CSV are taken as the x, y and z
    values. Each value is multiplied by ``scale_factor`` and rounded to the
    nearest integer with the built-in ``round``.

    Args:
        csv_path: Path of the CSV file to read. Defaults to the sample file
            used by this example.
        scale_factor: Multiplier applied to every value before rounding.

    Returns:
        A list with one ``samna.events.Acceleration`` per CSV row, in file
        order.
    """
    import pandas as pd

    data = pd.read_csv(csv_path)

    # Select the three axis columns by position once. The previous
    # `data.iloc[row][1]` indexed a label-indexed row Series with an integer
    # key, which pandas deprecates as positional access, and built a new row
    # Series for every single value.
    axes = data.iloc[:, 1:4]

    return [
        samna.events.Acceleration(
            x=round(x * scale_factor),
            y=round(y * scale_factor),
            z=round(z * scale_factor),
        )
        for x, y, z in axes.itertuples(index=False, name=None)
    ]

def get_current_timestep():
    """Return the timestep reported by a freshly triggered readout.

    Writes a ``TriggerReadout`` event to the module-level ``source`` and waits
    for exactly one event on ``readout_buf``. The current timestep is the last
    processed timestep.

    Returns:
        The ``timestep`` attribute of the received readout event.
    """
    trigger = samna.xyloImu.event.TriggerReadout()
    source.write([trigger])

    # Try to get 1 event within 3 seconds.
    readouts = readout_buf.get_n_events(1, 3000)
    assert len(readouts) == 1
    return readouts[0].timestep

def apply_configuration():
    """Build the example Xylo-IMU configuration and apply it to the board.

    Enables manual acceleration input on the module-level device ``dk``,
    creates a ``XyloConfiguration`` in ``RealTime`` operation mode with a
    custom network that reflects input spikes to hidden spikes, enables and
    parameterizes the input interface, and applies the result to the device
    model.

    Returns:
        A ``(xylo_config, input_count, hidden_count, output_count)`` tuple.
    """

    def _unit_neurons(neuron_cls, count):
        # Build `count` distinct neuron objects. `[neuron_cls()] * count`
        # would only repeat references to one shared instance.
        neurons = []
        for _ in range(count):
            neuron = neuron_cls()
            neuron.threshold = 1
            neuron.v_mem_decay = 1
            neuron.i_syn_decay = 1
            neurons.append(neuron)
        return neurons

    # Use manual input instead of sensor input.
    dk.enable_manual_input_acceleration(True)

    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime

    # Configure a custom network which reflects input spikes to hidden spikes.
    input_count = 15
    hidden_count = 15
    output_count = 15

    # Weight matrices are sized from the counts above instead of a repeated
    # literal 15. NOTE(review): the (rows, cols) orientation is assumed to be
    # (source layer, target layer); all counts are equal here, so the matrices
    # are square either way. Confirm before using unequal counts.
    xylo_config.input.weights = np.eye(input_count, hidden_count, dtype='int8') * 2
    xylo_config.hidden.weights = np.eye(hidden_count, dtype='int8') * -1
    xylo_config.hidden.neurons = _unit_neurons(
        samna.xyloImu.configuration.HiddenNeuron, hidden_count
    )

    xylo_config.readout.neurons = _unit_neurons(
        samna.xyloImu.configuration.OutputNeuron, output_count
    )
    xylo_config.readout.weights = np.eye(hidden_count, output_count, dtype='int8') * 2

    # Preparation to run in realtime mode with input interface opened.
    xylo_config.debug.always_update_omp_stat = True
    xylo_config.imu_if_input_enable = True
    xylo_config.debug.imu_if_clk_enable = True
    xylo_config.time_resolution_wrap = 0x61a80
    xylo_config.debug.imu_if_clock_freq_div = 0x169

    # Open and config input interface of Xylo-IMU, you could customize the parameters.
    xylo_config.input_interface.enable = True
    xylo_config.input_interface.estimator_k_setting = 6
    xylo_config.input_interface.select_iaf_output = True
    xylo_config.input_interface.update_matrix_threshold = 255
    xylo_config.input_interface.delay_threshold = 1
    xylo_config.input_interface.bpf_bb_values = [6] * 15
    xylo_config.input_interface.bpf_bwf_values = [8] * 15
    xylo_config.input_interface.bpf_baf_values = [9] * 15
    xylo_config.input_interface.bpf_a1_values = [-64700, -64458, -64330, -64138, -63884, -63566, -63169, -62743, -62238, -61672, -61045, -60357, -59611, -58805, -57941]
    xylo_config.input_interface.bpf_a2_values = [31935] + [31754] * 14
    xylo_config.input_interface.scale_values = [8] * 15
    xylo_config.input_interface.iaf_threshold_values = [0x000007d0] * 15

    dk.get_model().apply_configuration(xylo_config)
    return xylo_config, input_count, hidden_count, output_count
# Configure the board once at import time; `hidden_count` and `output_count`
# are read as globals by `step_process` below.
xylo_config, input_count, hidden_count, output_count = apply_configuration()

def get_readout(hidden_count, output_count):
    """Fetch one readout event and attach the hidden spike counts to it.

    Waits for exactly one event on ``readout_buf``, then requests the spike
    count of every hidden neuron through ``source`` and stores the returned
    counts on the readout as ``hidden_spikes``.

    Args:
        hidden_count: Number of hidden neurons whose spike count is read.
        output_count: Not used by this function.

    Returns:
        The readout event, with ``hidden_spikes`` set to a list of counts in
        the order the count events were received.
    """
    received = readout_buf.get_n_events(1, 2000)

    # NOTE(review): per the original comment, only `timestep` and
    # `output_v_mems` of the `Readout` event are available here, so
    # everything else has to be read manually. Confirm against the samna docs.
    assert len(received) == 1
    readout = received[0]

    # Read all hidden spike counts.
    # NOTE(review): the request/response round is performed twice and the
    # second result overwrites the first. The reason is not visible here
    # (presumably a hardware workaround), so confirm before changing it.
    for _ in range(2):
        requests = [
            samna.xyloImu.event.ReadHiddenSpikeCount(neuron_id=neuron_id)
            for neuron_id in range(hidden_count)
        ]
        source.write(requests)
        count_events = hidden_spike_buf.get_n_events(hidden_count, 5000)
        assert len(count_events) == hidden_count
        readout.hidden_spikes = [event.count for event in count_events]

    return readout

def process_single(accel: samna.events.Acceleration, hidden_count: int, output_count: int):
    """Process a single acceleration event and read the full readout manually.

    Clears ``readout_buf``, queries the current timestep, writes ``accel``
    followed by a ``TriggerProcessing`` event targeting two timesteps past the
    current one, waits for an interrupt and then collects the readout.

    Args:
        accel: Acceleration event to send to the board.
        hidden_count: Forwarded to ``get_readout``.
        output_count: Forwarded to ``get_readout``.

    Returns:
        The readout returned by ``get_readout``.

    Raises:
        Exception: If no interrupt event arrives within 2 seconds.
    """
    # Drop any pending readouts so the next one belongs to this step.
    readout_buf.get_events()
    timestep_before = get_current_timestep()

    source.write([accel])
    trigger = samna.xyloImu.event.TriggerProcessing(target_timestep=timestep_before + 2)
    source.write([trigger])

    # Try to get 1 event within 2 seconds.
    interrupts = interrupt_buf.get_n_events(1, 2000)
    if interrupts:
        # Processing finished; receive the readout.
        return get_readout(hidden_count, output_count)

    # By default there is an interrupt after processing is done.
    raise Exception("No interrupt occurs after processing done!")

def step_process(accel_events):
    """Feed acceleration events one at a time and collect the hidden spikes.

    Each event goes through ``process_single``. For every hidden neuron, one
    ``Spike`` per counted spike is created with the readout's timestep,
    printed and collected. Afterwards the function asserts that the timestep
    advanced by exactly ``len(accel_events)``.

    Args:
        accel_events: Sized collection of acceleration events to process.

    Returns:
        The list of ``samna.xyloImu.event.Spike`` objects, in creation order.
    """
    first_timestep = get_current_timestep()
    print("*** Start timestep before step processing: ", first_timestep)

    total = len(accel_events)
    collected = []
    for accel in accel_events:
        readout = process_single(accel, hidden_count, output_count)

        # The position in `hidden_spikes` is used as the neuron id.
        for neuron_id, spike_count in enumerate(readout.hidden_spikes):
            for _ in range(spike_count):
                spike = samna.xyloImu.event.Spike(neuron_id=neuron_id, timestep=readout.timestep)
                print("Get spike: ", spike)
                collected.append(spike)

    last_timestep = get_current_timestep()
    assert last_timestep - first_timestep == total

    print("Readout count: ", total)
    print("End timestep after step processing: ", last_timestep)
    print("\n")
    return collected

def get_expected_spikes(path="expected_spikes.txt"):
    """Load the reference spikes from a text file.

    Each non-blank line holds comma-separated integers; the first is used as
    the neuron id and the second as the timestep.

    Args:
        path: File to read. Defaults to the reference file of this example.

    Returns:
        A list of ``samna.xyloImu.event.Spike`` objects in file order.

    Raises:
        ValueError: If a field on a non-blank line is not an integer.
        IndexError: If a non-blank line has fewer than two fields.
    """
    spikes = []
    with open(path, "r", encoding="utf-8") as f:
        # Stream the file instead of materializing every line first.
        for line in f:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line would otherwise crash int('').
                continue
            fields = [int(field) for field in line.split(",")]
            spikes.append(samna.xyloImu.event.Spike(neuron_id=fields[0], timestep=fields[1]))
    return spikes

# Run the example: decode the sample data, process it step by step and compare
# the produced spikes with the expected ones.
try:
    accel_events = decode_acceleration_events()
    spikes = step_process(accel_events)

    print("Get spike count: ", len(spikes))
    assert(spikes == get_expected_spikes())
finally:
    # Stop the filter graph even when processing or the comparison fails.
    graph.stop()