Display Dynap-CNN dev kit DVS

An example that uses a dynapcnnDevkit and a DVXplorer camera to display the DVS events output by the dynapcnnDevkit in the visualizer. The DVS events output by the dynapcnnDevkit are the events fed into it from the DVXplorer.

It also supports printing all output events from the model.

Please adjust the focus to get a clear image.

This example is based on the following packages:

- samna                 0.30.23

Structure:

dynapcnn dvs display data flow structure
import time
from multiprocessing import Process
from threading import Thread

import samna
import samnagui


# open two devices: dynapcnn and camera
def open_dynapcnn():
    """Open the Dynap-CNN dev kit and return the handle given by samna.

    The device is addressed by the fixed identifier ``"DynapcnnDevKit:0"``.
    """
    device_id = "DynapcnnDevKit:0"
    return samna.device.open_device(device_id)


def open_camera():
    """Open the DVXplorer camera and return the handle given by samna.

    The device is addressed by the fixed identifier ``"DVXplorer:0"``.
    """
    device_id = "DVXplorer:0"
    return samna.device.open_device(device_id)


# build a graph to filter events from camera to dynapcnn model
def route_input(camera, dk, input_graph):
    """Feed camera DVS events into the Dynap-CNN model through a filter chain.

    The chain is: camera source -> crop -> rescale -> decimate ->
    camera-to-dynapcnn conversion -> model sink. After the filters are
    configured, ``input_graph`` is started.

    Args:
        camera: Opened camera; ``get_source_node()`` provides its events.
        dk: Opened dev kit; ``get_model().get_sink_node()`` receives the events.
        input_graph: Filter graph that the chain is added to and that is
            started by this function.
    """
    # Both ends of the chain: events leave the camera and enter the model.
    camera_source = camera.get_source_node()
    model_sink = dk.get_model().get_sink_node()

    pipeline = [
        camera_source,
        "DvsEventCrop",
        "DvsEventRescale",
        "DvsEventDecimate",
        "CameraToDynapcnnInput",
        model_sink,
    ]

    # Nodes added with `sequential` are connected automatically, in order.
    # One handle comes back per entry; only the three filters need setup.
    _source, crop_filter, rescale_filter, decimate_filter, _converter, _sink = (
        input_graph.sequential(pipeline)
    )

    # Keep only the pixels in the rectangle (edges included) from top left
    # (31, 0) to bottom right (288, 240); the rectangle's top left corner
    # becomes the new origin for the pixels that pass.
    crop_filter.set_roi(31, 0, 288, 240)

    # Divide both the x and y axis by 2.
    rescale_filter.set_rescaling_coefficients(2, 2)

    # Pick 1 event out of every 15.
    decimate_filter.set_decimation_fraction(15, 1)

    input_graph.start()

    # NOTE(review): the string below is an inert note, not executed code. Its
    # comment mentions a bottom right corner of (288,228) while its set_roi
    # call uses (228, 228) — confirm which one is intended.
    """
    if we are using Davis346 camera, this function will be different:

    # Add node in filter graph
    _, dvs_crop_node, dvs_decimate_node, _, _ = input_graph.sequential([
        camera.get_source_node(), "DvsEventCrop", "DvsEventDecimate", "CameraToDynapcnnInput", dk.get_model().get_sink_node()])

    # we only want the pixels inside the rectangle
    # whose top left corner is (100,100) and bottom right corner is (288,228) pass
    dvs_crop_node.set_roi(100, 100, 228, 228)

    # every 3 events we pick 1 event
    dvs_decimate_node.set_decimation_fraction(3,1)

    input_graph.start()
    """


