3rd Party Cameras
Samna includes support for some 3rd party DVS event cameras.
These cameras are models, much like the Speck models, and have a source node that streams data. Consequently cameras can be hooked up to the rest of Samna’s filter system as expected.
Currently the following cameras are supported and can be used as arguments to samna.device.open_device():
Davis240
: iniVation DAVIS240 camera series
Davis346
: iniVation DAVIS346 camera series
DVXplorer
: iniVation DVXplorer camera series
PseeCamera
: Prophesee DVS cameras supported by OpenEB 3.0.2
All cameras adhere to the following uniform interface.
get_source_node() -> DeviceSourceNode
Returns the device source node that can be connected to samna’s filter system. Any event produced by the camera will appear in this source node.
start() -> None
Start the camera recording.
stop() -> None
Stop the camera recording.
The demo script below visualises the camera output using samnagui.
import samna
import samnagui
import socket
from multiprocessing import Process
class Visualizer:
    """Helper class to run the visualiser in a different process.

    Constructing an instance starts ``samnagui.run_visualizer`` in a child
    process and sends it a single plot configuration over a TCP endpoint.
    """

    def __init__(self, show_dvs_size):
        """Launch the visualizer process and configure its activity plot.

        Args:
            show_dvs_size: Sequence that is unpacked as the leading positional
                arguments of ``samna.ui.ActivityPlotConfiguration``
                (presumably the DVS width and height -- confirm against the
                samna API).

        Raises:
            RuntimeError: If no visualizer connects to the streamer endpoint.
        """
        self.endpoint = None
        self.gui_process = None
        self.__open_visualizer(show_dvs_size)

    def get_endpoint(self) -> str:
        """Return the ``tcp://0.0.0.0:<port>`` endpoint used by the visualizer.

        The endpoint is determined on the first call and cached on the
        instance; later calls return the cached string.
        """
        if self.endpoint:
            return self.endpoint

        # Binding to port 0 asks the OS for a currently unused port. The
        # ``with`` block guarantees the probe socket is closed even if
        # bind/listen raises.
        # NOTE: the port is released again before the endpoint is handed out,
        # so another process could in principle claim it in the meantime.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as free_socket:
            free_socket.bind(("0.0.0.0", 0))
            free_socket.listen(5)
            port = free_socket.getsockname()[1]

        self.endpoint = f"tcp://0.0.0.0:{port}"
        return self.endpoint

    def join(self) -> None:
        """Block until the visualizer process has exited."""
        self.gui_process.join()

    def __open_visualizer(self, show_dvs_size) -> None:
        """Start the GUI process and send it the plot configuration.

        Args:
            show_dvs_size: Forwarded (unpacked) to
                ``samna.ui.ActivityPlotConfiguration``.

        Raises:
            RuntimeError: If ``wait_for_receiver_count()`` reports zero
                receivers on the streamer endpoint.
        """
        # Presumably fractions of the screen size -- confirm against samnagui.
        window_width = 3 / 4
        window_height = 9 / 16
        # Presumably a normalized layout rectangle covering the whole window
        # -- confirm against the samna API.
        dvs_layout = [0, 0, 1, 1]

        visualizer_config = samna.ui.VisualizerConfiguration(
            plots=[
                samna.ui.ActivityPlotConfiguration(
                    *show_dvs_size, "DVS Activity", dvs_layout
                ),
            ]
        )

        self.gui_process = Visualizer.start_visualizer_process(
            window_width, window_height, self.get_endpoint()
        )

        # Temporary graph that is used only to deliver the configuration.
        graph = samna.graph.EventFilterGraph()
        config_source, streamer = graph.sequential(
            [samna.BasicSourceNode_ui_event(), "VizEventStreamer"]
        )
        streamer.set_streamer_endpoint(self.get_endpoint())
        if streamer.wait_for_receiver_count() == 0:
            # RuntimeError is still caught by callers using ``except Exception``.
            raise RuntimeError(
                f"connecting to visualizer on {self.get_endpoint()} fails"
            )

        graph.start()
        config_source.write([visualizer_config])
        graph.stop()

    @staticmethod
    def start_visualizer_process(
        window_width, window_height, receiver_endpoint
    ) -> Process:
        """Start ``samnagui.run_visualizer`` in a new process.

        Args:
            window_width: Passed through to ``samnagui.run_visualizer``.
            window_height: Passed through to ``samnagui.run_visualizer``.
            receiver_endpoint: Endpoint string the visualizer receives on.

        Returns:
            The already started ``multiprocessing.Process``.
        """
        gui_process = Process(
            target=samnagui.run_visualizer,
            args=(receiver_endpoint, window_width, window_height),
        )
        gui_process.start()
        return gui_process
# The entry-point guard is required because Visualizer starts a
# multiprocessing.Process: with the "spawn" start method the child process
# re-imports this module, and unguarded top-level code would run again there.
if __name__ == "__main__":
    visualizer = Visualizer(show_dvs_size=(128, 128))

    # Open a device using the device string
    camera = samna.device.open_device("PseeCamera")

    # First we define a filter graph to process the device output
    graph = samna.graph.EventFilterGraph()

    # Take the dvs events of the device and stream it directly to the visualizer
    _, _, streamer = graph.sequential(
        [camera.get_source_node(), "CameraToVizConverter", "VizEventStreamer"]
    )
    streamer.set_streamer_endpoint(visualizer.get_endpoint())

    # Start the graph and camera to process events
    graph.start()
    camera.start()

    try:
        # Wait until visualizer window is destroyed
        visualizer.join()
    finally:
        # Stop the graph and camera, even if the wait is interrupted
        # (e.g. by Ctrl+C)
        camera.stop()
        graph.stop()