3rd Party Cameras

3rd Party Cameras

Samna includes support for some 3rd party DVS event cameras.

These cameras are models, much like the Speck models, and have a source node that streams data. Consequently, cameras can be hooked up to the rest of Samna’s filter system as expected.

Currently, the following cameras are supported and can be used as arguments to samna.device.open_device():

  • Davis240: Inivation DAVIS240 camera series

  • Davis346: Inivation DAVIS346 camera series

  • DVXplorer: Inivation DVXplorer camera series

  • PseeCamera: Prophesee DVS cameras supported by OpenEB 3.0.2

All cameras adhere to the following uniform interface.

  • get_source_node() -> DeviceSourceNode: Returns the device source node that can be connected to Samna’s filter system. Any event produced by the camera will appear in this source node.

  • start() -> None: Start the camera recording.

  • stop() -> None: Stop the camera recording.

The demo script below visualises the camera output using samnagui.

import samna
import samnagui
import socket
from multiprocessing import Process


class Visualizer:
    """Helper class to run the visualiser in a different process.

    Constructing an instance picks a TCP endpoint, launches
    ``samnagui.run_visualizer`` in a child process listening on that
    endpoint, and sends it a configuration containing a single
    "DVS Activity" plot.
    """

    def __init__(self, show_dvs_size):
        """Start the visualizer process and configure its activity plot.

        Args:
            show_dvs_size: Iterable unpacked as the leading positional
                arguments of ``samna.ui.ActivityPlotConfiguration``
                (the demo passes a ``(128, 128)`` tuple).

        Raises:
            Exception: If the streamer reports no receiver on the endpoint.
        """
        self.endpoint = None
        self.gui_process = None

        self.__open_visualizer(show_dvs_size)

    def get_endpoint(self):
        """Return the ``tcp://0.0.0.0:<port>`` endpoint used by the visualizer.

        The endpoint is computed on the first call and cached, so every
        later call returns the same string.
        """
        if self.endpoint:
            return self.endpoint
        # Let the OS pick a free port by binding to port 0, read it back, and
        # release the socket again. The ``with`` block guarantees the socket is
        # closed even if bind/listen/getsockname raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as free_socket:
            free_socket.bind(("0.0.0.0", 0))
            free_socket.listen(5)
            port = free_socket.getsockname()[1]
        self.endpoint = f"tcp://0.0.0.0:{port}"
        return self.endpoint

    def join(self):
        """Block until the visualizer process has exited."""
        self.gui_process.join()

    def __open_visualizer(self, show_dvs_size):
        """Launch the GUI process and send it the plot configuration.

        Raises:
            Exception: If ``wait_for_receiver_count()`` returns 0, i.e. no
                receiver was found on the endpoint.
        """
        # Forwarded to samnagui.run_visualizer as window width and height.
        # NOTE(review): presumably fractions of the screen size - confirm.
        window_width = 3 / 4
        window_height = 9 / 16
        # NOTE(review): presumably a normalised layout rectangle covering the
        # whole window - confirm against the samna.ui documentation.
        dvs_layout = [0, 0, 1, 1]

        visualizer_config = samna.ui.VisualizerConfiguration(
            plots=[
                samna.ui.ActivityPlotConfiguration(
                    *show_dvs_size, "DVS Activity", dvs_layout
                ),
            ]
        )

        self.gui_process = Visualizer.start_visualizer_process(
            window_width, window_height, self.get_endpoint()
        )

        # Temporary graph used only to push the configuration to the GUI.
        graph = samna.graph.EventFilterGraph()

        config_source, streamer = graph.sequential(
            [samna.BasicSourceNode_ui_event(), "VizEventStreamer"]
        )

        streamer.set_streamer_endpoint(self.get_endpoint())
        if streamer.wait_for_receiver_count() == 0:
            raise Exception(f"connecting to visualizer on {self.get_endpoint()} fails")

        graph.start()
        try:
            config_source.write([visualizer_config])
        finally:
            # Always stop the temporary graph, even if the write fails.
            graph.stop()

    @staticmethod
    def start_visualizer_process(window_width, window_height, receiver_endpoint):
        """Start ``samnagui.run_visualizer`` in a new process and return it.

        Args:
            window_width: Width argument forwarded to ``run_visualizer``.
            window_height: Height argument forwarded to ``run_visualizer``.
            receiver_endpoint: Endpoint string the visualizer receives on.

        Returns:
            The already started ``multiprocessing.Process``.
        """
        gui_process = Process(
            target=samnagui.run_visualizer,
            args=(receiver_endpoint, window_width, window_height),
        )
        gui_process.start()

        return gui_process


# The visualizer is launched through multiprocessing. With the "spawn" start
# method (the default on Windows and macOS) the child process re-imports this
# module, so everything that starts processes or opens devices must sit behind
# the main guard to avoid being executed again in the child.
if __name__ == "__main__":
    visualizer = Visualizer(show_dvs_size=(128, 128))

    # Open a device using the device string
    camera = samna.device.open_device("PseeCamera")

    # First we define a filter graph to process the device output
    graph = samna.graph.EventFilterGraph()

    # Take the dvs events of the device and stream it directly to the visualizer
    _, _, streamer = graph.sequential(
        [camera.get_source_node(), "CameraToVizConverter", "VizEventStreamer"]
    )
    streamer.set_streamer_endpoint(visualizer.get_endpoint())

    # Start the graph and camera to process events
    graph.start()
    try:
        camera.start()
        try:
            # Wait until visualizer window is destroyed
            visualizer.join()
        finally:
            # Stop the camera even if waiting was interrupted (e.g. Ctrl+C)
            camera.stop()
    finally:
        # Stop the graph last, mirroring the start order in reverse
        graph.stop()