Speck 2b smart door demo
A classic smart door example that uses the Speck2b testboard to perform the following steps:
receive output dvs events to show in dvs plot
receive CNN layer outputs as Spike events to show spike count plot
calculate readout from spike count and show readout in image plot
read power measurement and show in power measurement plot
All of this functionality has been tested with the following packages:
- samna 0.30.9
- torch 1.9.0
- sinabs 0.2.1.dev56
- sinabs-dynapcnn 0.2.2.dev6
Files needed (all files should be placed in the same directory as the Python script):
Structure:
from multiprocessing import Process
import sinabs
import sinabs.backend.dynapcnn as sindynapcnn
import sinabs.from_torch
import torch
import torch.nn as nn
import samna
import samnagui
class SmartDoorClassifier(nn.Module):
    """Small convolutional classifier used by the smart door demo.

    All layers are held in ``self.seq`` (an ``nn.Sequential``): three
    bias-free convolution / ReLU / average-pool stages, followed by flatten,
    dropout, and a bias-free linear layer with two outputs and a final ReLU.
    The shape comments below assume a 2 x 128 x 128 input.
    """

    def __init__(self):
        super().__init__()
        layers = [
            # Input: 2 x 128 x 128
            # Stage labelled "Core 0"
            nn.Conv2d(2, 8, kernel_size=(2, 2), stride=(2, 2), padding=(0, 0), bias=False),  # 8 x 64 x 64
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 8 x 32 x 32
            # Stage labelled "Core 1"
            nn.Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),  # 8 x 32 x 32
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 8 x 16 x 16
            # Stage labelled "Core 2"
            nn.Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),  # 8 x 16 x 16
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(2, 2)),  # 8 x 8 x 8
            nn.Flatten(),  # 512 values, matching the linear layer's 8 * 8 * 8 inputs
            # NOTE(review): Dropout2d receives the flattened 2-D tensor here;
            # check the behaviour on 2-D inputs before upgrading PyTorch.
            nn.Dropout2d(0.5),
            nn.Linear(8 * 8 * 8, 2, bias=False),
            nn.ReLU(),
        ]
        # Unpacking keeps the same positional indices (seq[0] ... seq[12]),
        # so checkpoint keys such as "seq.0.weight" still match.
        self.seq = nn.Sequential(*layers)

    def forward(self, x):
        """Pass ``x`` through the sequential stack and return its output."""
        return self.seq(x)
def open_speck2b():
    """Open the device named ``"Speck2bTestboard:0"`` and return its handle.

    For other boards, change the board part of the device name:
    ``Speck2bTestboard`` -> ``Speck2bDevKit`` for a speck2bDevKit, or
    ``Speck2bTestboard`` -> ``Speck2bDevKitTiny`` for a speck2bDevKitTiny.
    """
    device_name = "Speck2bTestboard:0"
    return samna.device.open_device(device_name)
def algorithm_model():
    """Build the Speck2b configuration for the trained smart door network.

    Loads the checkpoint ``./smart_door_net_ep60.pth`` into a
    ``SmartDoorClassifier``, converts it to a spiking model with sinabs,
    wraps it in a ``DynapcnnCompatibleNetwork`` and generates a chip
    configuration for ``"speck2b:0"``.

    Returns:
        The configuration produced by ``make_config``, with monitoring
        enabled on the DVS layer and on the last entry of
        ``chip_layers_ordering``.
    """
    # Input dimensions, shared by the spiking conversion and the chip mapping.
    input_shape = (2, 128, 128)

    classifier = SmartDoorClassifier()
    # Debug output: first filter of the first convolution, printed before the
    # checkpoint is loaded.
    print(classifier.seq[0].weight[0])
    checkpoint = torch.load("./smart_door_net_ep60.pth", map_location="cpu")
    classifier.load_state_dict(checkpoint, strict=True)

    spiking_model = sinabs.from_torch.from_model(classifier, input_shape=input_shape)
    # Disabled tweak kept from the original demo (scales the last linear layer):
    # spiking_model.spiking_model.seq[11].weight.data = spiking_model.spiking_model.seq[11].weight.data*5

    # Dynapcnn-compatible network that takes its input from the DVS sensor.
    dynapcnn_net = sindynapcnn.DynapcnnCompatibleNetwork(
        spiking_model,
        input_shape=input_shape,
        discretize=True,
        dvs_input=True,
    )
    config = dynapcnn_net.make_config("auto", device="speck2b:0")

    # Monitor the DVS layer and the last chip layer so that their events are
    # emitted by the device.
    output_layer = dynapcnn_net.chip_layers_ordering[-1]
    config.dvs_layer.monitor_enable = True
    config.cnn_layers[output_layer].monitor_enable = True
    return config