# build a graph to filter events from dynapcnn model to visualizer
def route_output(dk, output_graph, streamer_endpoint):
    """Stream the model's output DVS events to the visualizer and print them.

    Builds two chains in ``output_graph`` that share one streamer node:
    model source -> ``DynapcnnDvsToVizConverter`` -> ``VizEventStreamer``,
    and a UI-event source -> the same streamer (used to configure the
    visualizer). A daemon thread additionally prints the model's output
    events once per second.

    Args:
        dk: Opened dev kit; ``get_model_source_node()`` provides the events.
        output_graph: Filter graph the chains are added to; it is started by
            this function.
        streamer_endpoint: Endpoint the streamer sends the viz events to.

    Raises:
        ConnectionError: If no receiver is connected on ``streamer_endpoint``
            (the visualizer has to be opened first).
    """
    # DynapcnnDvsToVizConverter: converts output dvs events to viz events that
    # the visualizer can recognize.
    # VizEventStreamer: tcp sender for viz events.
    _, _, streamer = output_graph.sequential(
        [dk.get_model_source_node(), "DynapcnnDvsToVizConverter", "VizEventStreamer"]
    )
    # Second entry point into the same streamer, used for configuration events.
    config_source, _ = output_graph.sequential(
        [samna.BasicSourceNode_ui_event(), streamer]
    )

    # Set the streamer destination and make sure someone is listening.
    streamer.set_streamer_endpoint(streamer_endpoint)
    if streamer.wait_for_receiver_count() == 0:
        # ConnectionError is a subclass of Exception, so existing
        # `except Exception` handlers keep working.
        raise ConnectionError(
            f"connecting to visualizer on {streamer_endpoint} fails, please open visualizer first!"
        )

    # Tap the model's source node directly so the output events can be printed
    # independently of the visualizer chain.
    event_sink = samna.graph.sink_from(dk.get_model_source_node())

    def print_output_events():
        # Poll once per second and print whatever arrived in the meantime.
        while True:
            time.sleep(1)
            for event in event_sink.get_events():
                print("output event: ", event)

    # `daemon=True` replaces the deprecated `Thread.setDaemon(True)`; the
    # endless printer thread must not keep the interpreter alive at exit.
    Thread(target=print_output_events, daemon=True).start()

    output_graph.start()

    # Initialize a screen in the visualizer window to display dvs events.
    config_source.write(
        [
            samna.ui.VisualizerConfiguration(
                plots=[samna.ui.ActivityPlotConfiguration(128, 128, "DVS Layer")]
            )
        ]
    )


def open_visualizer(window_width, window_height, receiver_endpoint):
    """Launch the samna visualizer in its own process and return that process.

    The visualizer runs in an isolated process rather than a sub process,
    which is required on mac.

    Args:
        window_width: Window width forwarded to ``samnagui.run_visualizer``.
        window_height: Window height forwarded to ``samnagui.run_visualizer``.
        receiver_endpoint: Endpoint the visualizer receives events on.

    Returns:
        The already started ``multiprocessing.Process``; join it to wait for
        the visualizer process to end.
    """
    # run_visualizer takes the endpoint first, then the window size.
    visualizer_args = (receiver_endpoint, window_width, window_height)
    visualizer = Process(target=samnagui.run_visualizer, args=visualizer_args)
    visualizer.start()
    return visualizer


# The guard is required because the visualizer is started through
# `multiprocessing.Process`: with the "spawn" start method (default on macOS
# and Windows) the child process re-imports this module, and without the guard
# it would run the whole script again instead of only starting the visualizer.
if __name__ == "__main__":
    streamer_endpoint = "tcp://0.0.0.0:40000"

    # The visualizer must be running before `route_output` looks for a receiver.
    gui_process = open_visualizer(0.75, 0.75, streamer_endpoint)

    dk = open_dynapcnn()
    camera = open_camera()
    # Not used further in this example; kept as in the original script.
    io = dk.get_io_module()

    # The graph's destructor calls `stop`, so the graphs are created here
    # rather than inside the route_* functions, keeping them alive for the
    # whole run.
    input_graph = samna.graph.EventFilterGraph()
    output_graph = samna.graph.EventFilterGraph()

    route_input(camera, dk, input_graph)
    route_output(dk, output_graph, streamer_endpoint)

    # Apply a configuration to the model. You can customize your configuration here.
    config = samna.dynapcnn.configuration.DynapcnnConfiguration()
    config.dvs_layer.monitor_enable = True  # let it output dvs events
    dk.get_model().apply_configuration(config)

    # The camera should start only when everything else is ready.
    camera.start()

    try:
        # Wait until the visualizer window is destroyed.
        gui_process.join()
    finally:
        # Shut down in the same order as before, even if the wait is
        # interrupted (e.g. by Ctrl-C).
        camera.stop()
        input_graph.stop()
        output_graph.stop()