Gesture recognition with Dynap-CNN dev kit

An example that uses a DynapcnnDevKit and a DVXplorer camera to do the following:

  • display in the visualizer the DVS events output by the DynapcnnDevKit, which are fed in by the camera

  • receive CNN layer outputs as Spike events from DynapcnnDevkit

  • recognize gesture from spike events and print it out

All of this functionality has been tested. It is based on the following packages:

- samna                 0.30.23
- torch                 1.13.1
- sinabs                1.2.5
- sinabs-dynapcnn       1.0.10

There are two files: gesture.py which contains the main code and algorithm_helper.py which contains algorithm related code.

File needed: bptt-11-500-5-1500.pth

Structure:

dynapcnn gesture data flow structure
# gesture.py
from multiprocessing import Process

from algorithm_helper import algorithm_process, get_algorithm_model_config

import samna
import samnagui


# open two devices: dynapcnn and camera
def open_dynapcnn():
    """Open the device named "DynapcnnDevKit:0" through samna.

    Returns:
        Whatever ``samna.device.open_device`` returns for that device name.
    """
    device_name = "DynapcnnDevKit:0"
    return samna.device.open_device(device_name)


def open_camera():
    """Open the device named "DVXplorer:0" through samna.

    Returns:
        Whatever ``samna.device.open_device`` returns for that device name.
    """
    device_name = "DVXplorer:0"
    return samna.device.open_device(device_name)


# build a graph to filter events from camera to dynapcnn model
def route_input(camera, dk, input_graph):
    """Build and start the filter chain that feeds camera events to the model.

    The chain added to ``input_graph`` is, in order: the camera source node,
    "DvsEventCrop", "DvsEventRescale", "DvsEventDecimate",
    "CameraToDynapcnnInput" and the model's sink node.

    Args:
        camera: Opened camera device; its ``get_source_node()`` is the head
            of the chain.
        dk: Opened dev kit; ``dk.get_model().get_sink_node()`` is the tail
            of the chain.
        input_graph: Filter graph the chain is added to; it is started
            before this function returns.
    """
    # Nodes listed here are added to the filter graph and connected to each
    # other automatically, in list order.
    chain = [
        camera.get_source_node(),
        "DvsEventCrop",
        "DvsEventRescale",
        "DvsEventDecimate",
        "CameraToDynapcnnInput",
        dk.get_model().get_sink_node(),
    ]
    _source, crop_filter, rescale_filter, decimate_filter, _converter, _sink = (
        input_graph.sequential(chain)
    )

    # Keep only the pixels inside the rectangle (edges included) whose top
    # left corner is (31,0) and bottom right corner is (288,240); the top
    # left corner becomes the new origin for the pixels that pass.
    crop_filter.set_roi(31, 0, 288, 240)

    # Divide both the x and the y axis by 2.
    rescale_filter.set_rescaling_coefficients(2, 2)

    # Out of every 15 events, let 1 event through.
    decimate_filter.set_decimation_fraction(15, 1)

    input_graph.start()


