Display Speck 2b dvs

Display Speck 2b dvs#

An example that uses a speck2bTestboard to display output spike events of speck2b model in visualizer.

It also supports printing output events from model.

This example bases on packages :

- samna                 0.30.9

Structure:

Speck 2b dvs display structure
import time
from multiprocessing import Process
from threading import Thread

import samna
import samnagui


def open_speck2b():
    # when speck2bDevKit, Speck2bTestboard -> Speck2bDevKit
    # when speck2bDevKitTiny, Speck2bTestboard -> Speck2bDevKitTiny
    return samna.device.open_device("Speck2bTestboard:0")


def build_samna_event_route(dk, dvs_graph, streamer_endpoint):
    # build a graph in samna to show dvs
    _, dvs_converter, streamer = dvs_graph.sequential(
        [dk.get_model_source_node(), "Speck2bDvsToVizConverter", "VizEventStreamer"]
    )

    config_source, _ = dvs_graph.sequential(
        [samna.BasicSourceNode_ui_event(), streamer]
    )

    streamer.set_streamer_endpoint(streamer_endpoint)
    if streamer.wait_for_receiver_count() == 0:
        raise Exception(f"connecting to visualizer on {streamer_endpoint} fails")

    return config_source


def create_output_buffer(dk):
    buf = samna.graph.sink_from(dk.get_model_source_node())
    return buf


def print_model_output(buf):
    while True:
        time.sleep(0.1)
        evts = buf.get_events()
        for e in evts:
            if isinstance(e, samna.speck2b.event.Spike):
                print(e)


def open_visualizer(window_width, window_height, receiver_endpoint):
    # start visualizer in a isolated process which is required on mac, intead of a sub process.
    gui_process = Process(
        target=samnagui.run_visualizer,
        args=(receiver_endpoint, window_width, window_height),
    )
    gui_process.start()

    return gui_process


streamer_endpoint = "tcp://0.0.0.0:40000"

visualizer, gui_process = open_visualizer(0.75, 0.75, streamer_endpoint)

dk = open_speck2b()
io = dk.get_io_module()

stopWatch = dk.get_stop_watch()
stopWatch.set_enable_value(True)  # open timestamp

# route events
dvs_graph = samna.graph.EventFilterGraph()
config_source = build_samna_event_route(dk, dvs_graph)

config_source.write(
    [
        samna.ui.VisualizerConfiguration(
            plots=[samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer")]
        )
    ]
)

dvs_graph.start()

# modify configuration
config = samna.speck2b.configuration.SpeckConfiguration()
config.dvs_layer.monitor_sensor_enable = True  # enable spike events to output.
dk.get_model().apply_configuration(config)

# print dvs
buf = create_output_buffer(dk)
t1 = Thread(target=print_model_output, args=(buf,))
t1.setDaemon(True)
t1.start()

# wait until visualizer window destroys.
gui_process.join()

dvs_graph.stop()