# Number of output features. custom_readout ignores spike features at or above
# this value, and the spike count filter and spike count plot use it as well.
feature_count = 2
# Minimum number of spikes a collection must contain before custom_readout
# picks a winning feature; smaller collections yield default_feature.
threshold = 60
# Feature reported by custom_readout when the collection is too small or no
# single feature has the highest spike count.
default_feature = 0
# A custom Python callback that receives an array of spike events as input and outputs an array of readout events. You are encouraged to customize it.
def custom_readout(collection):
    """Turn one collection of spike events into a single readout event.

    Spikes are tallied per ``feature``. If the collection holds at least
    ``threshold`` spikes and exactly one feature below ``feature_count`` has
    the highest tally, that feature is reported; otherwise ``default_feature``
    is reported.

    Args:
        collection: Sized iterable of spike events, each exposing ``feature``.

    Returns:
        A one-element list containing a ``samna.ui.Readout`` whose ``feature``
        is set to the selected feature.
    """

    def generate_result(feature):
        readout = samna.ui.Readout()
        readout.feature = feature
        return [readout]

    # Tally spikes per feature.
    spike_counts = {}
    for event in collection:
        spike_counts[event.feature] = spike_counts.get(event.feature, 0) + 1

    # Too few spikes in this collection to make a decision.
    if len(collection) < threshold:
        return generate_result(default_feature)

    # Only features inside the configured range can win.
    candidates = {
        feature: count
        for feature, count in spike_counts.items()
        if feature < feature_count
    }
    best_count = max(candidates.values(), default=0)
    winners = [
        feature for feature, count in candidates.items() if count == best_count
    ]

    # A decision requires a single, unambiguous maximum.
    if best_count > 0 and len(winners) == 1:
        return generate_result(winners[0])
    return generate_result(default_feature)
# Apply the algorithm model and get a Speck2b configuration.
config = algorithm_model()
# Endpoint on which the visualizer receives events. The same value is given
# to the streamer node later in the script, so both sides must use one name.
streamer_endpoint = "tcp://0.0.0.0:40000"
# Launch the visualizer GUI in its own process. The endpoint variable passed
# here was previously the undefined name `visualizer_endpoint`, which raised
# NameError; the script only defines `streamer_endpoint`.
# NOTE(review): 0.75, 0.75 are presumably the window width/height proportions
# — confirm against the samnagui API.
# NOTE(review): this runs at module level; with the "spawn" start method the
# child process re-imports the main module, so consider guarding the script
# with `if __name__ == "__main__":`.
gui_process = Process(
    target=samnagui.run_visualizer, args=(streamer_endpoint, 0.75, 0.75)
)
gui_process.start()
# Open the Speck2b device.
dk = open_speck2b()
# Start the power monitor.
# Only the speck2bTestboard has a power module; the speck2bDevKit and
# speck2bDevKitTiny don't have it.
power = dk.get_power_monitor()
# NOTE(review): 20 is presumably the measurement rate — confirm against the
# samna API.
power.start_auto_power_measurement(20)
# Create the filter graph that routes device and power events to the visualizer.
graph = samna.graph.EventFilterGraph()
# Filters for the DVS plot:
# device source -> member select -> DVS-to-viz converter -> streamer.
# If using dynapcnn: Speck2bDvsToVizConverter -> DynapcnnDvsToVizConverter
_, dvs_spike_select, _, streamer = graph.sequential(
    [
        dk.get_model_source_node(),
        "Speck2bMemberSelect",
        "Speck2bDvsToVizConverter",
        "VizEventStreamer",
    ]
)
# Keep only events whose "layer" member is 9, separating the pixel (DVS)
# events from the real CNN spikes.
dvs_spike_select.set_white_list([9], "layer")
# Set the streamer destination, which is the visualizer's receiver endpoint.
streamer.set_streamer_endpoint(streamer_endpoint)
# Abort if no receiver (the visualizer) is connected to the streamer.
if streamer.wait_for_receiver_count() == 0:
    raise Exception(f"connecting to visualizer on {streamer_endpoint} fails")
