📝 API Reference

Aegear: a computer vision toolkit for tracking and analyzing fish behavior in controlled aquaculture environments.

calibration

Scene calibration module.

This module calibrates the camera and characterizes the scene in order to obtain the pixel-to-cm ratio. It provides the SceneCalibration class, which handles the calibration process: loading camera parameters, assigning scene reference points, calibrating the scene, and rectifying images. The calibration is performed from a set of screen points and a matching set of real-world reference points.

The class also provides a method to rectify images based on the calibration parameters. It uses OpenCV for image processing and assumes that the camera calibration parameters are stored in a file. The calibration points are expected to be in a specific order: top left, top right, bottom right, bottom left.

Note that this reference-matching system exists to allow inconsistent camera placement with respect to the original capture of the calibration pattern. The calibration uses this information to rectify the image for easier tracking of the fish and to estimate the pixel-to-cm ratio, which in turn allows correct metric tracking of the fish within the experiment.
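
A minimal usage sketch of the module's workflow; the calibration file name, frame path, and pixel coordinates are placeholders, not values from the library:

import cv2
from aegear.calibration import SceneCalibration

calib = SceneCalibration("camera_calibration.yml")

# Optional: override the default reference rectangle (in cm, same point order).
calib.assign_scene_calibration([(0.0, 0.0), (149.0, 5.0), (149.0, 35.0), (0.0, 40.0)])

# Screen-space corners of the reference area: top left, top right, bottom right, bottom left.
screen_pts = [(102.0, 88.0), (1815.0, 95.0), (1820.0, 990.0), (98.0, 985.0)]
pixel_to_cm = calib.calibrate(screen_pts)

frame = cv2.imread("frame_0001.png")    # any BGR frame from the experiment video
rectified = calib.rectify_image(frame)  # undistorted and perspective-corrected copy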

SceneCalibration

Calibration of the camera and the scene size to get the pixel to cm ratio.

Source code in src/aegear/calibration.py
class SceneCalibration:
    """
    Calibration of the camera and the scene size to get the pixel to cm ratio.
    """

    # Sample points used in the Russian Sturgeon experiment, Fazekas et al, 2025.
    DEFAULT_SCENE_REF = np.array([[0, 0], [149.0, 5.0], [149.0, 35.0], [0.0, 40.0]], dtype=np.float32)

    def __init__(self, calibration_path: str, scene_reference=DEFAULT_SCENE_REF):
        """
        Constructor.

        Parameters
        ----------
        calibration_path : str
            Path to the calibration file.
        scene_reference : np.ndarray, optional
            The reference points for the scene. 4x2 array of floats, designating the borders
            of the reference area used for final image rectification and pixel to cm ratio calculation.
            The default value is assumed from the Russian Sturgeon experiment, Fazekas et al., 2025.
        """
        self.mtx, self.dist = self._load_calibration(calibration_path)
        self._scene_reference = scene_reference
        self._perspectiveTransform = None

    def _load_calibration(self, calibration_path: str) -> Tuple[np.ndarray, np.ndarray]:
        """
        Load the camera calibration parameters from a file.
        """

        storage = cv2.FileStorage(calibration_path, cv2.FILE_STORAGE_READ)
        mtx = storage.getNode("mtx").mat()
        dist = storage.getNode("dist").mat()
        storage.release()

        return (mtx, dist)

    def assign_scene_calibration(self, points: List[Tuple[float, float]]):
        """
        Assign the scene calibration points.

        Parameters
        ----------

        points : list
            The scene reference points to use for calibration.
            The 4x2 array of floats, designating the borders of the reference area used for final image rectification and pixel to cm ratio calculation.
            By convention, the points are in the order: top left, top right, bottom right, bottom left.
        """
        points = np.array(points, dtype=np.float32)
        assert points.shape == (4, 2), "Real points must be a 4x2 array"
        self._scene_reference = points

    def calibrate(self, screen_pts: List[Tuple[float, float]]) -> float:
        """
        Run the scene characterization.

        Parameters
        ----------
        screen_pts : list
            The screen points to use for calibration, which within the scene match the points assigned for the scene reference.
            As for the reference points, the points are in the order: top left, top right, bottom right, bottom left.

        Returns
        -------
        float
            The pixel to cm ratio.
        """
        sample_pts = np.array(screen_pts, dtype=np.float32)
        assert sample_pts.shape == (4, 2), "Screen points must be a 4x2 array"

        sample_pts = cv2.undistortPoints(
            np.array(sample_pts, dtype=np.float32).reshape(-1, 1, 2),
            self.mtx,
            self.dist,
            P=self.mtx
        ).reshape(-1, 2) # Reshape to (N, 2) for direct use

        sample_avg_scale = np.mean(np.linalg.norm(np.diff(sample_pts, axis=0)))
        scene_avg_scale = np.mean(np.linalg.norm(np.diff(self._scene_reference, axis=0)))

        img_scaling_factor = sample_avg_scale / scene_avg_scale 

        # move points to match starting x position of samples, and scale up to image scale
        transformed_real_pts = self._scene_reference * img_scaling_factor + sample_pts[0, :]

        # do perspective transform to rectify image
        persp_T = cv2.getPerspectiveTransform(sample_pts, transformed_real_pts)

        # add homogeneous coordinate
        sample_pts = np.hstack((sample_pts, np.ones((4, 1))))

        # also warp points to be able to calculate pixel to cm ratio
        sample_pts = np.dot(persp_T, sample_pts.T).T

        # divide by homogeneous coordinate
        sample_pts = sample_pts[:, 0:2] / sample_pts[:, 2].reshape((4, 1))

        # now calculate pixel to cm ratio
        sample_avg_scale = np.mean(np.linalg.norm(np.diff(sample_pts, axis=0)))
        pixel_to_cm_ratio = scene_avg_scale / sample_avg_scale

        self._perspectiveTransform = persp_T

        return pixel_to_cm_ratio

    def rectify_image(self, image: np.ndarray) -> np.ndarray:
        """
        Rectify the image.

        Parameters
        ----------
        image : np.ndarray
            The image to rectify.

        Returns
        -------
        np.ndarray
            The rectified image.

        """
        assert self._perspectiveTransform is not None, "Need to calibrate first"

        ret_image = cv2.undistort(image, self.mtx, self.dist)
        ret_image = cv2.warpPerspective(ret_image, self._perspectiveTransform, image.shape[0:2][::-1])

        return ret_image

    def rectify_point(self, point: tuple[float, float]) -> tuple[float, float]:
        """
        Rectify a single point using the current calibration.

        Parameters
        ----------
        point : tuple of float
            The (x, y) coordinates of the point to rectify.

        Returns
        -------
        tuple of float
            The rectified (x, y) coordinates.
        """
        assert self._perspectiveTransform is not None, "Need to calibrate first"

        # Step 1: Undistort
        undistorted_pt = cv2.undistortPoints(
            np.array([[point]], dtype=np.float32),
            self.mtx,
            self.dist,
            P=self.mtx
        )[0, 0]

        # Step 2: Perspective transform
        rectified_pt = cv2.perspectiveTransform(
            np.array([[undistorted_pt]], dtype=np.float32),
            self._perspectiveTransform
        )[0, 0]

        return tuple(rectified_pt)

    def unrectify_point(self, point: tuple[float, float]) -> tuple[float, float]:
        """
        Map a point from the rectified image back to its original (distorted) image coordinates.
        """
        assert self._perspectiveTransform is not None, "Need to calibrate first"

        # 1. undo the perspective warp
        inv_T = np.linalg.inv(self._perspectiveTransform)
        pt = np.array([point[0], point[1], 1.0], dtype=np.float32)
        undist_h = inv_T.dot(pt)
        undist_px = undist_h[:2] / undist_h[2]

        # 2. convert back to normalized camera coords
        inv_mtx = np.linalg.inv(self.mtx)
        uv1 = np.array([undist_px[0], undist_px[1], 1.0], dtype=np.float32)
        norm = inv_mtx.dot(uv1).reshape(1, 3)

        # 3. project through intrinsics+distortion to get the original pixel
        rvec = np.zeros(3, dtype=np.float32)
        tvec = np.zeros(3, dtype=np.float32)
        img_pts, _ = cv2.projectPoints(norm, rvec, tvec, self.mtx, self.dist)
        x, y = img_pts[0, 0]

        return (float(x), float(y))

assign_scene_calibration(points)

Assign the scene calibration points.

Parameters

points : list
    The scene reference points to use for calibration: a 4x2 array of floats designating the borders of the reference area used for final image rectification and pixel-to-cm ratio calculation. By convention, the points are in the order: top left, top right, bottom right, bottom left.

Source code in src/aegear/calibration.py
def assign_scene_calibration(self, points: List[Tuple[float, float]]):
    """
    Assign the scene calibration points.

    Parameters
    ----------

    points : list
        The scene reference points to use for calibration.
        The 4x2 array of floats, designating the borders of the reference area used for final image rectification and pixel to cm ratio calculation.
        By convention, the points are in the order: top left, top right, bottom right, bottom left.
    """
    points = np.array(points, dtype=np.float32)
    assert points.shape == (4, 2), "Real points must be a 4x2 array"
    self._scene_reference = points

calibrate(screen_pts)

Run the scene characterization.

Parameters

screen_pts : list
    The screen points to use for calibration; within the scene they match the points assigned as the scene reference. As with the reference points, the order is: top left, top right, bottom right, bottom left.

Returns

float
    The pixel-to-cm ratio.

Source code in src/aegear/calibration.py
def calibrate(self, screen_pts: List[Tuple[float, float]]) -> float:
    """
    Run the scene characterization.

    Parameters
    ----------
    screen_pts : list
        The screen points to use for calibration, which within the scene match the points assigned for the scene reference.
        As for the reference points, the points are in the order: top left, top right, bottom right, bottom left.

    Returns
    -------
    float
        The pixel to cm ratio.
    """
    sample_pts = np.array(screen_pts, dtype=np.float32)
    assert sample_pts.shape == (4, 2), "Screen points must be a 4x2 array"

    sample_pts = cv2.undistortPoints(
        np.array(sample_pts, dtype=np.float32).reshape(-1, 1, 2),
        self.mtx,
        self.dist,
        P=self.mtx
    ).reshape(-1, 2) # Reshape to (N, 2) for direct use

    sample_avg_scale = np.mean(np.linalg.norm(np.diff(sample_pts, axis=0)))
    scene_avg_scale = np.mean(np.linalg.norm(np.diff(self._scene_reference, axis=0)))

    img_scaling_factor = sample_avg_scale / scene_avg_scale 

    # move points to match starting x position of samples, and scale up to image scale
    transformed_real_pts = self._scene_reference * img_scaling_factor + sample_pts[0, :]

    # do perspective transform to rectify image
    persp_T = cv2.getPerspectiveTransform(sample_pts, transformed_real_pts)

    # add homogeneous coordinate
    sample_pts = np.hstack((sample_pts, np.ones((4, 1))))

    # also warp points to be able to calculate pixel to cm ratio
    sample_pts = np.dot(persp_T, sample_pts.T).T

    # divide by homogeneous coordinate
    sample_pts = sample_pts[:, 0:2] / sample_pts[:, 2].reshape((4, 1))

    # now calculate pixel to cm ratio
    sample_avg_scale = np.mean(np.linalg.norm(np.diff(sample_pts, axis=0)))
    pixel_to_cm_ratio = scene_avg_scale / sample_avg_scale

    self._perspectiveTransform = persp_T

    return pixel_to_cm_ratio

rectify_image(image)

Rectify the image.

Parameters

image : np.ndarray
    The image to rectify.

Returns

np.ndarray
    The rectified image.

Source code in src/aegear/calibration.py
def rectify_image(self, image: np.ndarray) -> np.ndarray:
    """
    Rectify the image.

    Parameters
    ----------
    image : np.ndarray
        The image to rectify.

    Returns
    -------
    np.ndarray
        The rectified image.

    """
    assert self._perspectiveTransform is not None, "Need to calibrate first"

    ret_image = cv2.undistort(image, self.mtx, self.dist)
    ret_image = cv2.warpPerspective(ret_image, self._perspectiveTransform, image.shape[0:2][::-1])

    return ret_image

rectify_point(point)

Rectify a single point using the current calibration.

Parameters

point : tuple of float
    The (x, y) coordinates of the point to rectify.

Returns

tuple of float
    The rectified (x, y) coordinates.

Source code in src/aegear/calibration.py
def rectify_point(self, point: tuple[float, float]) -> tuple[float, float]:
    """
    Rectify a single point using the current calibration.

    Parameters
    ----------
    point : tuple of float
        The (x, y) coordinates of the point to rectify.

    Returns
    -------
    tuple of float
        The rectified (x, y) coordinates.
    """
    assert self._perspectiveTransform is not None, "Need to calibrate first"

    # Step 1: Undistort
    undistorted_pt = cv2.undistortPoints(
        np.array([[point]], dtype=np.float32),
        self.mtx,
        self.dist,
        P=self.mtx
    )[0, 0]

    # Step 2: Perspective transform
    rectified_pt = cv2.perspectiveTransform(
        np.array([[undistorted_pt]], dtype=np.float32),
        self._perspectiveTransform
    )[0, 0]

    return tuple(rectified_pt)

unrectify_point(point)

Map a point from the rectified image back to its original (distorted) image coordinates.

Source code in src/aegear/calibration.py
def unrectify_point(self, point: tuple[float, float]) -> tuple[float, float]:
    """
    Map a point from the rectified image back to its original (distorted) image coordinates.
    """
    assert self._perspectiveTransform is not None, "Need to calibrate first"

    # 1. undo the perspective warp
    inv_T = np.linalg.inv(self._perspectiveTransform)
    pt = np.array([point[0], point[1], 1.0], dtype=np.float32)
    undist_h = inv_T.dot(pt)
    undist_px = undist_h[:2] / undist_h[2]

    # 2. convert back to normalized camera coords
    inv_mtx = np.linalg.inv(self.mtx)
    uv1 = np.array([undist_px[0], undist_px[1], 1.0], dtype=np.float32)
    norm = inv_mtx.dot(uv1).reshape(1, 3)

    # 3. project through intrinsics+distortion to get the original pixel
    rvec = np.zeros(3, dtype=np.float32)
    tvec = np.zeros(3, dtype=np.float32)
    img_pts, _ = cv2.projectPoints(norm, rvec, tvec, self.mtx, self.dist)
    x, y = img_pts[0, 0]

    return (float(x), float(y))
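
A small round-trip sketch, assuming calib and pixel_to_cm from the module-level sketch above; the input coordinate is a placeholder:

# Map a detection from the original frame into rectified coordinates...
x_rect, y_rect = calib.rectify_point((640.0, 360.0))

# Distances measured in rectified coordinates convert to cm with the ratio
# returned by calibrate(): distance_cm = distance_px * pixel_to_cm.

# ...and map it back to (approximately) the original pixel location.
x_orig, y_orig = calib.unrectify_point((x_rect, y_rect))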

datasets

TrackingDataset

Bases: Dataset

Source code in src/aegear/datasets.py
class TrackingDataset(Dataset):

    _MAX_NEGATIVE_OFFSET = 50  # Maximum offset for negative samples

    def __init__(
        self,
        tracking_data,
        video_dir="",
        output_size=128,
        crop_size=168,
        future_frame_seek=[1, 3, 5, 7],
        random_pick_future_seek=False,
        interpolation_smoothness=0.5,
        temporal_jitter_range=0,
        gaussian_sigma=6.0,
        augmentation_transform=None,
        rotation_range=None,
        scale_range=None,
        negative_sample_prob=0.0,
        centroid_perturbation_range=0.0,
    ):

        self.video_path = os.path.join(video_dir, tracking_data["video"])
        self.tracking = sorted(
            tracking_data["tracking"], key=lambda x: x["frame_id"])
        self.smooth_trajectory, self.min_frame, self.max_frame = self._interpolate_tracking(
            interpolation_smoothness)
        self.future_frame_seek = future_frame_seek
        self.output_size = output_size
        self.crop_size = crop_size
        self.random_pick_future_seek = random_pick_future_seek
        self.rotation_range = rotation_range
        self.scale_range = scale_range
        self.negative_sample_prob = negative_sample_prob
        self.centroid_perturbation_range = centroid_perturbation_range
        self.temporal_jitter_range = temporal_jitter_range
        self.gaussian_sigma = gaussian_sigma

        # Estimate FPS from video file
        self.video = cv2.VideoCapture(self.video_path)
        if not self.video.isOpened():
            raise Exception(f"Could not open video file: {self.video_path}")

        self.fps = self.video.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.resolution = np.array([self.frame_width, self.frame_height])

        self.augmentation_transform = augmentation_transform

        self.normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )

    @staticmethod
    def build_split_datasets(json_filepaths, video_dir, train_fraction=0.9,
                             future_frame_seek=[1, 3, 5, 7], interpolation_smoothness=0.5, gaussian_sigma=6.0,
                             augmentation_transforms=None, rotation_range=None, scale_range=None, negative_sample_prob=0.0):

        train_datasets = []
        val_datasets = []

        for path in json_filepaths:
            with open(path, 'r') as f:
                data = json.load(f)

            full_tracking = data['tracking']
            video = data['video']

            # Shuffle and split indices
            indices = list(range(len(full_tracking)))
            random.shuffle(indices)

            split_idx = int(len(indices) * train_fraction)
            train_idx = indices[:split_idx]
            val_idx = indices[split_idx:]

            # Subsets of tracking samples
            train_tracking = [full_tracking[i] for i in train_idx]
            val_tracking = [full_tracking[i] for i in val_idx]

            train_data = {
                "video": video,
                "tracking": train_tracking
            }

            val_data = {
                "video": video,
                "tracking": val_tracking
            }

            # Build train dataset
            train_dataset = TrackingDataset(
                tracking_data=train_data,
                video_dir=video_dir,
                future_frame_seek=future_frame_seek,
                random_pick_future_seek=True,
                interpolation_smoothness=interpolation_smoothness,
                gaussian_sigma=gaussian_sigma,
                rotation_range=rotation_range,
                scale_range=scale_range,
                negative_sample_prob=negative_sample_prob,
                augmentation_transform=augmentation_transforms
            )
            train_datasets.append(train_dataset)

            # Build val dataset
            val_dataset = TrackingDataset(
                tracking_data=val_data,
                video_dir=video_dir,
                future_frame_seek=future_frame_seek,
                random_pick_future_seek=False,
                interpolation_smoothness=interpolation_smoothness,
                gaussian_sigma=gaussian_sigma
            )
            val_datasets.append(val_dataset)

        # Concat across all videos
        final_train_dataset = ConcatDataset(train_datasets)
        final_val_dataset = ConcatDataset(val_datasets)

        return final_train_dataset, final_val_dataset

    def _interpolate_tracking(self, interpolation_smoothness):
        frame_ids = np.array([pt["frame_id"] for pt in self.tracking])
        coords = np.array([pt["coordinates"] for pt in self.tracking])

        min_frame = int(frame_ids.min())
        max_frame = int(frame_ids.max())
        dense_frames = np.arange(min_frame, max_frame)

        rbf_x = Rbf(
            frame_ids, coords[:, 0], function='multiquadric', epsilon=interpolation_smoothness)
        rbf_y = Rbf(
            frame_ids, coords[:, 1], function='multiquadric', epsilon=interpolation_smoothness)

        x_interp = rbf_x(dense_frames)
        y_interp = rbf_y(dense_frames)

        trajectory = np.stack([x_interp, y_interp], axis=1)

        return trajectory, min_frame, max_frame

    def test_sequence_cache(self):
        for frame_id in range(self.min_frame, self.max_frame):
            try:
                frame = self._read_frame(frame_id)
            except:
                print(f"Frame {frame_id} not found in video {self.video_path}")
                continue

            img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            coodinate = self.smooth_trajectory[frame_id - self.min_frame]

            cv2.circle(img, (int(coodinate[0]), int(
                coodinate[1])), 5, (0, 255, 0), -1)

            cv2.imshow("Test", np.array(img))
            cv2.waitKey(0)

    def _read_frame(self, frame_id):
        self.video.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
        ret, img = self.video.read()
        if not ret:
            raise Exception(
                f"Could not read frame {frame_id} from video {self.video_path}")

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        return img

    def _get_crop(self, frame_id, center, transform: Tuple[float, float]):
        frame = self._read_frame(frame_id)

        crop_size = self.crop_size
        output_size = self.output_size

        if transform is None:
            x1 = int(center[0] - output_size // 2)
            y1 = int(center[1] - output_size // 2)
            x2 = x1 + output_size
            y2 = y1 + output_size

            if x1 < 0 or y1 < 0 or x2 > frame.shape[1] or y2 > frame.shape[0]:
                raise IndexError("Crop out of bounds")

            return frame[y1:y2, x1:x2, :]
        else:
            rotation_deg, scale = transform
            # Compute top-left corner of the large crop
            x1 = int(center[0] - crop_size // 2)
            y1 = int(center[1] - crop_size // 2)
            x2 = x1 + crop_size
            y2 = y1 + crop_size

            if x1 < 0 or y1 < 0 or x2 > frame.shape[1] or y2 > frame.shape[0]:
                raise IndexError("Crop out of bounds")

            crop = frame[y1:y2, x1:x2, :]

            center_point = (crop_size // 2, crop_size // 2)
            M = cv2.getRotationMatrix2D(center_point, rotation_deg, scale)

            rotated = cv2.warpAffine(
                crop, M, (crop_size, crop_size), flags=cv2.INTER_LINEAR)

            # Final center crop to self.crop_size
            start = crop_size // 2 - output_size // 2
            end = start + output_size

            return rotated[start:end, start:end, :]

    def transform_offset_for_heatmap(self, offset, transform: Tuple[float, float]):
        """
        Apply rotation and scale to an offset vector, then map to heatmap coordinates.

        Args:
            offset: np.ndarray shape (2,), the vector (search - template)
            transform: Tuple[float, float] = (rotation_deg, scale)

        Returns:
            np.ndarray of shape (2,), transformed and rescaled offset in heatmap coordinates
        """

        crop_size = self.crop_size
        output_size = self.output_size

        if transform:
            rotation_deg, scale = transform
            theta = np.deg2rad(rotation_deg)

            # 2D rotation matrix with scale
            R = np.array([
                [np.cos(theta), -np.sin(theta)],
                [np.sin(theta),  np.cos(theta)]
            ]) * scale

            offset = R @ offset

        heatmap_scale = output_size / crop_size
        search_roi_hit = offset * heatmap_scale + output_size // 2

        return search_roi_hit

    def generate_gaussian_heatmap(self, center):
        output_size = self.output_size

        x = torch.arange(0, output_size, 1).float()
        y = torch.arange(0, output_size, 1).float()
        y = y[:, None]

        x0, y0 = center
        heatmap = torch.exp(-((x - x0)**2 + (y - y0)**2) /
                            (2 * self.gaussian_sigma**2))
        return heatmap

    def __len__(self):
        max_future_seek = max(self.future_frame_seek) + \
            self.temporal_jitter_range
        last_frame = self.tracking[-1]["frame_id"]
        num_margin_frames = 0

        for i in range(len(self.tracking) - 1, -1, -1):
            num_margin_frames += 1
            if self.tracking[i]["frame_id"] + max_future_seek < last_frame:
                break

        num_samples = len(self.tracking) - num_margin_frames - 1

        if not self.random_pick_future_seek:
            num_samples *= len(self.future_frame_seek)

        return num_samples

    def __del__(self):
        if self.video.isOpened():
            self.video.release()

    def __getitem__(self, idx):
        if self.random_pick_future_seek:
            # Reset seed with  time for max randomness
            frame_jump = random.choice(self.future_frame_seek)
            template_tracking = self.tracking[idx]
        else:
            # use modulo to cycle through future_frame_seek
            frame_jump = self.future_frame_seek[idx % len(
                self.future_frame_seek)]
            template_tracking = self.tracking[idx //
                                              len(self.future_frame_seek)]

        if self.rotation_range or self.scale_range:
            rotation_deg = np.random.uniform(-self.rotation_range,
                                             self.rotation_range) if self.rotation_range else 0.0
            scale = np.random.uniform(
                1 - self.scale_range, 1 + self.scale_range) if self.scale_range else 1.0
            transform = (rotation_deg, scale)
        else:
            transform = None

        template_frame_id = template_tracking["frame_id"]

        if self.temporal_jitter_range > 0:
            jitter = random.randint(-self.temporal_jitter_range,
                                    self.temporal_jitter_range)
            template_frame_id += jitter

        search_frame_id = template_frame_id + frame_jump

        template_smooth_id = template_frame_id - self.min_frame
        search_smooth_id = template_smooth_id + frame_jump

        template_coordinate = self.smooth_trajectory[template_smooth_id]
        search_coordinate = self.smooth_trajectory[search_smooth_id]

        if self.centroid_perturbation_range > 0.0:
            perturbation_x = np.random.uniform(
                -self.centroid_perturbation_range, self.centroid_perturbation_range)
            perturbation_y = np.random.uniform(
                -self.centroid_perturbation_range, self.centroid_perturbation_range)
            template_coordinate = (
                template_coordinate[0] + perturbation_x, template_coordinate[1] + perturbation_y)

        is_negative = random.random() < self.negative_sample_prob

        if is_negative:
            offset_x = random.choice([-1, 1]) * random.randint(
                TrackingDataset._MAX_NEGATIVE_OFFSET // 2, TrackingDataset._MAX_NEGATIVE_OFFSET)
            offset_y = random.choice([-1, 1]) * random.randint(
                TrackingDataset._MAX_NEGATIVE_OFFSET // 2, TrackingDataset._MAX_NEGATIVE_OFFSET)

            template_coordinate = (
                search_coordinate[0] + offset_x,
                search_coordinate[1] + offset_y
            )

            max_frame_seek = max(self.future_frame_seek)
            search_frame_id = search_smooth_id + \
                random.randint(-max_frame_seek, max_frame_seek)

        try:
            template = self._get_crop(
                template_frame_id, template_coordinate, transform)
            search = self._get_crop(
                search_frame_id, template_coordinate, transform)
        except IndexError:
            return self.__getitem__((idx + 1) % len(self))

        to_tensor = transforms.ToTensor()
        template = to_tensor(template)
        search = to_tensor(search)

        # Augmentation with same seed
        if self.augmentation_transform:
            seed = np.random.randint(0, 10000)
            torch.manual_seed(seed)
            template = self.augmentation_transform(
                template.unsqueeze(0)).squeeze(0)
            torch.manual_seed(seed)
            search = self.augmentation_transform(
                search.unsqueeze(0)).squeeze(0)

        # Normalize the images
        template = self.normalize(template)
        search = self.normalize(search)

        if is_negative:
            heatmap = torch.zeros(
                (1, self.output_size, self.output_size))
        else:
            offset = np.array(search_coordinate) - \
                np.array(template_coordinate)
            search_roi_hit = self.transform_offset_for_heatmap(
                offset, transform)
            heatmap = self.generate_gaussian_heatmap(
                search_roi_hit).unsqueeze(0)

        return (
            template, search, heatmap
        )

transform_offset_for_heatmap(offset, transform)

Apply rotation and scale to an offset vector, then map to heatmap coordinates.

Parameters

offset : np.ndarray of shape (2,), required
    The vector (search - template).
transform : Tuple[float, float], required
    The (rotation_deg, scale) pair.

Returns

np.ndarray of shape (2,)
    The transformed and rescaled offset in heatmap coordinates.

Source code in src/aegear/datasets.py
def transform_offset_for_heatmap(self, offset, transform: Tuple[float, float]):
    """
    Apply rotation and scale to an offset vector, then map to heatmap coordinates.

    Args:
        offset: np.ndarray shape (2,), the vector (search - template)
        transform: Tuple[float, float] = (rotation_deg, scale)

    Returns:
        np.ndarray of shape (2,), transformed and rescaled offset in heatmap coordinates
    """

    crop_size = self.crop_size
    output_size = self.output_size

    if transform:
        rotation_deg, scale = transform
        theta = np.deg2rad(rotation_deg)

        # 2D rotation matrix with scale
        R = np.array([
            [np.cos(theta), -np.sin(theta)],
            [np.sin(theta),  np.cos(theta)]
        ]) * scale

        offset = R @ offset

    heatmap_scale = output_size / crop_size
    search_roi_hit = offset * heatmap_scale + output_size // 2

    return search_roi_hit
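
For intuition, a standalone numeric sketch of the same mapping with the default sizes (output_size=128, crop_size=168) and no rotation or scale; the offset values are made up:

import numpy as np

output_size, crop_size = 128, 168
offset = np.array([21.0, -42.0])         # (search - template) displacement in crop pixels

heatmap_scale = output_size / crop_size  # ~0.762
center = output_size // 2                # heatmap center at (64, 64)
hit = offset * heatmap_scale + center
print(hit)                               # approximately [80., 32.]: peak of the target Gaussian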

CachedTrackingDataset

Bases: Dataset

Cached version of TrackingDataset. Loads crops and metadata from disk, avoiding video decoding at runtime. Each sample contains (template, search, heatmap).

Source code in src/aegear/datasets.py
class CachedTrackingDataset(Dataset):
    """
    Cached version of TrackingDataset.
    Loads crops and metadata from disk, avoiding video decoding at runtime.
    Each sample contains (template, search, heatmap).
    """

    def __init__(self, root_dir, output_size=128, gaussian_sigma=6.0):
        with open(os.path.join(root_dir, "metadata.json"), 'r') as f:
            self.metadata = json.load(f)["samples"]

        self.root_dir = root_dir
        self.output_size = output_size
        self.gaussian_sigma = gaussian_sigma

        self.to_tensor = transforms.ToTensor()
        self.normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )

    def __len__(self):
        return len(self.metadata)

    def generate_heatmap(self, center):
        x = torch.arange(0, self.output_size).float()
        y = torch.arange(0, self.output_size).float()[:, None]
        x0, y0 = center
        heatmap = torch.exp(-((x - x0)**2 + (y - y0)**2) /
                            (2 * self.gaussian_sigma**2))
        return heatmap.unsqueeze(0)  # Shape: [1, H, W]

    def __getitem__(self, idx):
        item = self.metadata[idx]
        template_path = os.path.join(
            self.root_dir, item["template_path"])
        search_path = os.path.join(self.root_dir, item["search_path"])
        template = self.to_tensor(
            Image.open(template_path).convert("RGB"))
        search = self.to_tensor(Image.open(search_path).convert("RGB"))
        template = self.normalize(template)
        search = self.normalize(search)

        if item.get("background", False):
            heatmap = torch.zeros(
                (1, self.output_size, self.output_size))
        else:
            heatmap = self.generate_heatmap(item["centroid"])

        return template, search, heatmap
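
A minimal loading sketch, assuming a cache directory containing the metadata.json layout used above; the directory name and DataLoader settings are placeholders:

from torch.utils.data import DataLoader
from aegear.datasets import CachedTrackingDataset

dataset = CachedTrackingDataset("cache/tracking", output_size=128, gaussian_sigma=6.0)
loader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)

for template, search, heatmap in loader:
    # template, search: normalized RGB crops; heatmap: [B, 1, 128, 128] target
    break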

BackgroundWindowDataset

Bases: Dataset

Dataset for sampling background (no-fish) windows from a video, using a sliding window approach. The user provides a list of frame indices known to contain only background (no fish present). Each sample is a cropped window from a background frame, with optional augmentation, rotation, and scaling. The output is (image, heatmap), where heatmap is always a zero tensor.

Source code in src/aegear/datasets.py
class BackgroundWindowDataset(torch.utils.data.Dataset):
    """
    Dataset for sampling background (no-fish) windows from a video, using a sliding window approach.
    The user provides a list of frame indices known to contain only background (no fish present).
    Each sample is a cropped window from a background frame, with optional augmentation, rotation, and scaling.
    The output is (image, heatmap), where heatmap is always a zero tensor.
    """

    def __init__(
        self,
        video_path: str,
        background_frames: list[int],
        output_size: int = 128,
        crop_size: int = 168,
        siamese: bool = False,
        stride_portion: float = 0.5,
        augmentation_transform=None,
        rotation_range=None,
        scale_range=None,
    ):
        self.video_path = video_path
        self.background_frames = sorted(background_frames)
        self.output_size = output_size
        self.crop_size = crop_size
        self.siamese = siamese
        self.stride_portion = stride_portion
        self.augmentation_transform = augmentation_transform
        self.rotation_range = rotation_range
        self.scale_range = scale_range
        self.normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
        # Open video and get frame size
        self.video = cv2.VideoCapture(self.video_path)
        if not self.video.isOpened():
            raise Exception(f"Could not open video file: {self.video_path}")
        self.frame_width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Precompute all valid (frame, y, x) window positions
        self.samples = []
        stride = max(1, int(self.stride_portion * self.output_size))
        for frame_id in self.background_frames:
            for y in range(0, self.frame_height - self.crop_size + 1, stride):
                for x in range(0, self.frame_width - self.crop_size + 1, stride):
                    self.samples.append((frame_id, y, x))

    def __len__(self):
        return len(self.samples)

    def __del__(self):
        if hasattr(self, 'video') and self.video.isOpened():
            self.video.release()

    def _read_frame(self, frame_id):
        self.video.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
        ret, img = self.video.read()
        if not ret:
            raise Exception(
                f"Could not read frame {frame_id} from video {self.video_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return img

    def __getitem__(self, idx):
        frame_id, y, x = self.samples[idx]
        # Optionally apply rotation/scale
        if self.rotation_range or self.scale_range:
            rotation_deg = np.random.uniform(-self.rotation_range,
                                             self.rotation_range) if self.rotation_range else 0.0
            scale = np.random.uniform(
                1 - self.scale_range, 1 + self.scale_range) if self.scale_range else 1.0
        else:
            rotation_deg = 0.0
            scale = 1.0
        # Read frame and crop
        frame = self._read_frame(frame_id)
        crop = frame[y:y+self.crop_size, x:x+self.crop_size, :]
        # Apply rotation/scale if needed
        if rotation_deg != 0.0 or scale != 1.0:
            center_point = (self.crop_size // 2, self.crop_size // 2)
            M = cv2.getRotationMatrix2D(center_point, rotation_deg, scale)
            crop = cv2.warpAffine(
                crop, M, (self.crop_size, self.crop_size), flags=cv2.INTER_LINEAR)
        # Final center crop to output_size
        start = self.crop_size // 2 - self.output_size // 2
        end = start + self.output_size
        crop = crop[start:end, start:end, :]
        # To tensor
        crop = transforms.ToTensor()(crop)
        # Augmentation
        if self.augmentation_transform:
            crop = self.augmentation_transform(crop.unsqueeze(0)).squeeze(0)
        crop = self.normalize(crop)
        heatmap = torch.zeros((1, self.output_size, self.output_size))

        if self.siamese:
            # For Siamese networks, return two identical crops
            return crop, crop, heatmap
        else:
            return crop, heatmap
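
A short usage sketch; the video path and background frame indices are placeholders chosen for illustration:

from aegear.datasets import BackgroundWindowDataset

# Frames known to contain only background (no fish), picked by hand.
bg_dataset = BackgroundWindowDataset(
    video_path="videos/experiment_01.mp4",
    background_frames=[10, 500, 1200],
    siamese=True,  # return (crop, crop, zero_heatmap) tuples for Siamese training
)

crop_a, crop_b, zero_heatmap = bg_dataset[0]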

split_coco_annotations(coco_json_path, train_ratio=0.8, seed=42)

Loads a COCO JSON and splits it into train/val dictionaries based on image-level split.

Parameters

coco_json_path : Path, required
    Path to the COCO annotations.json.
train_ratio : float, default 0.8
    Ratio of images to assign to the training set.
seed : int, default 42
    Random seed for reproducibility.

Returns

Tuple[dict, dict]
    (train_dict, val_dict)

Source code in src/aegear/datasets.py
def split_coco_annotations(
    coco_json_path: Path,
    train_ratio: float = 0.8,
    seed: int = 42
) -> Tuple[dict, dict]:
    """
    Loads a COCO JSON and splits it into train/val dictionaries based on image-level split.

    Args:
        coco_json_path (Path): Path to the COCO annotations.json.
        train_ratio (float): Ratio of images to assign to the training set.
        seed (int): Random seed for reproducibility.

    Returns:
        Tuple[dict, dict]: (train_dict, val_dict)
    """
    with open(coco_json_path, 'r') as f:
        coco = json.load(f)

    images = coco["images"]
    annotations = coco["annotations"]
    categories = coco["categories"]

    # Reproducible shuffle
    random.seed(seed)
    shuffled_images = images[:]
    random.shuffle(shuffled_images)

    split_idx = int(len(shuffled_images) * train_ratio)
    train_images = shuffled_images[:split_idx]
    val_images = shuffled_images[split_idx:]

    train_img_ids = {img["id"] for img in train_images}
    val_img_ids = {img["id"] for img in val_images}

    # Filter annotations
    train_annotations = [
        ann for ann in annotations if ann["image_id"] in train_img_ids]
    val_annotations = [
        ann for ann in annotations if ann["image_id"] in val_img_ids]

    train_dict = {
        "images": train_images,
        "annotations": train_annotations,
        "categories": categories
    }

    val_dict = {
        "images": val_images,
        "annotations": val_annotations,
        "categories": categories
    }

    return train_dict, val_dict
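
A usage sketch; the file paths are placeholders:

import json
from pathlib import Path
from aegear.datasets import split_coco_annotations

train_dict, val_dict = split_coco_annotations(
    Path("annotations/annotations.json"), train_ratio=0.8, seed=42)

with open("annotations/train.json", "w") as f:
    json.dump(train_dict, f)
with open("annotations/val.json", "w") as f:
    json.dump(val_dict, f)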

model

CBAM

Bases: Module

Lightweight convolutional block attention module (CBAM) for channel and spatial attention.

Source code in src/aegear/model.py
class CBAM(nn.Module):
    """Lightweight convolutional block attention module (CBAM) for channel and spatial attention."""

    def __init__(self, in_channels):
        super().__init__()
        # Channel attention
        self.channel = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels, in_channels // 8, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels // 8, in_channels, 1),
            nn.Sigmoid()
        )
        # Spatial attention
        self.spatial = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=7, padding=3),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Channel attention
        ca = self.channel(x)
        x = x * ca

        # Spatial attention
        max_pool = torch.max(x, dim=1, keepdim=True)[0]
        avg_pool = torch.mean(x, dim=1, keepdim=True)
        sa = self.spatial(torch.cat([max_pool, avg_pool], dim=1))
        return x * sa
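
CBAM is shape preserving: it re-weights features by channel and spatial attention without changing their dimensions. A quick sketch with a random tensor:

import torch
from aegear.model import CBAM

x = torch.randn(1, 64, 32, 32)      # [B, C, H, W]
attended = CBAM(in_channels=64)(x)
print(attended.shape)               # torch.Size([1, 64, 32, 32])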

EfficientUNet

Bases: Module

EfficientUNet backbone based on EfficientNet-B0, enhanced with CBAM (Convolutional Block Attention Module) attention blocks after each encoder and decoder stage.

The architecture removes the deepest (last) encoder and decoder stages compared to a standard UNet, resulting in a lighter model with fewer parameters and reduced memory usage, while retaining strong feature extraction and localization capabilities.

CBAM modules are used to improve feature representation by applying both channel and spatial attention at multiple levels of the network, allowing the model to focus on the object of interest while ignoring irrelevant information. This is particularly useful in scenarios where the object of interest (e.g., fish) may be small and difficult to distinguish from the background, or when there are multiple objects present in the image.

Source code in src/aegear/model.py
class EfficientUNet(nn.Module):
    """
    EfficientUNet backbone based on EfficientNet-B0, enhanced with CBAM
    (Convolutional Block Attention Module) attention blocks after each encoder
    and decoder stage.

    The architecture removes the deepest (last) encoder and
    decoder stages compared to a standard UNet, resulting in a lighter model
    with fewer parameters and reduced memory usage, while retaining strong
    feature extraction and localization capabilities.

    CBAM modules are used to improve feature representation by applying both
    channel and spatial attention at multiple levels of the network, allowing
    the model to focus on the object of interest while ignoring irrelevant information.
    This is particularly useful in scenarios where the object of interest (e.g., fish)
    may be small and difficult to distinguish from the background, or when there
    are multiple objects present in the image.
    """

    def __init__(self, weights=None):
        super().__init__()
        backbone = efficientnet_b0(weights=weights)
        features = list(backbone.features.children())

        # Encoder stages
        self.enc1 = nn.Sequential(*features[:2])  # Output: 16 ch, S/2
        self.enc2 = nn.Sequential(*features[2:3])  # Output: 24 ch, S/4
        self.enc3 = nn.Sequential(*features[3:4])  # Output: 40 ch, S/8
        self.enc4 = nn.Sequential(*features[4:5])  # Output: 80 ch, S/16
        self.enc5 = nn.Sequential(*features[5:6])  # Output: 112 ch, S/16

        # Bottleneck with dilated convs.
        self.bottleneck = nn.Sequential(
            nn.Conv2d(112, 256, kernel_size=3, padding=2, dilation=2),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
        )
        self.att_bottleneck = CBAM(256)

        # Decoder with CBAM after skip merges
        self.att4 = CBAM(256 + 112)
        self.up4 = self._conf_block(256 + 112, 64)  # S/16 -> S/16

        self.att3 = CBAM(64 + 80)
        self.up3 = self._up_block(64 + 80, 32)

        self.att2 = CBAM(32 + 40)
        self.up2 = self._up_block(32 + 40, 24)

        self.att1 = CBAM(24 + 24)
        self.up1 = self._up_block(24 + 24, 16)

        self.att0 = CBAM(16 + 16)
        self.up0 = self._up_block(16 + 16, 8)

        # Final 1-channel output
        self.out = nn.Conv2d(8, 1, kernel_size=1)

    def _up_block(self, in_ch, out_ch):
        return nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def _conf_block(self, in_ch, out_ch):
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.forward_with_decoded(x)[0]

    def forward_with_decoded(self, x):
        # Encoder
        x1 = self.enc1(x)  # S/2
        x2 = self.enc2(x1)  # S/4
        x3 = self.enc3(x2)  # S/8
        x4 = self.enc4(x3)  # S/16
        x5 = self.enc5(x4)  # S/16

        b = self.bottleneck(x5)
        b = self.att_bottleneck(b)

        # Decoder
        d4_cat = torch.cat([b, x5], dim=1)
        d4_att = self.att4(d4_cat)
        d4 = self.up4(d4_att)

        d3_cat = torch.cat([d4, x4], dim=1)
        d3_att = self.att3(d3_cat)
        d3 = self.up3(d3_att)

        d2_cat = torch.cat([d3, x3], dim=1)
        d2_att = self.att2(d2_cat)
        d2 = self.up2(d2_att)

        d1_cat = torch.cat([d2, x2], dim=1)
        d1_att = self.att1(d1_cat)
        d1 = self.up1(d1_att)

        d0_cat = torch.cat([d1, x1], dim=1)
        d0_att = self.att0(d0_cat)
        d0 = self.up0(d0_att)

        # Final output
        out = self.out(d0)

        # Resize to original input size
        out = F.interpolate(out,
                            size=x.shape[2:],
                            mode='bilinear',
                            align_corners=False)

        return out, d0
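
A forward-pass sketch with a random input and untrained weights; the 128x128 crop size follows the convention used by the datasets above:

import torch
from aegear.model import EfficientUNet

model = EfficientUNet(weights=None).eval()
x = torch.randn(1, 3, 128, 128)   # RGB crop

with torch.no_grad():
    heatmap = model(x)                                # [1, 1, 128, 128], resized to the input size
    heatmap, decoded = model.forward_with_decoded(x)  # decoded: final decoder feature map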

SiameseTracker

Bases: Module

Siamese UNet model for tracking, based on EfficientUNet.

This model is designed to take two inputs: a template image and a search image. The template image is the reference image of the object to be tracked, while the search image is the current frame in which the object is being searched for. The model processes both images through a shared UNet architecture, extracting features from both images and then concatenating them at each stage of the decoder. This allows the model to leverage the spatial information from both images, improving the tracking performance.

Source code in src/aegear/model.py
class SiameseTracker(nn.Module):
    """
    Siamese UNet model for tracking, based on EfficientUNet.

    This model is designed to take two inputs: a template image and a search
    image. The template image is the reference image of the object to be
    tracked, while the search image is the current frame in which the object
    is being searched for. The model processes both images through a shared
    UNet architecture, extracting features from both images and then
    concatenating them at each stage of the decoder. This allows the model to
    leverage the spatial information from both images, improving the
    tracking performance.
    """

    def __init__(self, unet=EfficientUNet()):
        super().__init__()
        # Share encoder stages from the UNet
        self.enc1 = unet.enc1
        self.enc2 = unet.enc2
        self.enc3 = unet.enc3
        self.enc4 = unet.enc4
        self.enc5 = unet.enc5

        # Share bottleneck and bottleneck attention
        self.bottleneck = unet.bottleneck
        self.att_bottleneck = unet.att_bottleneck

        # Decoder blocks with adjusted input channel sizes for concatenated Siamese features
        # The input channels to att/up blocks will be double the UNet's combined input
        self.att4 = CBAM(256 * 2 + 112 * 2)
        self.up4 = unet._conf_block(256 * 2 + 112 * 2, 64)

        self.att3 = CBAM(64 + 80 * 2)
        self.up3 = unet._up_block(64 + 80 * 2, 32)

        self.att2 = CBAM(32 + 40 * 2)
        self.up2 = unet._up_block(32 + 40 * 2, 24)

        self.att1 = CBAM(24 + 24 * 2)
        self.up1 = unet._up_block(24 + 24 * 2, 16)

        self.att0 = CBAM(16 + 16 * 2)
        self.up0 = unet._up_block(16 + 16 * 2, 8)

        # Re-use the output layer from UNet
        self.out = unet.out

    def forward(self, template, search):
        # Encoder
        t1 = self.enc1(template)  # S/2
        s1 = self.enc1(search)

        t2 = self.enc2(t1)  # S/4
        s2 = self.enc2(s1)

        t3 = self.enc3(t2)  # S/8
        s3 = self.enc3(s2)

        t4 = self.enc4(t3)  # S/16
        s4 = self.enc4(s3)

        t5 = self.enc5(t4)  # S/16
        s5 = self.enc5(s4)

        # Bottleneck with attention.
        b_t = self.bottleneck(t5)
        b_s = self.bottleneck(s5)
        b_t_att = self.att_bottleneck(b_t)
        b_s_att = self.att_bottleneck(b_s)

        fused_bottleneck = torch.cat(
            [b_t_att, b_s_att], dim=1)

        # Decoder
        d4_cat = torch.cat(
            [fused_bottleneck, torch.cat([t5, s5], dim=1)], dim=1)
        d4_att = self.att4(d4_cat)
        d4_fused = self.up4(d4_att)

        d3_cat = torch.cat([d4_fused, torch.cat([t4, s4], dim=1)], dim=1)
        d3_att = self.att3(d3_cat)
        d3_fused = self.up3(d3_att)

        d2_cat = torch.cat([d3_fused, torch.cat([t3, s3], dim=1)], dim=1)
        d2_att = self.att2(d2_cat)
        d2_fused = self.up2(d2_att)

        d1_cat = torch.cat([d2_fused, torch.cat([t2, s2], dim=1)], dim=1)
        d1_att = self.att1(d1_cat)
        d1_fused = self.up1(d1_att)

        d0_cat = torch.cat([d1_fused, torch.cat([t1, s1], dim=1)], dim=1)
        d0_att = self.att0(d0_cat)
        d0_fused = self.up0(d0_att)

        out = self.out(d0_fused)
        return F.interpolate(out, size=template.shape[2:], mode='bilinear', align_corners=False)
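
A forward-pass sketch with a template/search pair of random crops and untrained weights:

import torch
from aegear.model import SiameseTracker

tracker = SiameseTracker().eval()         # wraps a freshly initialized EfficientUNet by default

template = torch.randn(1, 3, 128, 128)    # crop centered on the fish in the previous frame
search = torch.randn(1, 3, 128, 128)      # crop at the same location in the current frame

with torch.no_grad():
    response = tracker(template, search)  # [1, 1, 128, 128]; the peak marks the predicted position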

ConvClassifier

Bases: Module

A simple convolutional network for binary classification. This model is designed to classify whether a fish is present in a given region of interest (ROI) of the image.

Source code in src/aegear/model.py
class ConvClassifier(nn.Module):
    """
    A simple convolutional network for binary classification.
    This model is designed to classify whether a fish is present in a given
    region of interest (ROI) of the image.
    """
    # Size of the region of interest (ROI) for classification.
    ROI_SIZE = 64

    def __init__(self):
        super(ConvClassifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * (ConvClassifier.ROI_SIZE // 8) ** 2, 256)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2)
        x = x.view(x.size(0), -1)  # Flatten layer
        x = F.relu(self.fc1(x))
        return torch.sigmoid(self.fc2(x))
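
A quick sketch of the expected input and output shapes:

import torch
from aegear.model import ConvClassifier

clf = ConvClassifier().eval()
roi = torch.randn(1, 3, ConvClassifier.ROI_SIZE, ConvClassifier.ROI_SIZE)  # 64x64 RGB crop

with torch.no_grad():
    p_fish = clf(roi)  # [1, 1] probability that a fish is present in the ROI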

motiondetection

Motion detection module.

This module provides the MotionDetector class that identifies motion by comparing three consecutive frames. The algorithm converts frames to grayscale, computes the absolute difference between frames, applies binary thresholding, combines the results, and uses morphological operations to filter the motion regions before extracting contours.
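
A minimal sketch, assuming three consecutive BGR frames are available; the video path and threshold value are placeholders:

import cv2
from aegear.motiondetection import MotionDetector

cap = cv2.VideoCapture("videos/experiment_01.mp4")
_, prev_frame = cap.read()
_, this_frame = cap.read()
_, next_frame = cap.read()
cap.release()

detector = MotionDetector(motion_threshold=25)
good_contours, bad_contours = detector.detect(prev_frame, this_frame, next_frame)
# good_contours: motion blobs whose area lies within [min_area, max_area]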

MotionDetector

Motion detector class that identifies motion by comparing three consecutive frames.

Source code in src/aegear/motiondetection.py
class MotionDetector:
    """
    Motion detector class that identifies motion by comparing three consecutive frames.
    """

    MIN_AREA: int = 10

    def __init__(self, motion_threshold: int, erode_kernel_size: int = 3,
                 dilate_kernel_size: int = 15, min_area: int = 800, max_area: int = 3000) -> None:
        """
        Initialize the MotionDetector.

        Parameters
        ----------
        motion_threshold : int
            The threshold used to detect motion based on pixel intensity difference.
        erode_kernel_size : int, optional
            Size of the kernel used for erosion (default is 3).
        dilate_kernel_size : int, optional
            Size of the kernel used for dilation (default is 15).
        min_area : int, optional
            Minimum contour area to be considered as good motion (default is 800).
        max_area : int, optional
            Maximum contour area to be considered as good motion (default is 3000).
        """
        self.motion_threshold = motion_threshold
        self.erode_kernel_size = erode_kernel_size
        self.dilate_kernel_size = dilate_kernel_size
        self.min_area = min_area
        self.max_area = max_area

    def detect(self, prev_frame: np.ndarray, this_frame: np.ndarray,
               next_frame: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Detect motion by comparing three consecutive frames.

        The function converts the frames to grayscale, computes the absolute differences,
        thresholds them to produce binary images, combines the thresholded images, applies
        morphological operations to remove noise, and finally extracts contours. Detected
        contours are classified into "good" (within the area range) and "bad" (outside the
        area range but above a minimum threshold).

        Parameters
        ----------
        prev_frame : numpy.ndarray
            Previous frame in BGR color space.
        this_frame : numpy.ndarray
            Current frame in BGR color space.
        next_frame : numpy.ndarray
            Next frame in BGR color space.

        Returns
        -------
        Tuple[List[numpy.ndarray], List[numpy.ndarray]]
            A tuple containing two lists of contours:
            - The first list contains contours with areas between min_area and max_area.
            - The second list contains contours with areas outside that range but above MIN_AREA.
        """
        # Convert frames to grayscale
        gprev_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        gframe = cv2.cvtColor(this_frame, cv2.COLOR_BGR2GRAY)
        gnext_frame = cv2.cvtColor(next_frame, cv2.COLOR_BGR2GRAY)

        # Compute absolute differences between the current frame and its neighbors
        diff_prev = np.abs(gframe.astype(np.float32) - gprev_frame.astype(np.float32)).astype(np.uint8)
        diff_next = np.abs(gframe.astype(np.float32) - gnext_frame.astype(np.float32)).astype(np.uint8)

        # Apply binary thresholding to highlight significant differences
        _, thresh_prev = cv2.threshold(diff_prev, self.motion_threshold, 255, cv2.THRESH_BINARY)
        _, thresh_next = cv2.threshold(diff_next, self.motion_threshold, 255, cv2.THRESH_BINARY)

        # Combine the thresholded images
        combined = cv2.bitwise_or(thresh_prev, thresh_next)

        # Apply morphological operations to reduce noise and close gaps
        erode_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (self.erode_kernel_size, self.erode_kernel_size))
        dilate_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (self.dilate_kernel_size, self.dilate_kernel_size))
        morphed = cv2.erode(combined, erode_kernel)
        morphed = cv2.dilate(morphed, dilate_kernel)

        # Smooth the image and reapply thresholding to finalize the binary image
        blurred = cv2.GaussianBlur(morphed, (19, 19), 5.0)
        _, final_thresh = cv2.threshold(blurred, 50, 255, cv2.THRESH_BINARY)

        # Find contours in the thresholded image
        contours, _ = cv2.findContours(final_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        good_contours: List[np.ndarray] = []
        bad_contours: List[np.ndarray] = []

        # Classify contours based on their area
        for contour in contours:
            area = cv2.contourArea(contour)
            if area < MotionDetector.MIN_AREA:
                continue

            if self.min_area <= area <= self.max_area:
                good_contours.append(contour)
            else:
                bad_contours.append(contour)

        return good_contours, bad_contours

detect(prev_frame, this_frame, next_frame)

Detect motion by comparing three consecutive frames.

The function converts the frames to grayscale, computes the absolute differences, thresholds them to produce binary images, combines the thresholded images, applies morphological operations to remove noise, and finally extracts contours. Detected contours are classified into "good" (within the area range) and "bad" (outside the area range but above a minimum threshold).

Parameters

prev_frame : numpy.ndarray
    Previous frame in BGR color space.
this_frame : numpy.ndarray
    Current frame in BGR color space.
next_frame : numpy.ndarray
    Next frame in BGR color space.

Returns

Tuple[List[numpy.ndarray], List[numpy.ndarray]]
    A tuple containing two lists of contours:
    - The first list contains contours with areas between min_area and max_area.
    - The second list contains contours with areas outside that range but above MIN_AREA.

Source code in src/aegear/motiondetection.py
def detect(self, prev_frame: np.ndarray, this_frame: np.ndarray,
           next_frame: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """
    Detect motion by comparing three consecutive frames.

    The function converts the frames to grayscale, computes the absolute differences,
    thresholds them to produce binary images, combines the thresholded images, applies
    morphological operations to remove noise, and finally extracts contours. Detected
    contours are classified into "good" (within the area range) and "bad" (outside the
    area range but above a minimum threshold).

    Parameters
    ----------
    prev_frame : numpy.ndarray
        Previous frame in BGR color space.
    this_frame : numpy.ndarray
        Current frame in BGR color space.
    next_frame : numpy.ndarray
        Next frame in BGR color space.

    Returns
    -------
    Tuple[List[numpy.ndarray], List[numpy.ndarray]]
        A tuple containing two lists of contours:
        - The first list contains contours with areas between min_area and max_area.
        - The second list contains contours with areas outside that range but above MIN_AREA.
    """
    # Convert frames to grayscale
    gprev_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    gframe = cv2.cvtColor(this_frame, cv2.COLOR_BGR2GRAY)
    gnext_frame = cv2.cvtColor(next_frame, cv2.COLOR_BGR2GRAY)

    # Compute absolute differences between the current frame and its neighbors
    diff_prev = np.abs(gframe.astype(np.float32) - gprev_frame.astype(np.float32)).astype(np.uint8)
    diff_next = np.abs(gframe.astype(np.float32) - gnext_frame.astype(np.float32)).astype(np.uint8)

    # Apply binary thresholding to highlight significant differences
    _, thresh_prev = cv2.threshold(diff_prev, self.motion_threshold, 255, cv2.THRESH_BINARY)
    _, thresh_next = cv2.threshold(diff_next, self.motion_threshold, 255, cv2.THRESH_BINARY)

    # Combine the thresholded images
    combined = cv2.bitwise_or(thresh_prev, thresh_next)

    # Apply morphological operations to reduce noise and close gaps
    erode_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (self.erode_kernel_size, self.erode_kernel_size))
    dilate_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (self.dilate_kernel_size, self.dilate_kernel_size))
    morphed = cv2.erode(combined, erode_kernel)
    morphed = cv2.dilate(morphed, dilate_kernel)

    # Smooth the image and reapply thresholding to finalize the binary image
    blurred = cv2.GaussianBlur(morphed, (19, 19), 5.0)
    _, final_thresh = cv2.threshold(blurred, 50, 255, cv2.THRESH_BINARY)

    # Find contours in the thresholded image
    contours, _ = cv2.findContours(final_thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    good_contours: List[np.ndarray] = []
    bad_contours: List[np.ndarray] = []

    # Classify contours based on their area
    for contour in contours:
        area = cv2.contourArea(contour)
        if area < MotionDetector.MIN_AREA:
            continue

        if self.min_area <= area <= self.max_area:
            good_contours.append(contour)
        else:
            bad_contours.append(contour)

    return good_contours, bad_contours
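
A short usage sketch for the detector; the video path and the threshold of 25 are illustrative, but the call signature matches the source above. Frames read with OpenCV are already in BGR, as detect() expects.

import cv2

from aegear.motiondetection import MotionDetector

detector = MotionDetector(motion_threshold=25)

cap = cv2.VideoCapture("experiment.mp4")     # hypothetical recording
ok1, prev_frame = cap.read()
ok2, this_frame = cap.read()
ok3, next_frame = cap.read()

if ok1 and ok2 and ok3:
    good, bad = detector.detect(prev_frame, this_frame, next_frame)

    # Draw the fish-sized motion regions on the middle frame.
    annotated = this_frame.copy()
    for contour in good:
        x, y, w, h = cv2.boundingRect(contour)
        cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 0), 2)

cap.release()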

tracker

Prediction

A class to represent a prediction made by the model.

Source code in src/aegear/tracker.py
class Prediction:
    """A class to represent a prediction made by the model."""

    def __init__(self, confidence, centroid, roi=None):
        """Initialize the prediction.

        Parameters
        ----------

        confidence : float
            The confidence of the prediction.
        centroid : tuple
            The centroid of the prediction.
        roi : np.ndarray
            The region of interest of the prediction.
        """

        self.centroid = centroid
        self.confidence = confidence
        self.roi = roi

    def global_coordinates(self, origin):
        x, y = origin

        confidence = self.confidence
        centroid = self.centroid

        return Prediction(
            confidence,
            (centroid[0] + x, centroid[1] + y),
            self.roi,
        )
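
Predictions are produced in window-local coordinates; global_coordinates() shifts the centroid by the window's top-left corner. A tiny illustration with made-up numbers:

from aegear.tracker import Prediction

# A detection at (12, 30) inside a search window whose top-left corner sits
# at (400, 220) in the full frame.
local = Prediction(confidence=0.93, centroid=(12, 30))
global_pred = local.global_coordinates(origin=(400, 220))

print(global_pred.centroid)     # (412, 250)
print(global_pred.confidence)   # 0.93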

FishTracker

Source code in src/aegear/tracker.py
class FishTracker:

    # Original window size for the training data.
    WINDOW_SIZE = 128
    # The size of the tracking window.
    TRACKER_WINDOW_SIZE = 128

    def __init__(self,
                 heatmap_model_path,
                 siamese_model_path,
                 tracking_threshold=0.9,
                 detection_threshold=0.85,
                 search_stride=0.5,
                 tracking_max_skip=10,
                 debug=False):

        self._debug = debug
        self._stride = search_stride
        self._device = FishTracker._select_device()
        self._transform = FishTracker._init_transform()
        self.heatmap_model = self._init_heatmap_model(heatmap_model_path)
        self.siamese_model = self._init_siamese_model(siamese_model_path)
        self.tracking_threshold = tracking_threshold
        self.detection_threshold = detection_threshold
        self.tracking_max_skip = tracking_max_skip

        self.last_result = None
        self.history = []
        self.frame_size = None

    def run_tracking(self,
                     video: VideoClip,
                     start_frame: int,
                     end_frame: int,
                     model_track_register,
                     progress_reporter: Optional[ProgressReporter] = None,
                     ui_update=None):
        """Run the tracking on a video."""

        bgs = self._init_background_subtractor(video, start_frame)
        current_skip = self.tracking_max_skip
        anchor_frame = start_frame

        self.last_result = None

        def progress_still_running(
        ): return progress_reporter is not None and progress_reporter.still_running()

        while anchor_frame < end_frame and progress_still_running():
            candidate = anchor_frame + current_skip
            if candidate >= end_frame:
                break

            # Read and pre‑process the candidate.
            frame = video.get_frame(float(candidate) / video.fps)
            if frame is None:
                break

            result = self._track_frame(
                frame, mask=self._motion_detection(bgs, frame))

            if result is not None:
                # Store this result for further tracking.
                self.last_result = result
                model_track_register(
                    candidate, result.centroid, result.confidence)

                anchor_frame = candidate

                if progress_reporter is not None:
                    progress_reporter.update(anchor_frame)

                if current_skip < self.tracking_max_skip:
                    current_skip = min(
                        current_skip * 2, self.tracking_max_skip)
            else:
                if self.last_result is not None and current_skip > 1:
                    current_skip = max(current_skip // 2, 1)
                    continue

                anchor_frame = candidate
                self.last_result = None

            if ui_update is not None:
                ui_update(anchor_frame)

    def _select_device():
        """Select the device - try CUDA, if fails, try mps for Apple Silicon, else CPU."""
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")

    def _init_transform():
        """Initialize the transform."""
        return transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

    def _init_heatmap_model(self, model_path):
        """Initialize the model."""
        model = EfficientUNet(weights=None)
        model.load_state_dict(torch.load(
            model_path, map_location=self._device))
        model.to(self._device)

        # Set the model to evaluation mode
        model.eval()
        return model

    def _init_siamese_model(self, model_path):
        """Initialize the siamese tracking model."""
        model = SiameseTracker()
        model.load_state_dict(torch.load(
            model_path, map_location=self._device))
        model.to(self._device)

        # Set the model to evaluation mode
        model.eval()
        return model

    def _track_frame(self, frame, mask=None):
        """Track the fish in the given frame.

        Parameters
        ----------

        frame : np.ndarray
            The frame to track the fish in.
        mask : np.ndarray, optional
            The mask to use for tracking. If None, the whole frame is used.

        Returns
        -------

        Prediction or None
            The prediction made by the model, or None if no fish is detected.
        """
        if self.frame_size is None:
            self.frame_size = frame.shape[:2]

        self._debug_print("track")

        if self.last_result is None:
            self._debug_print("sliding")
            # Do a sliding window over the whole frame to try and find our fish.
            result = self._sliding_window_predict(frame, mask)

            if result is not None:
                prediction = result

                prediction.roi = self._tracking_roi(
                    frame, prediction.centroid)[1]

                return prediction
        else:
            self._debug_print("tracking")
            # Try getting a ROI around the last position.
            (x1, y1), current_roi = self._tracking_roi(
                frame, self.last_result.centroid)
            result = self._evaluate_siamese_model(
                self.last_result.roi, current_roi)

            if result is not None:
                prediction = result.global_coordinates((x1, y1))
                prediction.roi = self._tracking_roi(
                    frame, prediction.centroid)[1]

                self._debug_print(
                    f"Found fish at ({result.centroid}) with confidence {result.confidence}")

                return prediction

        return None

    def _tracking_roi(self, frame, centroid):
        """Get the tracking ROI around the centroid."""
        x, y = centroid
        h, w = frame.shape[:2]
        w_t = self.TRACKER_WINDOW_SIZE // 2

        # Clamp center so that full ROI fits in frame
        x = max(w_t, min(x, w - w_t))
        y = max(w_t, min(y, h - w_t))

        x1 = int(x - w_t)
        y1 = int(y - w_t)
        x2 = int(x + w_t)
        y2 = int(y + w_t)

        return (x1, y1), frame[y1:y2, x1:x2]

    def _init_background_subtractor(self, video: VideoClip, start_frame: int, history=50, dist2threshold=500, warmup=20):
        """Initialize the background subtractor."""
        background_subtractor = cv2.createBackgroundSubtractorKNN(
            history=history, dist2Threshold=dist2threshold, detectShadows=False)

        # Warm up the background subtractor with a few frames.
        for fid in range(max(start_frame - warmup, 0), start_frame):
            t = float(fid) / video.fps
            f = video.get_frame(t)
            if f is None:
                continue

            gframe = cv2.cvtColor(f, cv2.COLOR_RGB2GRAY)
            gframe = cv2.GaussianBlur(gframe, (5, 5), 1.0)

            background_subtractor.apply(gframe, learningRate=0.25)

        return background_subtractor

    def _motion_detection(self, bgs, frame):
        """Detect motion in the frame using the background subtractor."""

        gframe = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        gframe = cv2.GaussianBlur(gframe, (5, 5), 1.0)

        mask = bgs.apply(gframe, learningRate=0.125)

        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k)

        return mask

    def _sliding_window_predict(self, frame, mask=None) -> Optional[Prediction]:
        """
        Do a sliding window over the whole frame to try and find our fish.

        Parameters
        ----------
        frame : np.ndarray
            The frame to do the sliding window over.

        Returns
        -------

        list
            A list of predictions made by the model.

        """

        h, w = frame.shape[:2]
        results = []

        win_size = self.WINDOW_SIZE
        stride = int(self._stride * win_size)

        for y in range(0, h, stride):
            for x in range(0, w, stride):

                if mask is not None:
                    mask_roi = mask[y:y+win_size, x:x+win_size]
                    mask_sum = mask_roi.sum()

                    # Check if the window is in the mask.
                    if mask_sum == 0:
                        continue

                try:
                    window = frame[y:y+win_size, x:x+win_size]
                except:
                    # If we go out of bounds, we skip this window.
                    continue

                if window.shape[0] != win_size or window.shape[1] != win_size:
                    continue

                result = self._evaluate_heatmap_model(window)

                if not result:
                    continue

                # Map out the global coordinates of the predictions.
                results.append(result.global_coordinates((x, y)))

        if results:
            self._debug_print(f"Got {len(results)} results")

            # Sort by score
            results.sort(key=lambda x: x.confidence, reverse=True)

            # Get the best result
            result = results[0]

            if result.confidence < self.detection_threshold:

                self._debug_print(
                    f"Best candidate confidence {result.confidence} is below threshold {self.detection_threshold}")
                return None

            return result  # Return the best result

        self._debug_print(f"Not a single sliding window found a fish")

        return None

    def _get_centroid(heatmap):
        if heatmap.sum() < 1e-6:
            return None

        b, _, _, w = heatmap.shape
        flat_idx = torch.argmax(heatmap.view(b, -1), dim=1)
        y = flat_idx // w
        x = flat_idx % w

        # Get confidence at the centroid
        confidence = heatmap[0, 0, y, x].item()

        return confidence, (x.int().item(), y.int().item())

    def _evaluate_heatmap_model(self, window) -> Prediction:
        """Evaluate the model on a window of the image.
        Note that this returns the prediction in window local space. For global space
        adjust the centroid and box coordinates accordingly using the origin of the window.
        """

        # Prepare the input.
        input = self._transform(window) \
                    .to(self._device) \
                    .unsqueeze(0)

        try:
            output = torch.sigmoid(self.heatmap_model(input))
        except Exception as e:
            self._debug_print(f"Error in model evaluation: {e}")
            # If we get an error, we just return None.
            return None

        result = FishTracker._get_centroid(output)

        if result is None:
            self._debug_print("Heatmap: No fish detected")
            return None

        (confidence, centroid) = result

        return Prediction(confidence, centroid)

    def _evaluate_siamese_model(self, last_roi, current_roi) -> Prediction:

        # Prepare the input.
        template = self._transform(last_roi) \
            .to(self._device) \
            .unsqueeze(0)

        search = self._transform(current_roi) \
            .to(self._device) \
            .unsqueeze(0)

        try:
            output = torch.sigmoid(self.siamese_model(template, search))
        except Exception as e:
            self._debug_print(f"Siamese: Error in model evaluation: {e}")
            # If we get an error, we just return None.
            return None

        result = FishTracker._get_centroid(output)

        if result is None:
            self._debug_print("Siamese: No fish detected")
            return None

        (confidence, centroid) = result

        if confidence < self.tracking_threshold:
            self._debug_print(
                f"Siamese: Confidence {confidence} is below threshold {self.tracking_threshold}")
            return None

        return Prediction(confidence, centroid, roi=None)

    def _debug_print(self, msg):
        if self._debug:
            print(msg)

run_tracking(video, start_frame, end_frame, model_track_register, progress_reporter=None, ui_update=None)

Run the tracking on a video.

Source code in src/aegear/tracker.py
def run_tracking(self,
                 video: VideoClip,
                 start_frame: int,
                 end_frame: int,
                 model_track_register,
                 progress_reporter: Optional[ProgressReporter] = None,
                 ui_update=None):
    """Run the tracking on a video."""

    bgs = self._init_background_subtractor(video, start_frame)
    current_skip = self.tracking_max_skip
    anchor_frame = start_frame

    self.last_result = None

    def progress_still_running(
    ): return progress_reporter is not None and progress_reporter.still_running()

    while anchor_frame < end_frame and progress_still_running():
        candidate = anchor_frame + current_skip
        if candidate >= end_frame:
            break

        # Read and pre‑process the candidate.
        frame = video.get_frame(float(candidate) / video.fps)
        if frame is None:
            break

        result = self._track_frame(
            frame, mask=self._motion_detection(bgs, frame))

        if result is not None:
            # Store this result for further tracking.
            self.last_result = result
            model_track_register(
                candidate, result.centroid, result.confidence)

            anchor_frame = candidate

            if progress_reporter is not None:
                progress_reporter.update(anchor_frame)

            if current_skip < self.tracking_max_skip:
                current_skip = min(
                    current_skip * 2, self.tracking_max_skip)
        else:
            if self.last_result is not None and current_skip > 1:
                current_skip = max(current_skip // 2, 1)
                continue

            anchor_frame = candidate
            self.last_result = None

        if ui_update is not None:
            ui_update(anchor_frame)
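
A minimal driver sketch. Note that the loop above only advances while progress_reporter.still_running() is truthy, so some reporter object is effectively required; the stand-in class below implements just the two methods the source calls (still_running and update). Model paths, video file, and frame range are illustrative.

from aegear.tracker import FishTracker
from aegear.video import VideoClip


class AlwaysRunning:
    # Minimal stand-in for a progress reporter: run_tracking only loops while
    # still_running() returns True and calls update() on accepted frames.
    def still_running(self):
        return True

    def update(self, frame_idx):
        pass


video = VideoClip("experiment.mp4")                      # hypothetical recording
tracker = FishTracker(
    heatmap_model_path="models/unet_2025-01-01.pth",     # illustrative paths
    siamese_model_path="models/siamese_2025-01-01.pth",
)

track = []


def register(frame_idx, centroid, confidence):
    # Called once for every accepted detection.
    track.append((frame_idx, centroid, confidence))


tracker.run_tracking(video, start_frame=0, end_frame=500,
                     model_track_register=register,
                     progress_reporter=AlwaysRunning())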

training

Module containing various training-related utilities and functions.

WeightedBCEWithLogitsLoss

Custom loss function that applies weighted binary cross-entropy with logits. It emphasizes the center of the Gaussian heatmap.

Source code in src/aegear/training.py
class WeightedBCEWithLogitsLoss:
    """
    Custom loss function that applies weighted binary cross-entropy with logits.
    It emphasizes the center of the Gaussian heatmap.
    """

    def __init__(self, limit=0.5, pos_weight=10.0):
        self.limit = limit
        self.pos_weight = pos_weight

    def __call__(self, pred, target):
        weights = torch.ones_like(target)
        # emphasize center of Gaussian
        weights[target > self.limit] = self.pos_weight

        bce = F.binary_cross_entropy_with_logits(
            pred, target, weight=weights, reduction='mean')
        return bce
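
A small sketch of the loss in use; the tensors are random stand-ins for network logits and a Gaussian target, chosen only to show the extra weighting of pixels whose target value exceeds limit.

import torch

from aegear.training import WeightedBCEWithLogitsLoss

criterion = WeightedBCEWithLogitsLoss(limit=0.5, pos_weight=10.0)

# Raw heatmap logits and a Gaussian-style target; pixels above limit receive
# a 10x weight in the BCE term.
pred = torch.randn(2, 1, 64, 64, requires_grad=True)
target = torch.zeros(2, 1, 64, 64)
target[:, :, 30:34, 30:34] = 1.0   # crude stand-in for the Gaussian peak

loss = criterion(pred, target)
loss.backward()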

EfficientUNetLoss

Bases: WeightedBCEWithLogitsLoss

Source code in src/aegear/training.py
class EfficientUNetLoss(WeightedBCEWithLogitsLoss):
    def __init__(self, limit=0.5, pos_weight=10.0, centroid_weight=2.5e-3, sparsity_weight=1e-3):
        """
        Initialize the loss with weights for BCE and centroid distance.
        """
        super().__init__(limit, pos_weight)
        self.centroid_weight = centroid_weight
        self.sparsity_weight = sparsity_weight

    def __call__(self, pred, target):
        bce_loss = super().__call__(pred, target)
        cdist_loss = self.centroid_distance_loss(pred, target)
        sparsity_loss = self.sparsity_weight * pred.pow(2).mean()
        return bce_loss + self.centroid_weight * cdist_loss + sparsity_loss

    @staticmethod
    def centroid_distance_loss(pred, target):
        preds = get_centroids_per_sample(torch.sigmoid(pred))
        targets = get_centroids_per_sample(target)

        distances = []

        for p, t in zip(preds, targets):
            if p is not None and t is not None:
                x_p, y_p, _ = p
                x_t, y_t, _ = t
                dist = torch.sqrt((x_p - x_t) ** 2 + (y_p - y_t) ** 2 + 1e-8)
                distances.append(dist)

        if not distances:
            return torch.tensor(0.0).to(pred.device)

        return torch.stack(distances).mean()

SiameseLoss

Bases: EfficientUNetLoss

Siamese loss function that combines the EfficientUNetLoss with an RGB consistency loss.

Source code in src/aegear/training.py
class SiameseLoss(EfficientUNetLoss):
    """
    Siamese loss function that combines the EfficientUNetLoss with an RGB consistency loss.
    """

    def __init__(
        self,
        limit=0.5,
        pos_weight=10.0,
        centroid_weight=2.5e-3,
        sparsity_weight=1e-3,
        rgb_weight=5e-3,
        rgb_sigma=2.0,
        rgb_threshold=0.5
    ):
        """
        Initialize the SiameseLoss with weights for different components.
        """
        super().__init__(limit, pos_weight, centroid_weight, sparsity_weight)

        self.rgb_weight = rgb_weight
        self.rgb_sigma = rgb_sigma
        self.rgb_threshold = rgb_threshold

    def __call__(self, output, target, template, search):
        """
        Compute the total loss given predictions and targets.
        """
        main_loss = super().__call__(output, target)
        rgb_loss = self.rgb_consistency_loss(template, search, output)

        return main_loss + self.rgb_weight * rgb_loss

    def rgb_consistency_loss(self, template_img, search_img, pred_heatmap):
        """
        Compute the RGB consistency loss between template and search images
        based on the predicted heatmap.
        """
        B, _, H, W = template_img.shape
        device = template_img.device

        # === Create fixed centered Gaussian for all batch
        grid_y, grid_x = torch.meshgrid(
            torch.linspace(0, H - 1, H, device=device),
            torch.linspace(0, W - 1, W, device=device),
            indexing='ij'
        )
        center_y = (H - 1) / 2
        center_x = (W - 1) / 2
        gaussian = torch.exp(-((grid_x - center_x)**2 +
                             (grid_y - center_y)**2) / (2 * self.rgb_sigma**2))
        gaussian /= gaussian.sum() + 1e-8
        gaussian = gaussian[None, None, :, :]  # shape (1, 1, H, W)

        loss = 0.0
        for i in range(B):
            # === Mask and normalize predicted heatmap
            mask = (pred_heatmap[i] > self.rgb_threshold).float()
            weighted_mask = pred_heatmap[i] * mask
            weighted_mask /= weighted_mask.sum() + 1e-8  # (1, H, W)

            # === Compute mean RGB in search
            rgb_search = (search_img[i] * weighted_mask).view(3, -1).sum(dim=1)

            # === Compute mean RGB in template using Gaussian
            rgb_template = (template_img[i] *
                            gaussian[0]).view(3, -1).sum(dim=1)

            loss += F.mse_loss(rgb_search, rgb_template)

        return loss / B
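
A sketch of the combined loss during siamese training; all tensors are random placeholders, but the argument order (output, target, template, search) follows the __call__ above.

import torch

from aegear.training import SiameseLoss

criterion = SiameseLoss()

# Raw heatmap logits from the tracker, the ground-truth heatmap, and the two
# image crops that produced the prediction.
output = torch.randn(2, 1, 128, 128, requires_grad=True)
target = torch.zeros(2, 1, 128, 128)
target[:, :, 62:66, 62:66] = 1.0
template = torch.rand(2, 3, 128, 128)
search = torch.rand(2, 3, 128, 128)

loss = criterion(output, target, template, search)
loss.backward()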

rgb_consistency_loss(template_img, search_img, pred_heatmap)

Compute the RGB consistency loss between template and search images based on the predicted heatmap.

Source code in src/aegear/training.py
def rgb_consistency_loss(self, template_img, search_img, pred_heatmap):
    """
    Compute the RGB consistency loss between template and search images
    based on the predicted heatmap.
    """
    B, _, H, W = template_img.shape
    device = template_img.device

    # === Create fixed centered Gaussian for all batch
    grid_y, grid_x = torch.meshgrid(
        torch.linspace(0, H - 1, H, device=device),
        torch.linspace(0, W - 1, W, device=device),
        indexing='ij'
    )
    center_y = (H - 1) / 2
    center_x = (W - 1) / 2
    gaussian = torch.exp(-((grid_x - center_x)**2 +
                         (grid_y - center_y)**2) / (2 * self.rgb_sigma**2))
    gaussian /= gaussian.sum() + 1e-8
    gaussian = gaussian[None, None, :, :]  # shape (1, 1, H, W)

    loss = 0.0
    for i in range(B):
        # === Mask and normalize predicted heatmap
        mask = (pred_heatmap[i] > self.rgb_threshold).float()
        weighted_mask = pred_heatmap[i] * mask
        weighted_mask /= weighted_mask.sum() + 1e-8  # (1, H, W)

        # === Compute mean RGB in search
        rgb_search = (search_img[i] * weighted_mask).view(3, -1).sum(dim=1)

        # === Compute mean RGB in template using Gaussian
        rgb_template = (template_img[i] *
                        gaussian[0]).view(3, -1).sum(dim=1)

        loss += F.mse_loss(rgb_search, rgb_template)

    return loss / B

get_confidence(heatmap)

Get confidence score from a heatmap by finding the maximum value.

Source code in src/aegear/training.py
def get_confidence(heatmap):
    """
    Get confidence score from a heatmap by finding the maximum value.
    """
    b, _, _, w = heatmap.shape
    flat_idx = torch.argmax(heatmap.view(b, -1), dim=1)
    y = flat_idx // w
    x = flat_idx % w
    return heatmap[0, 0, y, x].item()

overlay_heatmap_on_rgb(rgb_tensor, heatmap, alpha=0.5, centroid_color=(0, 1, 0))

Overlay heatmap onto RGB image and draw a circle at the predicted centroid.

Parameters:

    rgb_tensor: [3, H, W] tensor (required)
    heatmap: [H, W] numpy array (required)
    alpha: blending weight (default 0.5)
    centroid_color: (R, G, B) tuple in range 0–1 (default (0, 1, 0))

Returns:

    overlay: [H, W, 3] numpy image

Source code in src/aegear/training.py
def overlay_heatmap_on_rgb(rgb_tensor, heatmap, alpha=0.5, centroid_color=(0, 1, 0)):
    """
    Overlay heatmap onto RGB image and draw a circle at the predicted centroid.

    Args:
        rgb_tensor: [3, H, W] tensor
        heatmap: [H, W] numpy array
        alpha: blending weight
        centroid_color: (R, G, B) tuple in range 0–1
    Returns:
        overlay: [H, W, 3] numpy image
    """
    rgb = rgb_tensor.permute(1, 2, 0).cpu().numpy()
    rgb = rgb * 0.229 + 0.485
    rgb = rgb.clip(0, 1)

    heatmap_color = plt.cm.hot(heatmap)[..., :3]
    overlay = (1 - alpha) * rgb + alpha * heatmap_color

    # Find centroid
    flat_idx = heatmap.reshape(-1).argmax()
    h, w = heatmap.shape
    cy = flat_idx // w
    cx = flat_idx % w

    # Draw circle
    overlay_uint8 = (overlay * 255).astype(np.uint8)
    cx_int, cy_int = int(cx), int(cy)
    color_bgr = tuple(int(c * 255) for c in reversed(centroid_color))
    cv2.circle(overlay_uint8, (cx_int, cy_int), 4, color_bgr, thickness=1)

    return overlay_uint8 / 255.0
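
For illustration, overlaying a synthetic heatmap on a normalized RGB tensor; shapes follow the docstring above, values are arbitrary.

import numpy as np
import torch

from aegear.training import overlay_heatmap_on_rgb

rgb = torch.randn(3, 128, 128)                  # normalized RGB tensor
heatmap = np.zeros((128, 128), dtype=np.float32)
heatmap[60:68, 60:68] = 1.0                     # fake detection blob

overlay = overlay_heatmap_on_rgb(rgb, heatmap, alpha=0.5)
print(overlay.shape)                            # (128, 128, 3), floats in [0, 1]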

get_centroids_per_sample(heatmap)

Get centroids from a batch of heatmaps.

Source code in src/aegear/training.py
def get_centroids_per_sample(heatmap):
    """
    Get centroids from a batch of heatmaps."""
    b, _, _, w = heatmap.shape
    heatmaps = heatmap.squeeze(1)
    centroids = []

    for i in range(b):
        hm = heatmaps[i]
        hm_sum = hm.mean().item()

        if hm_sum < 1e-8:
            centroids.append(None)
        else:
            flat_idx = torch.argmax(hm)
            y = flat_idx // w
            x = flat_idx % w
            conf = hm[y, x]
            centroids.append((x.float(), y.float(), conf.float()))

    return centroids

trajectory

Utility functions for working with 2D trajectories in image frames, including drawing, smoothing, and computing properties of motion paths.

Assumes a trajectory is a list of (frame, x, y) points whose pixel coordinates are sampled at the video frame rate.

smooth_trajectory(trajectory, filterSize=15)

Apply Savitzky-Golay filter to smooth a trajectory.

Parameters:

    trajectory (list of (t, x, y)): Frame id with raw trajectory points. Required.
    filterSize (int): Window size for filtering (must be odd and >= 5). Default 15.

Returns:

    list[tuple[int, int, int]]: List of (t, x, y) smoothed trajectory points.

Source code in src/aegear/trajectory.py
def smooth_trajectory(trajectory: list[tuple[int, int, int]], filterSize: int = 15) -> list[tuple[int, int, int]]:
    """
    Apply Savitzky-Golay filter to smooth a trajectory.

    Parameters:
        trajectory (list of (t, x, y)): Frame id with raw trajectory points.
        filterSize (int): Window size for filtering (must be odd and >= 5).

    Returns:
        list of (t, x, y): Smoothed trajectory points.
    """
    # Ensure filterSize is odd and at least 5 (polyorder=3, so min window=5)
    if filterSize < 5:
        filterSize = 5
    if filterSize % 2 == 0:
        filterSize += 1
    if len(trajectory) < filterSize:
        return trajectory

    trajectory = np.array(trajectory)
    t = savgol_filter(trajectory[:, 0], filterSize, 3)
    x = savgol_filter(trajectory[:, 1], filterSize, 3)
    y = savgol_filter(trajectory[:, 2], filterSize, 3)

    smoothed = list(zip(t.astype(int), x.astype(int), y.astype(int)))
    return smoothed
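
A quick sketch with a synthetic noisy track; only the (t, x, y) tuple format comes from the function above, the numbers are made up.

import numpy as np

from aegear.trajectory import smooth_trajectory

# Synthetic noisy trajectory: (frame, x, y) triples, one per video frame.
rng = np.random.default_rng(0)
frames = np.arange(100)
xs = 200 + 2 * frames + rng.normal(0, 3, size=100)
ys = 150 + rng.normal(0, 3, size=100)
raw = list(zip(frames, xs, ys))

smoothed = smooth_trajectory(raw, filterSize=15)
print(smoothed[:3])    # [(t, x, y), ...] with integer coordinates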

detect_trajectory_outliers(trajectory, threshold=20.0)

Detects large jumps in pixel space, indicating likely tracking failures.

Parameters:

    trajectory (list[tuple[int, int, int]]): List of (frame_idx, x, y) tuples. Required.
    threshold (float): Maximum allowed pixel movement per frame. Default 20.0.

Returns:

    list[int]: List of frame indices where the jump exceeds the threshold.

Source code in src/aegear/trajectory.py
def detect_trajectory_outliers(
    trajectory: list[tuple[int, int, int]],
    threshold: float = 20.0  # distance in pixels per frame
) -> list[int]:
    """
    Detects large jumps in pixel space, indicating likely tracking failures.

    Args:
        trajectory: List of (frame_idx, x, y) tuples.
        threshold: Maximum allowed pixel movement per frame.

    Returns:
        List of frame indices where jump exceeds threshold.
    """
    if len(trajectory) < 2:
        return []

    frame_idx, xs, ys = zip(*trajectory)
    xs = np.array(xs)
    ys = np.array(ys)
    frame_idx = np.array(frame_idx)

    dx = np.diff(xs)
    dy = np.diff(ys)
    dist = np.sqrt(dx**2 + dy**2)

    # Mark current frame if jump from previous is too large
    outlier_mask = dist > threshold
    outlier_frames = frame_idx[1:][outlier_mask]  # current frame that made the jump

    return list(outlier_frames)
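
A tiny illustration with hand-made numbers:

from aegear.trajectory import detect_trajectory_outliers

track = [(0, 100, 100), (1, 103, 101), (2, 160, 140), (3, 162, 141)]

# Frame 2 jumps roughly 69 px from frame 1, well above the 20 px/frame default,
# so it is flagged; the other steps move only a few pixels.
outliers = detect_trajectory_outliers(track, threshold=20.0)
print(outliers)    # frame index 2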

utils

Kalman2D

A simple 2D Kalman filter for tracking.

Source code in src/aegear/utils.py
class Kalman2D:
    """A simple 2D Kalman filter for tracking."""

    def __init__(self, r=1.0, q=0.1):
        """Initialize the Kalman filter.

        Parameters
        ----------
        r : float
            The measurement noise.
        q : float
            The process noise.
        """
        self.x = np.zeros((4, 1))  # state
        self.P = np.eye(4) * 1000  # uncertainty

        self.A = np.array([[1, 0, 1, 0],
                           [0, 1, 0, 1],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]])

        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]])

        self.R = np.eye(2) * r # measurement noise
        self.Q = np.eye(4) * q # process noise

    def reset(self, x, y):
        self.x = np.array([[x], [y], [0], [0]])
        self.P = np.eye(4)

    def update(self, z):
        # Predict
        self.x = self.A @ self.x
        self.P = self.A @ self.P @ self.A.T + self.Q

        # Update
        z = np.array(z).reshape(2, 1)
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(4) - K @ self.H) @ self.P

        return self.x[0, 0], self.x[1, 0]
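
A short sketch of the filter on a few made-up measurements; reset() seeds the state with the first detection and update() runs one predict/correct cycle per measurement.

from aegear.utils import Kalman2D

kf = Kalman2D(r=1.0, q=0.1)
kf.reset(100.0, 50.0)            # seed the state at the first detection

# Feed noisy (x, y) measurements; update() returns the filtered position.
for z in [(102, 51), (104, 53), (106, 54), (108, 56)]:
    x, y = kf.update(z)
    print(round(x, 1), round(y, 1))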

resource_path(relative_path)

Get the absolute path to the resource, works for dev and PyInstaller.

Source code in src/aegear/utils.py
def resource_path(relative_path: str) -> Path:
    """Get the absolute path to the resource, works for dev and PyInstaller."""
    try:
        base_path = Path(sys._MEIPASS)
    except AttributeError:
        # Go two levels up from aegear/app.py → project root
        base_path = Path(__file__).resolve().parents[2]
    return base_path / relative_path

get_latest_model_path(directory, model_name)

Find the latest model file in the given directory matching the base model name. Model files are expected to be named as: modelname_YYYY-MM-DD.pth

Source code in src/aegear/utils.py
def get_latest_model_path(directory, model_name):
    """
    Find the latest model file in the given directory matching the base model name.
    Model files are expected to be named as: modelname_YYYY-MM-DD.pth
    """
    pattern = re.compile(rf"{re.escape(model_name)}_(\d{{4}}-\d{{2}}-\d{{2}})\.pth")
    latest_date = None
    latest_file = None

    for filename in os.listdir(directory):
        match = pattern.fullmatch(filename)
        if match:
            date_str = match.group(1)
            try:
                file_date = datetime.strptime(date_str, "%Y-%m-%d")
                if latest_date is None or file_date > latest_date:
                    latest_date = file_date
                    latest_file = filename
            except ValueError:
                continue

    return os.path.join(directory, latest_file) if latest_file else None
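
For example, with a models/ directory containing unet_2024-11-02.pth and unet_2025-03-14.pth (hypothetical names following the convention above), the call below returns the 2025 file; it returns None when nothing matches.

from aegear.utils import get_latest_model_path

latest = get_latest_model_path("models", "unet")
print(latest)    # e.g. models/unet_2025-03-14.pth, or None if nothing matches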

video

VideoClip

Minimalistic video clip class for reading video files.

Source code in src/aegear/video.py
class VideoClip:
    """Minimalistic video clip class for reading video files."""
    def __init__(self, path):
        self.path = path
        self._cap = cv2.VideoCapture(path)
        if not self._cap.isOpened():
            raise IOError(f"Cannot open video: {path}")

        self.fps = self._cap.get(cv2.CAP_PROP_FPS)
        self.num_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.duration = self.num_frames / self.fps

    def get_frame(self, t):
        """
        Return the frame at time `t` (in seconds).
        """
        frame_id = int(t * self.fps)
        return self.get_frame_by_index(frame_id)

    def get_frame_by_index(self, frame_id):
        """
        Return the frame at the given frame index.
        """
        self._cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
        success, frame = self._cap.read()
        if not success:
            return None

        # Convert BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        return frame

    def get_frame_width(self):
        """
        Return the width of the video frames.
        """
        return int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))

    def get_frame_height(self):
        """
        Return the height of the video frames.
        """
        return int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    def get_frame_shape(self):
        """
        Return the shape of the video frames.
        """
        return (self.get_frame_height(), self.get_frame_width(), 3)

    def release(self):
        self._cap.release()

    def __del__(self):
        self.release()
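
A quick usage sketch; the file name is hypothetical, everything else follows the class above.

from aegear.video import VideoClip

clip = VideoClip("experiment.mp4")       # hypothetical recording

print(clip.fps, clip.num_frames, clip.duration)

frame = clip.get_frame_by_index(100)     # RGB numpy array, or None on failure
if frame is not None:
    print(frame.shape)                   # (height, width, 3)

clip.release()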

get_frame(t)

Return the frame at time t (in seconds).

Source code in src/aegear/video.py
def get_frame(self, t):
    """
    Return the frame at time `t` (in seconds).
    """
    frame_id = int(t * self.fps)
    return self.get_frame_by_index(frame_id)

get_frame_by_index(frame_id)

Return the frame at the given frame index.

Source code in src/aegear/video.py
def get_frame_by_index(self, frame_id):
    """
    Return the frame at the given frame index.
    """
    self._cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
    success, frame = self._cap.read()
    if not success:
        return None

    # Convert BGR to RGB
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    return frame

get_frame_width()

Return the width of the video frames.

Source code in src/aegear/video.py
def get_frame_width(self):
    """
    Return the width of the video frames.
    """
    return int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))

get_frame_height()

Return the height of the video frames.

Source code in src/aegear/video.py
def get_frame_height(self):
    """
    Return the height of the video frames.
    """
    return int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

get_frame_shape()

Return the shape of the video frames.

Source code in src/aegear/video.py
def get_frame_shape(self):
    """
    Return the shape of the video frames.
    """
    return (self.get_frame_height(), self.get_frame_width(), 3)