Speck 2b smart door demo

A classic smart-door example that uses the Speck2b testboard to perform the following steps:

  • receive output dvs events to show in dvs plot

  • receive CNN layer outputs as Spike events to show spike count plot

  • calculate readout from spike count and show readout in image plot

  • read power measurement and show in power measurement plot

All of this functionality has been tested with the following packages:

- samna                 0.30.9
- torch                 1.9.0
- sinabs                0.2.1.dev56
- sinabs-dynapcnn       0.2.2.dev6

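Files needed (place all of them in the same directory as the Python script): smart_door_net_ep60.pth (the trained network weights), Closed.png and Open.png (the images shown in the readout plot).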

Structure:

speck2b smart door structure
from multiprocessing import Process

import sinabs
import sinabs.backend.dynapcnn as sindynapcnn
import sinabs.from_torch
import torch
import torch.nn as nn

import samna
import samnagui


class SmartDoorClassifier(nn.Module):
    """CNN classifier used by the smart door demo.

    The whole network is one ``nn.Sequential`` stored in ``self.seq``: three
    bias-free convolution -> ReLU -> 2x2 average-pooling stages, followed by
    flatten, dropout and a bias-free linear layer with a final ReLU that
    yields two outputs. Shape comments are (channels, height, width) for a
    2 x 128 x 128 input.
    """

    def __init__(self):
        super().__init__()

        def conv_stage(in_channels, out_channels, kernel, stride, padding):
            # one stage: convolution, rectification, then 2x2 average pooling
            return [
                nn.Conv2d(
                    in_channels,
                    out_channels,
                    kernel_size=kernel,
                    stride=stride,
                    padding=padding,
                    bias=False,
                ),
                nn.ReLU(),
                nn.AvgPool2d(kernel_size=(2, 2)),
            ]

        layers = [
            # Core 0: 2, 128, 128 -> 8, 64, 64 (conv) -> 8, 32, 32 (pool)
            *conv_stage(2, 8, (2, 2), (2, 2), (0, 0)),
            # Core 1: 8, 32, 32 (conv) -> 8, 16, 16 (pool)
            *conv_stage(8, 8, (3, 3), (1, 1), (1, 1)),
            # Core 2: 8, 16, 16 (conv) -> 8, 8, 8 (pool)
            *conv_stage(8, 8, (3, 3), (1, 1), (1, 1)),
            nn.Flatten(),  # 8 * 8 * 8 = 512 values per sample
            # NOTE(review): Dropout2d is applied to the flattened tensor here;
            # plain nn.Dropout may have been intended - confirm before changing.
            nn.Dropout2d(0.5),
            nn.Linear(8 * 8 * 8, 2, bias=False),
            nn.ReLU(),
        ]
        # positions in the Sequential define the state_dict keys (seq.0, seq.3, ...)
        self.seq = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return its output."""
        return self.seq(x)


def open_speck2b(device_name="Speck2bTestboard:0"):
    """Open a Speck2b board through samna and return the device object.

    Args:
        device_name: identifier handed to ``samna.device.open_device``.
            The default targets the Speck2b testboard. For the other boards
            replace the board part of the name:
            speck2bDevKit -> "Speck2bDevKit:0",
            speck2bDevKitTiny -> "Speck2bDevKitTiny:0".

    Returns:
        Whatever ``samna.device.open_device`` returns for ``device_name``.
    """
    return samna.device.open_device(device_name)


def algorithm_model():
    """Build the speck2b configuration for the trained smart door network.

    Loads ``./smart_door_net_ep60.pth`` into a ``SmartDoorClassifier``,
    converts it to a spiking model with sinabs, wraps it in a
    ``DynapcnnCompatibleNetwork`` and generates a chip configuration with
    monitoring enabled on the dvs layer and on the last chip layer.

    Returns:
        The configuration object produced by ``make_config``, with the two
        ``monitor_enable`` flags set.

    Note:
        The checkpoint is read relative to the current working directory;
        ``torch.load`` fails if it is not there. ``torch.load`` unpickles the
        file, so only load checkpoints from a trusted source.
    """
    # - Input dimensions, shared by both conversion steps
    input_shape = (2, 128, 128)

    model = SmartDoorClassifier()
    # strict=True: every key in the checkpoint must match the model and vice versa
    model.load_state_dict(
        torch.load("./smart_door_net_ep60.pth", map_location="cpu"), strict=True
    )
    spiking_model = sinabs.from_torch.from_model(model, input_shape=input_shape)

    # - Dynapcnn compatible network
    dynapcnn_net = sindynapcnn.DynapcnnCompatibleNetwork(
        spiking_model,
        input_shape=input_shape,
        discretize=True,
        dvs_input=True,
    )

    config = dynapcnn_net.make_config("auto", device="speck2b:0")
    # monitor the dvs layer and the last entry of the chip layer ordering,
    # so that their events are available to the host
    last_chip_layer = dynapcnn_net.chip_layers_ordering[-1]
    config.dvs_layer.monitor_enable = True
    config.cnn_layers[last_chip_layer].monitor_enable = True
    return config


# number of output features handled by the readout; also used as the channel
# count of the spike count plot (feature 0 -> "closed", feature 1 -> "opened").
feature_count = 2
# minimum number of spikes a collection must contain before custom_readout
# tries to pick a winning feature.
threshold = 60
# feature reported by custom_readout when there are too few spikes or no unique winner.
default_feature = 0


# custom python callback for the readout filter: it receives an array of spike events
# and returns an array holding a single readout event. you are encouraged to customize it.
def custom_readout(collection):
    """Turn one collection of spike events into a one-element readout list.

    The feature with the strictly highest spike count among the features
    below ``feature_count`` wins. ``default_feature`` is reported instead
    when the collection holds fewer than ``threshold`` spikes, when no spike
    belongs to a valid feature, or when the highest count is tied.

    Args:
        collection: sized iterable of events exposing a ``feature`` attribute.

    Returns:
        A list containing one ``samna.ui.Readout`` whose ``feature`` is set.
    """

    def as_readout(feature):
        readout = samna.ui.Readout()
        readout.feature = feature
        return [readout]

    # tally the spikes per feature
    spikes_per_feature = {}
    for spike in collection:
        spikes_per_feature[spike.feature] = spikes_per_feature.get(spike.feature, 0) + 1

    # too little activity to decide
    if len(collection) < threshold:
        return as_readout(default_feature)

    # ignore features outside the configured range
    valid_counts = {
        feature: count
        for feature, count in spikes_per_feature.items()
        if feature < feature_count
    }
    if not valid_counts:
        return as_readout(default_feature)

    # a winner is only accepted when its count is not shared with another feature
    top_count = max(valid_counts.values())
    winners = [feature for feature, count in valid_counts.items() if count == top_count]
    if len(winners) == 1:
        return as_readout(winners[0])
    return as_readout(default_feature)


# apply the algorithm model and get a speck2b configuration.
config = algorithm_model()

# endpoint shared by the visualizer (which receives on it) and the
# VizEventStreamer created below (which sends to it).
streamer_endpoint = "tcp://0.0.0.0:40000"

# run the visualizer in its own process, listening on the same endpoint the streamer
# will connect to. the original code passed an undefined name `visualizer_endpoint` here.
# the two 0.75 values are presumably the window width/height proportions - confirm
# against the samnagui documentation.
# NOTE(review): this runs at module level without an `if __name__ == "__main__":` guard;
# with the "spawn"/"forkserver" start methods the child process re-imports this module,
# so the script may need such a guard on those platforms.
gui_process = Process(
    target=samnagui.run_visualizer, args=(streamer_endpoint, 0.75, 0.75)
)
gui_process.start()

# open device
dk = open_speck2b()

# open power supply
# only speck2bTestboard has power module, speck2bDevKit and speck2bDevKitTiny don't have it
power = dk.get_power_monitor()
# start automatic power measurement; the argument 20 is presumably the
# sampling rate - confirm against the samna documentation.
power.start_auto_power_measurement(20)


# prepare graph
# every filter chain below is added to this single graph, which is started later by graph.start().
graph = samna.graph.EventFilterGraph()

# prepare filters about dvs plot
# chain: device output -> member select -> dvs-to-viz converter -> streamer.
# only the select node and the streamer are kept; the streamer is reused as the sink of all other chains.
# if dynapcnn: Speck2bDvsToVizConverter -> DynapcnnDvsToVizConverter
_, dvs_spike_select, _, streamer = graph.sequential(
    [
        dk.get_model_source_node(),
        "Speck2bMemberSelect",
        "Speck2bDvsToVizConverter",
        "VizEventStreamer",
    ]
)

# filter out real spikes from pixel spikes.
# only events whose "layer" member is 9 pass this node (presumably the id carried by dvs pixel events - confirm).
dvs_spike_select.set_white_list([9], "layer")

# set streamer destination which is exactly the visualizer(visualizer.receiver).
streamer.set_streamer_endpoint(streamer_endpoint)
# fail early when the streamer reports no connected receiver, i.e. the visualizer was not reached.
if streamer.wait_for_receiver_count() == 0:
    raise Exception(f"connecting to visualizer on {streamer_endpoint} fails")


# prepare filters about spike count plot
# first divide spike events into groups, then count spike events grouping by feature.
# chain: device output -> member select -> spike collection -> spike count -> streamer.
# if dynapcnn: Speck2bSpikeCollectionNode -> DynapcnnSpikeCollectionNode, Speck2bSpikeCountNode -> DynapcnnSpikeCountNode
(
    _,
    readout_spike_select,
    spike_collection_filter,
    spike_count_filter,
    _,
) = graph.sequential(
    [
        dk.get_model_source_node(),
        "Speck2bMemberSelect",
        "Speck2bSpikeCollectionNode",
        "Speck2bSpikeCountNode",
        streamer,
    ]
)

# NOTE(review): layer 3 is hard-coded here, while algorithm_model() enables monitoring on
# chip_layers_ordering[-1]; confirm both refer to the same layer for this network.
readout_spike_select.set_white_list(
    [3], "layer"
)  # output pixels are in the format of spike events, so we need to filter out all pixel spikes.

spike_collection_filter.set_interval_milli_sec(
    500
)  # divide according to this time period in milliseconds.
# one counter per output feature, matching the channel count of the spike count plot.
spike_count_filter.set_feature_count(feature_count)


# prepare filters about readout plot
# add a custom filter node that receives a custom python callback. it receives a spike collection and outputs the readout.
# the chain branches off the spike collection node created for the spike count plot.
# if dynapcnn: Speck2bCustomFilterNode -> DynapcnnCustomFilterNode
_, readout_filter, _ = graph.sequential(
    [spike_collection_filter, "Speck2bCustomFilterNode", streamer]
)  # from spike collection to streamer
# custom_readout is called with each spike collection and returns the readout events to forward.
readout_filter.set_filter_function(custom_readout)

# prepare filters about power measurement plot
# there is a converter that converts power measurements from power monitor to viz events and give them to streamer.
graph.sequential(
    [power.get_source_node(), "MeasurementToVizConverter", streamer]
)  # to streamer

# connect to visualizer to specify the UI
# config_source is the node the visualizer configuration is written to further below.
config_source, _ = graph.sequential([samna.BasicSourceNode_ui_event(), streamer])

# all chains are wired up, start the graph
graph.start()

# describe the plots of the gui one by one. the list passed with every plot is its layout
# (presumably the plot rectangle as fractions of the window - confirm against the samna docs).

# plot to show pixels
dvs_plot = samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer", [0, 0, 0.5, 0.75])

# plot to show readout. params: plot title and an images array of the same size as the
# feature count; the images correspond to the features in order.
readout_plot = samna.ui.ReadoutPlotConfiguration(
    "Readout Layer",
    ["./Closed.png", "./Open.png"],
    [0.5, 0, 1, 0.375],
)

# plot to show spike count. params: plot title, feature count and name of each feature
spike_count_plot = samna.ui.SpikeCountPlotConfiguration(
    title="Spike Count",
    channel_count=feature_count,
    line_names=["closed", "opened"],
    layout=[0.5, 0.375, 1, 0.75],
    show_x_span=25,
    label_interval=2.5,
    max_y_rate=1.2,
    show_point_circle=True,
    default_y_max=10,
)

# plot to show power measurement. params: plot title, count of features (line count)
# and each line's name.
# please note many speck2b testboards can't read power, so no line will show if you are using that kind of board.
# if dynapcnn: 5 -> 3,  ["io", "ram", "logic", "vddd", "vdda"] -> ["io", "ram", "logic"]
power_plot = samna.ui.PowerMeasurementPlotConfiguration(
    title="Power Consumption",
    channel_count=5,
    line_names=["io", "ram", "logic", "vddd", "vdda"],
    layout=[0, 0.75, 1, 1],
    show_x_span=10,
    label_interval=2,
    max_y_rate=1.5,
    show_point_circle=False,
    default_y_max=1,
    y_label_name="power (mW)",
)

# bundle the plots in display order and send the configuration to the visualizer
visualizer_config = samna.ui.VisualizerConfiguration(
    plots=[dvs_plot, readout_plot, spike_count_plot, power_plot]
)

config_source.write([visualizer_config])

# apply the configuration about algorithm model.
# this is done only after the graph has been started and the visualizer configured.
dk.get_model().apply_configuration(config)

# wait until visualizer window destroys.
# join() blocks the main process until the gui process has exited.
gui_process.join()

#  CustomFilterNode should be stopped manually before `graph.stop()` and program ends.
readout_filter.stop()
graph.stop()