# Filters for the spike count plot:
# first divide spike events into groups, then count spike events per feature.
# If using dynapcnn: Speck2bSpikeCollectionNode -> DynapcnnSpikeCollectionNode,
# Speck2bSpikeCountNode -> DynapcnnSpikeCountNode
(
    _,
    readout_spike_select,
    spike_collection_filter,
    spike_count_filter,
    _,
) = graph.sequential(
    [
        dk.get_model_source_node(),
        "Speck2bMemberSelect",
        "Speck2bSpikeCollectionNode",
        "Speck2bSpikeCountNode",
        streamer,
    ]
)
# NOTE(review): layer 3 is hard-coded here, while algorithm_model() enables
# monitoring on chip_layers_ordering[-1] — confirm that the two agree.
readout_spike_select.set_white_list(
    [3], "layer"
)  # output pixels also arrive as spike events, so all pixel spikes must be filtered out.
spike_collection_filter.set_interval_milli_sec(
    500
)  # group spikes into collections of this duration, in milliseconds.
spike_count_filter.set_feature_count(feature_count)
# Filters for the readout plot:
# a custom filter node runs a Python callback that receives each spike
# collection and outputs the readout.
# If using dynapcnn: Speck2bCustomFilterNode -> DynapcnnCustomFilterNode
_, readout_filter, _ = graph.sequential(
    [spike_collection_filter, "Speck2bCustomFilterNode", streamer]
)  # from spike collection to streamer
readout_filter.set_filter_function(custom_readout)
# Filters for the power measurement plot:
# a converter turns power measurements from the power monitor into viz events
# and passes them to the streamer.
graph.sequential(
    [power.get_source_node(), "MeasurementToVizConverter", streamer]
)  # to streamer
# Source node used to send UI configuration events to the visualizer.
config_source, _ = graph.sequential([samna.BasicSourceNode_ui_event(), streamer])
# Graph construction is complete; start it.
graph.start()
# NOTE(review): the four-element lists below are presumably plot layouts given
# as fractions of the window — confirm the exact order against the samna docs.
visualizer_config = samna.ui.VisualizerConfiguration(
    # Plots to add to the GUI.
    plots=[
        # Plot showing the DVS pixels.
        samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer", [0, 0, 0.5, 0.75]),
        # Plot showing the readout. Params: plot title and a list of images,
        # one per feature (same length as the feature count).
        samna.ui.ReadoutPlotConfiguration(
            "Readout Layer",
            ["./Closed.png", "./Open.png"],
            [0.5, 0, 1, 0.375],
        ),
        # Plot showing the spike count. Params: plot title, feature count and
        # the name of each feature.
        samna.ui.SpikeCountPlotConfiguration(
            title="Spike Count",
            channel_count=feature_count,
            line_names=["closed", "opened"],
            layout=[0.5, 0.375, 1, 0.75],
            show_x_span=25,
            label_interval=2.5,
            max_y_rate=1.2,
            show_point_circle=True,
            default_y_max=10,
        ),
        # Plot showing the power measurement. Params: plot title, number of
        # lines and each line's name.
        # Please note that many speck2b testboards can't read power, so no
        # line will show if you are using that kind of board.
        # If using dynapcnn: 5 -> 3,
        # ["io", "ram", "logic", "vddd", "vdda"] -> ["io", "ram", "logic"]
        samna.ui.PowerMeasurementPlotConfiguration(
            title="Power Consumption",
            channel_count=5,
            line_names=["io", "ram", "logic", "vddd", "vdda"],
            layout=[0, 0.75, 1, 1],
            show_x_span=10,
            label_interval=2,
            max_y_rate=1.5,
            show_point_circle=False,
            default_y_max=1,
            y_label_name="power (mW)",
        ),
    ]
)
# Send the UI configuration to the visualizer through the graph.
config_source.write([visualizer_config])
# Apply the algorithm model's configuration to the device.
dk.get_model().apply_configuration(config)
# Block until the visualizer window is closed.
gui_process.join()
# The custom filter node must be stopped manually before `graph.stop()` and
# before the program ends.
readout_filter.stop()
graph.stop()