# build a graph to filter events from dynapcnn model to visualizer
def route_output(dk, output_graph):
    """Build and start the graph that routes model output to its consumers.

    Two paths are attached to the model's source node:

    * model source -> "DynapcnnDvsToVizConverter" -> "VizEventStreamer",
      which sends viz events over TCP to the visualizer;
    * model source -> "DynapcnnMemberSelect" -> ``readoutBuf``, which keeps
      only events of layer 3 (the output layer of the model).

    A UI-event source node is also connected to the streamer so that the
    visualizer can be configured; it is used to create one 128x128 activity
    plot titled "DVS Layer".

    NOTE: the readout sink is taken from the module-level name
    ``readoutBuf``, which must exist before this function is called.

    Args:
        dk: Opened dev kit providing ``get_model_source_node()``.
        output_graph: Filter graph the nodes are added to; it is started
            before the visualizer configuration is written.

    Raises:
        Exception: If the streamer reports zero receivers, i.e. the
            visualizer is not reachable on the endpoint.
    """
    # Single source of truth for the address: the same value is handed to
    # the streamer and reported in the error message, so the message cannot
    # go stale or depend on a global that happens to share the name.
    endpoint = "tcp://0.0.0.0:40000"

    # DynapcnnDvsToVizConverter: converts output dvs events to viz events
    # which can be recognized by the visualizer.
    # VizEventStreamer: viz events tcp sender.
    _, _, streamer = output_graph.sequential(
        [dk.get_model_source_node(), "DynapcnnDvsToVizConverter", "VizEventStreamer"]
    )
    config_source, _ = output_graph.sequential(
        [samna.BasicSourceNode_ui_event(), streamer]
    )

    # Set the streamer destination and make sure somebody is listening.
    streamer.set_streamer_endpoint(endpoint)
    if streamer.wait_for_receiver_count() == 0:
        raise Exception(
            f"connecting to visualizer on {endpoint} fails, please open visualizer first!"
        )

    # Layer selector: only events from layer 3 can pass, which is the output
    # layer of our model; they end up in the module-level readout buffer.
    _, readouter, _ = output_graph.sequential(
        [dk.get_model_source_node(), "DynapcnnMemberSelect", readoutBuf]
    )
    readouter.set_white_list([3], "layer")

    output_graph.start()

    # Initialize a screen in the visualizer window to display dvs events.
    config_source.write(
        [
            samna.ui.VisualizerConfiguration(
                plots=[samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer")]
            )
        ]
    )


def open_visualizer(window_width, window_height, receiver_endpoint):
    """Launch the samna visualizer in a separate process.

    Args:
        window_width: Forwarded to ``samnagui.run_visualizer`` as the second
            argument.
        window_height: Forwarded to ``samnagui.run_visualizer`` as the third
            argument.
        receiver_endpoint: Forwarded to ``samnagui.run_visualizer`` as the
            first argument.

    Returns:
        The already started ``Process`` running the visualizer.
    """
    # The visualizer is started in an isolated process instead of a sub
    # process, which is required on mac.
    visualizer_args = (receiver_endpoint, window_width, window_height)
    visualizer = Process(target=samnagui.run_visualizer, args=visualizer_args)
    visualizer.start()
    return visualizer


# The entry-point guard is required because the visualizer is started with
# multiprocessing: with the "spawn" start method the child process re-imports
# this module, and without the guard it would run the whole setup again.
if __name__ == "__main__":
    # get the configuration from the algorithm model
    config = get_algorithm_model_config()

    streamer_endpoint = "tcp://0.0.0.0:40000"

    gui_process = open_visualizer(0.75, 0.75, streamer_endpoint)

    dk = open_dynapcnn()
    dk.get_model().apply_configuration(config)
    camera = open_camera()

    # in graph object's destructor, `stop` method will be called, so the
    # graphs are kept as module-level objects instead of function locals.
    # (Names bound inside this `if` block are still module globals.)
    input_graph = samna.graph.EventFilterGraph()
    output_graph = samna.graph.EventFilterGraph()
    readoutBuf = (
        samna.BasicSinkNode_dynapcnn_event_output_event()
    )  # receive the spike events

    route_input(camera, dk, input_graph)
    route_output(dk, output_graph)

    # Start DVS camera
    camera.start()

    try:
        # to calc gesture from dynapcnn's output; returns once the
        # visualizer process is no longer alive.
        algorithm_process(readoutBuf, gui_process)
    finally:
        # Stop the camera and both graphs even if the readout loop is
        # interrupted (e.g. Ctrl-C) or raises.
        camera.stop()
        input_graph.stop()
        output_graph.stop()
# algorithm_helper.py
import time
import numpy as np
import torch
from sinabs.backend.dynapcnn import DynapcnnNetwork
from sinabs.from_torch import from_model
from torch import nn


# The model is defined as below.
class GestureClassifier(nn.Module):
    """CNN used for gesture classification.

    The network is a single ``nn.Sequential`` (``self.seq``) made of three
    bias-free convolution stages, each followed by ReLU and 2x2 average
    pooling, then a flatten, a bias-free linear layer with 11 outputs and a
    final ReLU. Shape comments below assume a 2x128x128 input.

    The position of every layer inside ``self.seq`` matters: trained weights
    are copied by key into ``seq.0``, ``seq.3``, ``seq.7`` and ``seq.12``.
    """

    def __init__(self):
        super().__init__()

        # Core 0
        core0 = [
            nn.Conv2d(
                2, 8, kernel_size=(2, 2), stride=(2, 2), padding=(0, 0), bias=False
            ),  # 8, 64, 64
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 8, 32, 32
        ]

        # Core 1
        core1 = [
            nn.Conv2d(
                8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False
            ),  # 16, 32, 32
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 16, 16, 16
        ]

        # Core 2
        core2 = [
            nn.Dropout2d(0.5),
            nn.Conv2d(
                16, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False
            ),  # 8, 16, 16
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 8, 8, 8
        ]

        # Readout: one output per gesture class.
        readout = [
            nn.Flatten(),
            nn.Dropout2d(0.5),
            nn.Linear(8 * 8 * 8, 11, bias=False),
            nn.ReLU(),
        ]

        self.seq = nn.Sequential(*core0, *core1, *core2, *readout)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.seq(x)


def get_algorithm_model_config():
    """Build the chip configuration for the gesture model.

    Steps performed:

    1. Instantiate ``GestureClassifier`` and convert it to a spiking model
       with sinabs' ``from_model``.
    2. Copy the trained weights from "bptt-11-500-5-1500.pth" into the
       spiking model (the first convolution's weights are multiplied by 5).
    3. Wrap the spiking layers in a ``DynapcnnNetwork`` and generate the
       configuration for "dynapcnndevkit:0".
    4. Enable monitoring on the DVS layer and on CNN layer 3, and disable
       input monitoring in the factory settings.

    Returns:
        The configuration object returned by ``make_config`` with the monitor
        flags adjusted as described above.
    """
    # when running with a real chip, we use cpu instead of cuda.
    device = torch.device("cpu")
    input_shape = (2, 128, 128)

    model_analog = GestureClassifier()
    # NOTE: torch.load unpickles the file; only use a checkpoint from a
    # trusted source.
    stat_dic = torch.load("bptt-11-500-5-1500.pth", map_location=device)

    # First convert the CNN model to an SNN model; Sinabs does this in one
    # call through its from_model method.
    model_spiking = from_model(
        model_analog, input_shape=input_shape, batch_size=1
    ).spiking_model
    model_spiking.eval()

    # (key in the spiking model, key in the checkpoint, gain or None)
    # NOTE(review): the reason for the gain of 5 on the first layer is not
    # stated in this file.
    weight_table = (
        ("seq.0.weight", "model.0.weight", 5),
        ("seq.3.weight", "model.3.weight", None),
        ("seq.7.weight", "model.7.weight", None),
        ("seq.12.weight", "model.12.weight", None),
    )
    for target_key, source_key, gain in weight_table:
        trained = stat_dic[source_key]
        if gain is not None:
            trained = trained * gain
        # Slice assignment writes the values into the tensor handed out by
        # state_dict() rather than rebinding the dictionary entry.
        model_spiking.state_dict()[target_key][:] = torch.nn.Parameter(trained)

    # Then generate the DYNAP-CNN compatible model based on this SNN model;
    # Sinabs also does this in one step.
    dynapcnn_net = DynapcnnNetwork(
        model_spiking.seq, discretize=True, input_shape=input_shape, dvs_input=True
    )

    # make the configuration which would be applied to the chip
    chip_config = dynapcnn_net.make_config(
        chip_layers_ordering="auto", device="dynapcnndevkit:0"
    )

    chip_config.dvs_layer.monitor_enable = True
    chip_config.factory_settings.monitor_input_enable = False
    chip_config.cnn_layers[3].monitor_enable = True
    return chip_config


def majority_readout(buffer, gui_process, t_interval=0.5, labels=512, threshold=None):
    """Find the most active neuron in each time interval and print its class.

    The loop runs for as long as ``gui_process.is_alive()`` is true. On each
    pass it drains ``buffer``, counts events per ``ev.feature``, prints the
    count vector and its total, then prints either the winning label or a
    blank line, resets the counts and sleeps for ``t_interval`` seconds.

    Args:
        buffer (object implementing get_events()): The buffer to read from.
        gui_process (object implementing is_alive()): The readout stops once
            this reports that it is no longer alive.
        t_interval (float): Time to sleep between two readouts, in seconds
            (default 0.5).
        labels (list or int): If int, number of neurons in the readout layer;
            the neuron number is printed as a label in this case. If list,
            a list of label names, one for each neuron.
        threshold (None or int): If None, the winning label is always
            printed. Otherwise the label is printed only when the number of
            spikes collected in the interval is strictly greater than
            `threshold`; a blank line is printed instead when it is not.
    """

    if np.isscalar(labels):
        labels = range(labels)

    counts = np.zeros(len(labels), dtype=int)

    while gui_process.is_alive():
        for ev in buffer.get_events():
            counts[ev.feature] += 1

        total = counts.sum()
        print(counts, total)

        # `threshold=None` (the default) means "no threshold"; comparing a
        # count with None would raise instead of printing.
        if threshold is None or total > threshold:
            print(labels[np.argmax(counts)])
        else:
            print(" ")

        counts[...] = 0
        time.sleep(t_interval)


def algorithm_process(readoutBuf, gui_process):
    """Run the gesture readout loop on the model's output events.

    Delegates to ``majority_readout`` with the 11 gesture names as labels,
    a 0.5 second interval and a threshold of 8.

    Args:
        readoutBuf: Buffer holding the output events; passed to
            ``majority_readout`` as its ``buffer``.
        gui_process: Visualizer process; passed through to
            ``majority_readout``.
    """
    # One name per output neuron, in neuron order.
    gesture_names = [
        "hand clap",
        "right hand wave",
        "left hand wave",
        "right arm clockwise",
        "right arm counterclockwise",
        "left arm clockwise",
        "left arm counterclockwise",
        "arm roll",
        "air drums",
        "air guitar",
        "other gestures",
    ]
    readout_options = {
        "t_interval": 0.5,
        "labels": gesture_names,
        "threshold": 8,
    }
    # starts reading the buffer and printing outputs
    majority_readout(readoutBuf, gui_process, **readout_options)