Using Recorded data with Dynap-CNN dev kit#

This example is similar to the gesture recognition example, except that recorded data is used here. The functionality has been tested; it is based on the following packages:

- samna                 0.30.23
- numpy                 1.23.1
- torch                 1.13.1
- sinabs                1.2.5
- sinabs-dynapcnn       1.0.10
- aermanager            0.3.0

The recorded data is an aedat4-format file and we can read spike events from it directly, so no DVS is needed.

Please note that reading events from the file in Python takes some time.

There are two files: gesture.py below and algorithm_helper.py from the gesture recognition example.

If you want to see the differences, just search for #-#.

File needed: bptt-11-500-5-1500.pth and gesture.aedat4

Structure:

dynapcnn recorded data flow structure
# gesture.py
from multiprocessing import Process

from aermanager.aerparser import load_events_from_file
from algorithm_helper import algorithm_process, get_algorithm_model_config

import samna
import samnagui


# -# read data from file. The recorded data is saved in aedat4 format, which can be loaded via aermanager.
def read_spike_events_from_recorded_data():
    """Read DVS events from the recorded aedat4 file and convert them to
    DYNAPCNN input spikes.

    The recording comes from a DVXplorer with 320x240 resolution, while the
    DYNAPCNN accepts 128x128 input. A 256-pixel-wide band is cropped from the
    sensor (x in [32, 287]; 8 is added to y so the final image loses the same
    number of pixels above and below) and both coordinates are halved to reach
    128x128. In addition, only every 4th event inside the crop is kept.

    Returns:
        list of samna.dynapcnn.event.Spike with timestamp 0.
    """
    filename = "gesture.aedat4"
    # shape returned by load_events_from_file is not needed here
    _, events = load_events_from_file(filename)

    spike_events = []
    decimation_count = 0
    for e in events:
        # keep only the central 256-pixel-wide band: x in [32, 287]
        if 31 < e[0] < 288:
            decimation_count += 1
            if decimation_count == 4:  # decimate events by keeping 1 in 4
                spike = samna.dynapcnn.event.Spike()
                spike.feature = e[3]
                spike.x = (e[0] - 32) >> 1  # shift crop to origin, halve resolution
                spike.y = (e[1] + 8) >> 1  # center vertically, halve resolution
                spike.layer = 13  # presumably the model's DVS input layer — verify against config
                spike.timestamp = 0
                spike_events.append(spike)
                decimation_count = 0

    return spike_events


# sinabs library somehow wraps open device.
def open_dynapcnn():
    """Open and return the first attached DynapcnnDevKit via samna."""
    dev_kit = samna.device.open_device("DynapcnnDevKit:0")
    return dev_kit


def route_input(dk, input_graph):
    """Build and start a graph that streams spike events into the dev kit.

    Returns the source node; events written to it are forwarded to the
    dev kit's model sink node.
    """
    source = samna.BasicSourceNode_dynapcnn_event_input_event()
    input_graph.sequential([source, dk.get_model_sink_node()])
    input_graph.start()
    return source


# build a graph to filter events from dynapcnn model to visualizer
def route_output(dk, output_graph, streamer_endpoint):
    """Wire the dev kit's output into the visualizer and the readout buffer.

    Builds two branches on output_graph: one that converts output dvs events
    to visualizer events and streams them over tcp, and one that forwards
    layer-3 spikes (the model's output layer) into the module-level
    readoutBuf sink. Starts the graph and pushes an initial plot
    configuration to the visualizer.

    Raises:
        Exception: if no visualizer is listening on streamer_endpoint.
    """
    # DynapcnnDvsToVizConverter: convert output dvs events to viz events
    # the visualizer can recognize; VizEventStreamer: viz events tcp sender
    _, _, streamer = output_graph.sequential(
        [dk.get_model_source_node(), "DynapcnnDvsToVizConverter", "VizEventStreamer"]
    )
    # side channel to push UI configuration through the same streamer
    config_source, _ = output_graph.sequential(
        [samna.BasicSourceNode_ui_event(), streamer]
    )

    # point the streamer at the visualizer and make sure someone is listening
    streamer.set_streamer_endpoint(streamer_endpoint)
    if streamer.wait_for_receiver_count() == 0:
        raise Exception(
            f"connecting to visualizer on {streamer_endpoint} fails, please open visualizer first!"
        )

    # readout branch: only events from layer 3 pass into the module-level
    # readoutBuf sink (layer 3 is the output layer of our model)
    _, layer_filter, _ = output_graph.sequential(
        [dk.get_model_source_node(), "DynapcnnMemberSelect", readoutBuf]
    )
    layer_filter.set_white_list([3], "layer")

    output_graph.start()

    # initialize a screen in the visualizer window to display dvs events
    config_source.write(
        [
            samna.ui.VisualizerConfiguration(
                plots=[samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer")]
            )
        ]
    )

def open_visualizer(window_width, window_height, receiver_endpoint):
    """Launch samnagui in its own process and return the Process handle.

    An isolated process (instead of a sub process) is required on mac.
    """
    viz_process = Process(
        target=samnagui.run_visualizer,
        args=(receiver_endpoint, window_width, window_height),
    )
    viz_process.start()
    return viz_process


# get the configuration from the algorithm model
config = get_algorithm_model_config()

# load and convert the recorded events up front (this is the slow part)
print("begin to read spike events from file...")
spike_events = read_spike_events_from_recorded_data()
print("read over, events num: ", len(spike_events))

# endpoint the visualizer listens on and the streamer sends to
streamer_endpoint = "tcp://0.0.0.0:40000"

# open the GUI first so route_output can connect to it below
gui_process = open_visualizer(0.75, 0.75, streamer_endpoint)

dk = open_dynapcnn()
dk.get_model().apply_configuration(config)

# in graph object's destructor, `stop` method will be called, so we need to put them outside.
output_graph = samna.graph.EventFilterGraph()
# NOTE: readoutBuf must be defined before route_output is called — it is
# referenced as a module-level global inside route_output.
readoutBuf = (
    samna.BasicSinkNode_dynapcnn_event_output_event()
)  # receive the spike events

# -# no input dvs , no input filters, because we use outside data.
route_output(dk, output_graph, streamer_endpoint)

# -# send spike events to dev-kit
input_graph = samna.graph.EventFilterGraph()
inputBuf = route_input(dk, input_graph)
inputBuf.write(spike_events)

# to calc gesture from dynapcnn's output; blocks until the GUI is closed
algorithm_process(readoutBuf, gui_process)

# stop the output graph explicitly once processing is done
output_graph.stop()