
sgnl.transforms.lloid_correlate

An element that cross-correlates input data with filters, applying a time delay to each time slice and padding to account for upsampling and downsampling.

LLOIDCorrelate dataclass

Bases: TSTransform

Correlates input data with filters, applying a time delay to each time slice and padding for upsampling and downsampling.

Parameters:

    filters (Array): the filter to correlate over. Default: None
    backend (type[ArrayBackend]): the wrapper around array operations. Default: NumpyBackend
    uppad (int): the upsampling padding, in offsets. Default: 0
    downpad (int): the downsampling padding, in offsets. Default: 0
    delays (list[int]): a list of time delays, one per time slice, in offsets. Default: None
    reconstruction_segment_list (list[segments.segment]): segments for which correlation output is produced; buffers that do not intersect this list are emitted as gaps. Default: None
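
As a rough illustration of what the filters and delays describe, the sketch below correlates each filter row against the input shifted by that row's delay. This is a minimal NumPy sketch, not the sgnl API: the delays are given in samples rather than offsets, and the function name is made up.

import numpy as np

def delayed_correlate(x, filters, delays):
    """x: (T,) input samples; filters: (n, L); delays: n non-negative sample delays."""
    n, L = filters.shape
    out_len = x.shape[0] - L + 1 - max(delays)
    y = np.empty((n, out_len))
    for i, (f, d) in enumerate(zip(filters, delays)):
        seg = x[d : d + out_len + L - 1]           # shift this row's data by its delay
        y[i] = np.correlate(seg, f, mode="valid")  # "valid" correlation, length out_len
    return y

rng = np.random.default_rng(0)
out = delayed_correlate(rng.normal(size=4096), rng.normal(size=(4, 64)), [0, 8, 16, 16])
# out.shape == (4, 4017)

LLOIDCorrelate does the same bookkeeping in offsets rather than samples, with uppad and downpad shifting the segment boundaries to account for upsampling and downsampling.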
Source code in sgnl/transforms/lloid_correlate.py
@dataclass
class LLOIDCorrelate(TSTransform):
    """Correlates input data with filters, with certain time delays, and padding for
    upsampling and downsampling

    Args:
        filters:
            Array, the filter to correlate over
        backend:
            type[ArrayBackend], the wrapper around array operations
        uppad:
            int, the upsampling padding, in offsets
        downpad:
            int, the downsampling padding, in offsets
        delays:
            list[int], a list of time delays for each time slice, in offsets
    """

    filters: Array = None
    backend: type[ArrayBackend] = NumpyBackend
    uppad: int = 0
    downpad: int = 0
    delays: list[int] = None
    reconstruction_segment_list: list[segments.segment] = None

    def __post_init__(self):
        assert self.filters is not None
        self.shape = self.filters.shape
        self.filters = self.filters.view(-1, 1, self.shape[-1])
        super().__post_init__()
        assert (
            len(self.sink_pads) == 1 and len(self.source_pads) == 1
        ), "only one sink_pad and one source_pad is allowed"

        self.audioadapter = Audioadapter(self.backend)
        self.unique_delays = sorted(set(self.delays))
        self.startup = True

    def corr(self, data: Array) -> Array:
        """Correlate the data with filters.

        Args:
            data:
                Array, the data to correlate with the filters

        Returns:
            Array, the output of the correlation
        """
        # FIXME: try supporting numpy?
        return Fconv1d(data, self.filters, groups=data.shape[-2]).view(
            self.shape[:-1] + (-1,)
        )

    def new(self, pad: SourcePad) -> TSFrame:
        """Correlates incoming frames with filters.

        Args:
            pad:
                SourcePad, the source pad to produce the transformed frames

        Returns:
            TSFrame, the output TSFrame
        """
        A = self.audioadapter
        frame = self.preparedframes[self.sink_pads[0]]
        outbufs = []

        # process buffer by buffer
        for buf in frame:
            # find the reference segment "this_segment"
            if buf.end_offset - buf.offset == 0:
                if self.startup:
                    this_segment1 = buf.end_offset
                else:
                    this_segment1 = buf.end_offset + self.downpad
                this_segment0 = this_segment1
                outbufs.append(
                    SeriesBuffer(
                        offset=this_segment0 + self.uppad,
                        sample_rate=buf.sample_rate,
                        data=None,
                        shape=self.shape[:-1] + (0,),
                    )
                )
            else:
                this_segment1 = buf.end_offset + self.downpad
                if self.startup:
                    this_segment0 = (
                        this_segment1 - (buf.end_offset - buf.offset) - self.downpad
                    )
                    self.startup = False
                else:
                    this_segment0 = this_segment1 - (buf.end_offset - buf.offset)
                this_segment = (this_segment0, this_segment1)
                A.push(buf)

                if (
                    self.reconstruction_segment_list is not None
                    and not self.reconstruction_segment_list.intersects_segment(
                        segments.segment(
                            LIGOTimeGPS(Offset.tosec(this_segment0)),
                            LIGOTimeGPS(Offset.tosec(this_segment1)),
                        )
                    )
                ):
                    # The ortho snr segment does not intersect the reconstruction
                    # segment list, don't need to produce this ortho snr
                    outbufs.append(
                        SeriesBuffer(
                            offset=this_segment[0] + self.uppad,
                            sample_rate=buf.sample_rate,
                            data=None,
                            shape=self.shape[:-1]
                            + (
                                Offset.tosamples(
                                    this_segment1 - this_segment0, buf.sample_rate
                                ),
                            ),
                        )
                    )
                else:
                    outs_map = {}
                    # Only do the copy for unique delays
                    copied_data = False
                    earliest = []

                    # copy out the unique segments
                    for delay in self.unique_delays:
                        # find the segment to copy out
                        cp_segment1 = this_segment1 + self.uppad - delay
                        cp_segment0 = (
                            cp_segment1
                            - (this_segment1 - this_segment0)
                            - Offset.fromsamples(self.shape[-1] - 1, buf.sample_rate)
                        )
                        earliest.append(cp_segment0)
                        if cp_segment1 > A.offset and not A.is_gap:
                            if A.pre_cat_data is None:
                                A.concatenate_data(
                                    (
                                        max(
                                            this_segment0
                                            + self.uppad
                                            - max(self.unique_delays)
                                            - Offset.fromsamples(
                                                self.shape[-1] - 1, buf.sample_rate
                                            ),
                                            A.offset,
                                        ),
                                        this_segment1
                                        + self.uppad
                                        - min(self.unique_delays),
                                    )
                                )
                            cp_segment = (max(A.offset, cp_segment0), cp_segment1)
                            # We need to do a copy
                            out = A.copy_samples_by_offset_segment(cp_segment)
                            if cp_segment0 < A.offset and out is not None:
                                # pad with zeros in front
                                pad_length = Offset.tosamples(
                                    A.offset - cp_segment0, buf.sample_rate
                                )
                                out = self.backend.pad(out, (pad_length, 0))
                            copied_data = True
                        else:
                            out = None
                        outs_map[delay] = out

                    # fill in zeros arrays
                    if copied_data is True:
                        outs = []
                        # Now stack the output array
                        if len(self.unique_delays) == 1:
                            delay = self.unique_delays[0]
                            out = outs_map[delay]
                            if out is not None:
                                outs = outs_map[delay].unsqueeze(0)
                            else:
                                outs = None
                        else:
                            for delay in self.delays:
                                out = outs_map[delay]
                                if out is None:
                                    out = self.backend.zeros(
                                        (
                                            Offset.tosamples(
                                                cp_segment1 - cp_segment0,
                                                buf.sample_rate,
                                            ),
                                        )
                                    )
                                outs.append(out)
                            outs = self.backend.stack(outs)
                    else:
                        outs = None

                    # flush data
                    flush_end_offset = min(earliest)
                    if flush_end_offset > A.offset:
                        A.flush_samples_by_end_offset(flush_end_offset)

                    # Do the correlation!
                    if outs is not None:
                        outs = self.corr(outs)
                    outbufs.append(
                        SeriesBuffer(
                            offset=this_segment[0] + self.uppad,
                            sample_rate=buf.sample_rate,
                            data=outs,
                            shape=self.shape[:-1]
                            + (
                                Offset.tosamples(
                                    this_segment1 - this_segment0, buf.sample_rate
                                ),
                            ),
                        )
                    )
        return TSFrame(buffers=outbufs, EOS=frame.EOS, metadata=frame.metadata)

corr(data)

Correlate the data with filters.

Parameters:

    data (Array): the data to correlate with the filters. Required.

Returns:

    Array: the output of the correlation.

Source code in sgnl/transforms/lloid_correlate.py
def corr(self, data: Array) -> Array:
    """Correlate the data with filters.

    Args:
        data:
            Array, the data to correlate with the filters

    Returns:
        Array, the output of the correlation
    """
    # FIXME: try supporting numpy?
    return Fconv1d(data, self.filters, groups=data.shape[-2]).view(
        self.shape[:-1] + (-1,)
    )
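
Assuming Fconv1d is torch.nn.functional.conv1d (an assumption based on the call signature; conv1d does not flip its kernel, so this is a cross-correlation), groups=data.shape[-2] means each input row is only ever correlated with its own group of filter rows. A minimal NumPy sketch of the simplest case, one filter row per data row:

import numpy as np

def grouped_correlate(data, filters):
    """data: (n, T); filters: (n, L); returns (n, T - L + 1)."""
    n, T = data.shape
    L = filters.shape[-1]
    out = np.empty((n, T - L + 1))
    for i in range(n):
        # with groups == n, row i of the data only ever meets row i of the filters
        out[i] = np.correlate(data[i], filters[i], mode="valid")
    return out

The trailing .view(self.shape[:-1] + (-1,)) then restores the leading filter dimensions that __post_init__ flattened with filters.view(-1, 1, filter_length).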

new(pad)

Correlates incoming frames with filters.

Parameters:

    pad (SourcePad): the source pad to produce the transformed frames. Required.

Returns:

    TSFrame: the output TSFrame.

Source code in sgnl/transforms/lloid_correlate.py
def new(self, pad: SourcePad) -> TSFrame:
    """Correlates incoming frames with filters.

    Args:
        pad:
            SourcePad, the source pad to produce the transformed frames

    Returns:
        TSFrame, the output TSFrame
    """
    A = self.audioadapter
    frame = self.preparedframes[self.sink_pads[0]]
    outbufs = []

    # process buffer by buffer
    for buf in frame:
        # find the reference segment "this_segment"
        if buf.end_offset - buf.offset == 0:
            if self.startup:
                this_segment1 = buf.end_offset
            else:
                this_segment1 = buf.end_offset + self.downpad
            this_segment0 = this_segment1
            outbufs.append(
                SeriesBuffer(
                    offset=this_segment0 + self.uppad,
                    sample_rate=buf.sample_rate,
                    data=None,
                    shape=self.shape[:-1] + (0,),
                )
            )
        else:
            this_segment1 = buf.end_offset + self.downpad
            if self.startup:
                this_segment0 = (
                    this_segment1 - (buf.end_offset - buf.offset) - self.downpad
                )
                self.startup = False
            else:
                this_segment0 = this_segment1 - (buf.end_offset - buf.offset)
            this_segment = (this_segment0, this_segment1)
            A.push(buf)

            if (
                self.reconstruction_segment_list is not None
                and not self.reconstruction_segment_list.intersects_segment(
                    segments.segment(
                        LIGOTimeGPS(Offset.tosec(this_segment0)),
                        LIGOTimeGPS(Offset.tosec(this_segment1)),
                    )
                )
            ):
                # The ortho snr segment does not intersect the reconstruction
                # segment list, don't need to produce this ortho snr
                outbufs.append(
                    SeriesBuffer(
                        offset=this_segment[0] + self.uppad,
                        sample_rate=buf.sample_rate,
                        data=None,
                        shape=self.shape[:-1]
                        + (
                            Offset.tosamples(
                                this_segment1 - this_segment0, buf.sample_rate
                            ),
                        ),
                    )
                )
            else:
                outs_map = {}
                # Only do the copy for unique delays
                copied_data = False
                earliest = []

                # copy out the unique segments
                for delay in self.unique_delays:
                    # find the segment to copy out
                    cp_segment1 = this_segment1 + self.uppad - delay
                    cp_segment0 = (
                        cp_segment1
                        - (this_segment1 - this_segment0)
                        - Offset.fromsamples(self.shape[-1] - 1, buf.sample_rate)
                    )
                    earliest.append(cp_segment0)
                    if cp_segment1 > A.offset and not A.is_gap:
                        if A.pre_cat_data is None:
                            A.concatenate_data(
                                (
                                    max(
                                        this_segment0
                                        + self.uppad
                                        - max(self.unique_delays)
                                        - Offset.fromsamples(
                                            self.shape[-1] - 1, buf.sample_rate
                                        ),
                                        A.offset,
                                    ),
                                    this_segment1
                                    + self.uppad
                                    - min(self.unique_delays),
                                )
                            )
                        cp_segment = (max(A.offset, cp_segment0), cp_segment1)
                        # We need to do a copy
                        out = A.copy_samples_by_offset_segment(cp_segment)
                        if cp_segment0 < A.offset and out is not None:
                            # pad with zeros in front
                            pad_length = Offset.tosamples(
                                A.offset - cp_segment0, buf.sample_rate
                            )
                            out = self.backend.pad(out, (pad_length, 0))
                        copied_data = True
                    else:
                        out = None
                    outs_map[delay] = out

                # fill in zeros arrays
                if copied_data is True:
                    outs = []
                    # Now stack the output array
                    if len(self.unique_delays) == 1:
                        delay = self.unique_delays[0]
                        out = outs_map[delay]
                        if out is not None:
                            outs = outs_map[delay].unsqueeze(0)
                        else:
                            outs = None
                    else:
                        for delay in self.delays:
                            out = outs_map[delay]
                            if out is None:
                                out = self.backend.zeros(
                                    (
                                        Offset.tosamples(
                                            cp_segment1 - cp_segment0,
                                            buf.sample_rate,
                                        ),
                                    )
                                )
                            outs.append(out)
                        outs = self.backend.stack(outs)
                else:
                    outs = None

                # flush data
                flush_end_offset = min(earliest)
                if flush_end_offset > A.offset:
                    A.flush_samples_by_end_offset(flush_end_offset)

                # Do the correlation!
                if outs is not None:
                    outs = self.corr(outs)
                outbufs.append(
                    SeriesBuffer(
                        offset=this_segment[0] + self.uppad,
                        sample_rate=buf.sample_rate,
                        data=outs,
                        shape=self.shape[:-1]
                        + (
                            Offset.tosamples(
                                this_segment1 - this_segment0, buf.sample_rate
                            ),
                        ),
                    )
                )
    return TSFrame(buffers=outbufs, EOS=frame.EOS, metadata=frame.metadata)
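
The segment arithmetic in new() works as follows: for each unique delay, the block copied out of the audioadapter is the reference segment shifted by uppad - delay offsets and extended filter_length - 1 samples into the past, and any part of that block older than what the adapter still holds is zero-padded in front. A hedged sketch of that bookkeeping, with a made-up offsets-per-sample ratio and function name standing in for the real Offset.fromsamples / Offset.tosamples conversions:

OFFSETS_PER_SAMPLE = 4  # hypothetical ratio, purely for this sketch

def copy_segment(this_segment, uppad, delay, filter_length, available_offset):
    """Return (cp_segment, front_pad_samples) for one unique delay.

    this_segment: (start, end) reference segment, in offsets
    uppad, delay: offsets
    filter_length: filter length, in samples
    available_offset: earliest offset still held by the audioadapter
    """
    start, end = this_segment
    cp_end = end + uppad - delay
    cp_start = cp_end - (end - start) - (filter_length - 1) * OFFSETS_PER_SAMPLE
    # anything earlier than available_offset is no longer buffered: zero-pad the front
    front_pad = max(0, (available_offset - cp_start) // OFFSETS_PER_SAMPLE)
    return (max(cp_start, available_offset), cp_end), front_pad

copy_segment((0, 1024), uppad=0, delay=32, filter_length=64, available_offset=0)
# -> ((0, 992), 71)

The correlated output is emitted in a SeriesBuffer anchored at this_segment[0] + uppad and spanning the same number of samples as the reference segment; buffers that do not intersect reconstruction_segment_list are emitted as gaps (data=None) of the same shape.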