********************************
Xylo-Imu TriggerProcessing
********************************

AcceleratedTime mode and realtime mode now support the :class:`samna.xyloImu.event.TriggerProcessing` event, which keeps triggering processing until the target timestep is reached and allows you to:

* Process the spikes of the last input timestep in accelerated mode.
* Trigger a specific number of timesteps, then pause and debug in realtime mode.

Here are two examples that demonstrate how to use it in accelerated mode and realtime mode respectively.

Examples are based on packages :

::

    - samna 0.33.1

1. Trigger processing in accelerated mode
===============================================

.. code-block:: python

    import samna
    import time

    def initialize_board() :
        dk = samna.device.open_device("XyloImuTestBoard:0")
        io = dk.get_io_module()
        buf = samna.graph.sink_from(dk.get_model_source_node())
        source = samna.graph.source_to(dk.get_model_sink_node())
        return dk, io, buf, source

    dk, io, buf, source = initialize_board()

    graph = samna.graph.EventFilterGraph()
    _, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf.set_desired_type('xyloImu::event::Readout')
    graph.start()   # Note that this `graph` object must not be released in Python while receiving events; otherwise no events will be received.

    def get_current_timestep():
        source.write([samna.xyloImu.event.TriggerReadout()])
        evts = state_buf.get_n_events(1, 3000)  # Try to get 1 event within 3 seconds.
        assert(len(evts) == 1)
        return evts[0].timestep

    def send_spike(timestep):
        source.write([samna.xyloImu.event.Spike(timestep=timestep)])

    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.AcceleratedTime

    # A tiny model is applied here.
    input_count = 1
    rsn_count = 2
    output_count = 1
    xylo_config.input.weights = [[1, 1]]
    xylo_config.hidden.weights = [[1,1], [1,1]]
    hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron(), samna.xyloImu.configuration.HiddenNeuron()]
    xylo_config.hidden.neurons = hidden_neurons
    output_neurons = [samna.xyloImu.configuration.OutputNeuron()]
    xylo_config.readout.neurons = output_neurons
    xylo_config.readout.weights = [[1], [1]]

    # Configure the neuron ranges to monitor, which is necessary for hidden neurons in accelerated mode.
    xylo_config.debug.monitor_neuron_i_syn = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = rsn_count + output_count)
    xylo_config.debug.monitor_neuron_v_mem = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = rsn_count + output_count)
    xylo_config.debug.monitor_neuron_spike = samna.xyloImu.configuration.NeuronRange(neuron_id = 0, length = rsn_count)
    dk.get_model().apply_configuration(xylo_config)

    # Evolve normally
    init_timestep = get_current_timestep()
    print("initial timestep for first run: ", init_timestep)    # which is -1 after reset
    test_step_count = 10
    for step in range(1, test_step_count+1):
        curr_timestep = init_timestep+step
        print(f"send a spike with timestep {curr_timestep}")
        send_spike(curr_timestep)

    # Send a TriggerProcessing event to make the Xylo core process up to a specific timestep.
    # Typically, the line below ensures that timestep `init_timestep+test_step_count` is triggered, which is needed to process the spikes of the last timestep.
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = init_timestep+test_step_count+1)])

    readouts = state_buf.get_n_events(test_step_count, 3000)    # Block until the requested number of events is received; if the timeout expires first, only the events received so far are returned.
    assert(len(readouts) == test_step_count)    # All spikes are processed.
    print(f"Get {len(readouts)} readout events: ", readouts)

    # Evolve with missing spikes
    init_timestep = get_current_timestep()
    print("initial timestep for second run: ", init_timestep)   # timestep 9
    test_step_count = 10
    for step in range(1, test_step_count+1):
        if step <= 5:
            curr_timestep = init_timestep+step
            print(f"send a spike with timestep {curr_timestep}")
            send_spike(curr_timestep)

    # We don't input any spikes for the last 5 timesteps, but that doesn't prevent us from processing them by using a `TriggerProcessing` event.
    # Typically, the line below ensures that all timesteps up to and including `init_timestep+test_step_count` are triggered, so that the last several timesteps are processed.
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = init_timestep+test_step_count+1)])

    readouts = state_buf.get_n_events(test_step_count, 3000)    # Give enough time for all timesteps without spikes to be processed.
    assert(len(readouts) == test_step_count)    # All timesteps are processed, including those without spikes.
    print(f"Get {len(readouts)} readout events: ", readouts)
    print("timestep after run: ", get_current_timestep())   # timestep 19

    # Sending a `TriggerProcessing` event with a previous timestep or with the default timestep has no effect in accelerated mode.
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = init_timestep+test_step_count)])
    time.sleep(0.1)
    print("timestep after run: ", get_current_timestep())   # timestep 19

    source.write([samna.xyloImu.event.TriggerProcessing()])
    time.sleep(0.1)
    print("timestep after run: ", get_current_timestep())   # timestep 19

    graph.stop()

