Sensor Fusion

sensor_fusion.py

Overview

sensor_fusion.py is a fusion node that synthesizes data from a planar LiDAR scanner and a single-beam sonar into a unified sensor_msgs/LaserScan. This lightweight placeholder lets sensor-fusion and obstacle-avoidance pipelines be exercised without complex probabilistic filters or real hardware.

Interfaces (strongly-typed, stateless)

Direction

Topic

Message type / Notes

Required (sub)

/{robot}/lidar

sensor_msgs/LaserScan – 180° planar scan

Required (sub)

/{robot}/sonar

sensor_msgs/Range – single-beam ultrasound

Provided (pub)

/{robot}/fused_scan

sensor_msgs/LaserScan – LiDAR + centred sonar reading fused

Contract

Pre-conditions

  • LiDAR and sonar topics publish at compatible rates (~10 Hz).

  • LiDAR ranges length matches the pseudo-scan length generated from sonar.

Post-conditions

  • Each call to publish_fused() emits exactly one fused LaserScan.

  • Fused ranges = element-wise sum of LiDAR ranges and the adapted sonar array.

Invariants

  • Sonar reading is placed at the centre index; all other beam values are unchanged.

Implementation notes

  • Fusion is attempted on every sonar callback and in a fixed-rate loop.

  • Adapter expands a scalar sonar reading into a zero-filled list with the value at the centre.

  • Holds no extra state beyond the last messages, ideal for hot-reload and tests.

class sensor_fusion.Adapter[source]

Bases: object

Expand a scalar sonar range into a list matching the LiDAR beam count.

The sonar value is inserted at the centre index; all other positions are zero so that simple addition embeds the sonar reading into the LiDAR fan.

class sensor_fusion.SensorFusionNode[source]

Bases: object

Merge LiDAR and sonar information into a single LaserScan.

Variables

lidar_datasensor_msgs.msg.LaserScan | None

Last LiDAR scan received.

sonar_datasensor_msgs.msg.Range | None

Last sonar reading received.

adapted_sonarlist[float]

Sonar value expanded to match LiDAR beam count.

Register the node, wire subscriptions and publisher, and initialise state.

lidar_callback(data: sensor_msgs.msg.LaserScan) None[source]

Store the most recent LiDAR scan.

Parameters:

data (sensor_msgs.msg.LaserScan) – Scan received from /lidar.

merge_data() list[float][source]

Combine the LiDAR and adapted sonar arrays element-wise.

Returns:

Sum of both sensors; length == len(self.lidar_data.ranges).

Return type:

list[float]

publish_fused() None[source]

Produce and publish a fused LaserScan if both inputs are ready.

Workflow

  1. Verify both self.lidar_data and self.sonar_data are present.

  2. Use Adapter to expand the scalar sonar reading into a list matching the LiDAR beam count.

  3. Call merge_data() to sum the two arrays.

  4. Populate a new LaserScan, copy LiDAR header and geometry fields, attach fused ranges, and publish.

sonar_callback(data: sensor_msgs.msg.Range) None[source]

Store the sonar reading and trigger a fusion attempt.

Parameters:

data (sensor_msgs.msg.Range) – Range measurement received from /sonar.