2. Trigger processing in realtime mode
===============================================

.. code-block:: python

    import samna
    import time

    def initialize_board() :
        dk = samna.device.open_device("XyloImuTestBoard:0")
        buf = samna.graph.sink_from(dk.get_model_source_node())
        source = samna.graph.source_to(dk.get_model_sink_node())
        return dk, buf, source

    dk, buf, source = initialize_board()

    graph = samna.graph.EventFilterGraph()
    _, etf, state_buf = graph.sequential([dk.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink()])
    etf.set_desired_type('xyloImu::event::Readout')
    graph.start()

    def get_current_timestep():
        source.write([samna.xyloImu.event.TriggerReadout()])
        evts = state_buf.get_n_events(1, 3000)  # Try to get 1 event within 3 seconds.
        assert(len(evts) == 1)
        return evts[0].timestep

    xylo_config = samna.xyloImu.configuration.XyloConfiguration()
    xylo_config.operation_mode = samna.xyloImu.OperationMode.RealTime
    dk.enable_manual_input_acceleration(False)  # Use sensor input instead of manual input.

    # A tiny model is applied here.
    input_count = 1
    rsn_count = 2
    output_count = 1
    xylo_config.time_resolution_wrap = 0x7a120  # The resolution wrap is set to 10 ms if the main clock rate is 50 MHz, which is the default value.
    xylo_config.input.weights = [[1, 1]]
    xylo_config.hidden.weights = [[1,1], [1,1]]
    hidden_neurons = [samna.xyloImu.configuration.HiddenNeuron(), samna.xyloImu.configuration.HiddenNeuron()]
    xylo_config.hidden.neurons = hidden_neurons
    output_neurons = [samna.xyloImu.configuration.OutputNeuron()]
    xylo_config.readout.neurons = output_neurons
    xylo_config.readout.weights = [[1], [1]]
    dk.get_model().apply_configuration(xylo_config)

    timestep = get_current_timestep()
    print("Initial timestep in realtime mode before first trigger: ", timestep)     # which is -1 after reset

    # Feel free to do things like reading/writing in realtime mode before the first trigger; we call this period the debug time of realtime mode.
    run_trigger_count = 10
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = timestep + run_trigger_count + 1)])    # Run timesteps 0,1,2,...,9
    # Timesteps start from 0 after the first trigger.
    readouts = state_buf.get_n_events(run_trigger_count, 3000)  # Try to get 10 readouts within 3 seconds. Make sure to give enough time for all triggers to complete, otherwise fewer readouts will be received.
    assert(len(readouts) == run_trigger_count)
    for r in readouts:
        print(r)

    # Now it's the debug time of realtime mode again. You can do anything you like (including switching to another mode) to debug.
    print("Enter debug time of realtime mode, current timestep: ", get_current_timestep())    # timestep 9

    # Sending a `TriggerProcessing` event with a previous timestep has no effect.
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = timestep + run_trigger_count)])
    time.sleep(0.1)
    print("Current timestep after TriggerProcessing of previous timestep is sent: ", get_current_timestep())    # timestep 9

    # Start processing again after debugging
    timestep = get_current_timestep()
    source.write([samna.xyloImu.event.TriggerProcessing(target_timestep = timestep + run_trigger_count + 1)])    # Run timesteps 10,11,12,...,19
    readouts = state_buf.get_n_events(run_trigger_count, 3000)
    assert(len(readouts) == run_trigger_count)
    for r in readouts:
        print(r)
    print("Enter debug time of realtime mode again, current timestep: ", get_current_timestep())    # timestep 19

    # Run indefinitely by sending a `TriggerProcessing` event with the default timestep.
    print("Start to trigger infinitely...")
    source.write([samna.xyloImu.event.TriggerProcessing()])
    receiving_duration = 3
    begin = time.time()
    recv_count = 0
    while time.time() - begin < receiving_duration:
        evts = state_buf.get_events()
        if evts:
            recv_count += len(evts)
        time.sleep(0.01)
    print(f"Running infinitely in realtime mode. Receiving {recv_count} readout events in {receiving_duration} seconds. Still running.")

    graph.stop()