API Documentation

DataSource

Source code in src/zizou/data.py
class DataSource:
    """
    Retrieve waveforms in chunks from a prioritised list of clients,
    caching the returned data in a local SDS archive.
    """

    def __init__(self, clients, chunk_size=None, cache_dir=None):
        self.chunk_size = chunk_size

        if len(clients) < 1:
            msg = "At least one data client is required."
            raise ValueError(msg)

        # Test that all waveform clients are derived from WaveformBaseclass
        for client in clients:
            if not issubclass(client.__class__, WaveformBaseclass):
                msg = "All clients must be derived from WaveformBaseclass."
                raise ValueError(msg)

        self.clients = clients
        self.cache_dir = os.path.join(os.environ["HOME"], "zizou_cache")
        if cache_dir is not None:
            self.cache_dir = cache_dir

        os.makedirs(self.cache_dir, exist_ok=True)
        self.cache_client = SDS_Client(self.cache_dir)

    def to_sds(self, tr: Trace):
        """
        Save trace to SDS directory.
        """
        start = tr.stats.starttime
        end = tr.stats.endtime
        sds_fmtstr = os.path.join(
            "{year}",
            "{network}",
            "{station}",
            "{channel}.{sds_type}",
            "{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}",
        )

        current_date = start.date
        while current_date <= end.date:
            _tr = tr.slice(UTCDateTime(current_date), UTCDateTime(current_date) + 86400)
            # print(_tr.stats.starttime, _tr.stats.endtime)
            fullpath = sds_fmtstr.format(
                year=_tr.stats.starttime.year,
                doy=_tr.stats.starttime.julday,
                sds_type="D",
                **_tr.stats,
            )
            fullpath = os.path.join(self.cache_dir, fullpath)
            dirname, _ = os.path.split(fullpath)
            os.makedirs(dirname, exist_ok=True)
            logger.debug(f"writing {_tr} to {fullpath}")
            _tr.write(fullpath, format="MSEED")
            current_date += timedelta(days=1)

    def get_waveforms(self, net, site, loc, comp, start, end, cache=False):
        t_start = start
        t_end = end
        if self.chunk_size is not None:
            t_end = t_start + self.chunk_size
            t_end = min(t_end, end)
        while t_end <= end:
            tr = Trace()
            try:
                # First check the cache
                st = self.cache_client.get_waveforms(
                    net, site, loc, comp, t_start, t_end, dtype="float64"
                )
                tr = st.merge(fill_value=np.nan)[0]
                t_diff = int(t_end - t_start)
                if abs(t_diff - (tr.stats.npts - 1) * tr.stats.delta) > 1:
                    raise IndexError
                tr.stats["cached"] = True
            except (IndexError, AttributeError):
                msg = "Data for {} between {} and {} not found in cache."
                logger.debug(
                    msg.format(".".join((net, site, loc, comp)), t_start, t_end)
                )
                for client in self.clients:
                    try:
                        tr = client.get_waveforms(net, site, loc, comp, t_start, t_end)
                    except Exception as e:
                        logger.info(e)
                        continue
                    else:
                        break
                if not tr:
                    msg = "No data found for {}"
                    logger.error(msg.format(".".join((net, site, loc, comp))))
                else:
                    if cache:
                        self.to_sds(tr)
            yield tr

            if self.chunk_size is not None:
                t_start = t_end
                cond1 = (end - t_end) < self.chunk_size
                cond2 = end > t_end
                if cond1 and cond2:
                    last_chunk = end - t_end
                    t_end = t_start + last_chunk
                else:
                    t_end = t_start + self.chunk_size
            else:
                # go beyond the end to stop the loop
                t_end = end + 1.0
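
A minimal usage sketch, assuming the MockSDSWaveforms client documented below and throwaway directories (all paths and stream codes here are illustrative). get_waveforms is a generator that yields one obspy.Trace per chunk:

from obspy import UTCDateTime

from zizou.data import DataSource, MockSDSWaveforms

# Any client derived from WaveformBaseclass works; the mock client is
# convenient because it fabricates data on demand.
client = MockSDSWaveforms(sds_dir="/tmp/sds")
ds = DataSource([client], chunk_size=86400, cache_dir="/tmp/zizou_cache")

start = UTCDateTime("2023-01-01")
end = UTCDateTime("2023-01-03")
# One trace per daily chunk; cache=True writes each trace to the SDS cache.
for tr in ds.get_waveforms("NZ", "WIZ", "10", "HHZ", start, end, cache=True):
    print(tr.id, tr.stats.starttime, tr.stats.endtime)

On a second run the chunks are served from the cache and carry tr.stats.cached = True.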

to_sds(tr)

Save trace to SDS directory.

Source code in src/zizou/data.py
def to_sds(self, tr: Trace):
    """
    Save trace to SDS directory.
    """
    start = tr.stats.starttime
    end = tr.stats.endtime
    sds_fmtstr = os.path.join(
        "{year}",
        "{network}",
        "{station}",
        "{channel}.{sds_type}",
        "{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}",
    )

    current_date = start.date
    while current_date <= end.date:
        _tr = tr.slice(UTCDateTime(current_date), UTCDateTime(current_date) + 86400)
        # print(_tr.stats.starttime, _tr.stats.endtime)
        fullpath = sds_fmtstr.format(
            year=_tr.stats.starttime.year,
            doy=_tr.stats.starttime.julday,
            sds_type="D",
            **_tr.stats,
        )
        fullpath = os.path.join(self.cache_dir, fullpath)
        dirname, _ = os.path.split(fullpath)
        os.makedirs(dirname, exist_ok=True)
        logger.debug(f"writing {_tr} to {fullpath}")
        _tr.write(fullpath, format="MSEED")
        current_date += timedelta(days=1)
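
As a concrete illustration of the format string above, a daily slice of a trace with id NZ.WIZ.10.HHZ (an example stream) whose day starts on 2023-01-02 (day-of-year 002) is written to:

<cache_dir>/2023/NZ/WIZ/HHZ.D/NZ.WIZ.10.HHZ.D.2023.002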

FDSNWaveforms

Bases: WaveformBaseclass

Source code in src/zizou/data.py
class FDSNWaveforms(WaveformBaseclass):
    def __init__(self, url, debug=False, fill_value=np.nan):
        """
        Get seismic waveforms from FDSN web service.
        """
        self.client = FDSN_Client(base_url=url, debug=debug)
        self.fill_value = fill_value

    def get_waveforms(self, net, site, loc, comp, startdate, enddate):
        st = self.client.get_waveforms(
            net, site, loc, comp, startdate, enddate, attach_response=True
        )
        # Prepare seismic time series
        st.remove_sensitivity()
        # in case stream has more than one trace
        st.merge(fill_value=self.fill_value)
        if self.fill_value != "interpolate":
            st.trim(
                startdate,
                enddate,
                pad=True,
                fill_value=self.fill_value,
                nearest_sample=False,
            )
        return st[0]
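
A minimal sketch, assuming GeoNet's public FDSN endpoint and an example NZ.WIZ.10.HHZ stream; get_waveforms returns a single merged, sensitivity-corrected obspy.Trace:

from obspy import UTCDateTime

from zizou.data import FDSNWaveforms

fdsn = FDSNWaveforms(url="https://service.geonet.org.nz")
# One hour of data; gaps are filled with the client's fill_value (NaN).
tr = fdsn.get_waveforms(
    "NZ", "WIZ", "10", "HHZ",
    UTCDateTime("2023-01-01T00:00:00"),
    UTCDateTime("2023-01-01T01:00:00"),
)
print(tr)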

__init__(url, debug=False, fill_value=np.nan)

Get seismic waveforms from FDSN web service.

Source code in src/zizou/data.py
def __init__(self, url, debug=False, fill_value=np.nan):
    """
    Get seismic waveforms from FDSN web service.
    """
    self.client = FDSN_Client(base_url=url, debug=debug)
    self.fill_value = fill_value

MockSDSWaveforms

Bases: WaveformBaseclass

Mock SDSWaveforms class for testing by creating synthetic data for the requested streams.

Source code in src/zizou/data.py
class MockSDSWaveforms(WaveformBaseclass):
    """
    Mock SDSWaveforms class for testing by creating
    synthetic data for the requested streams.
    """

    def __init__(self, sds_dir):
        """
        Get seismic waveforms from a local SDS archive.
        """
        os.makedirs(sds_dir, exist_ok=True)
        self.client = SDS_Client(sds_dir)

    def save2sds(self, trace: Trace, rootdir: str):
        """
        Save a trace to a SDS directory structure.

        Parameters
        ----------
        trace : `obspy.Trace`
            Seismic/acoustic trace to be written to disk.
        rootdir : str
            Root directory for the SDS directory structure.
        """
        sds_fmtstr = os.path.join(
            "{year}",
            "{network}",
            "{station}",
            "{channel}.{sds_type}",
            "{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}",
        )

        fullpath = sds_fmtstr.format(
            year=trace.stats.starttime.year,
            doy=trace.stats.starttime.julday,
            sds_type="D",
            **trace.stats,
        )
        fullpath = os.path.join(rootdir, fullpath)
        dirname, filename = os.path.split(fullpath)
        os.makedirs(dirname, exist_ok=True)
        print("writing", fullpath)
        trace.write(fullpath, format="mseed")

    def generate_dataset(
        self,
        rootdir: str,
        net: str,
        site: str,
        loc: str,
        comp: str,
        start: str,
        end: str,
    ) -> None:
        """
        Generate a test dataset for the integration tests.

        Parameters
        ----------
        rootdir : str
            Parent directory for the test dataset.
        net : str
            Network code.
        site : str
            Station code.
        loc : str
            Location code.
        comp : str
            Channel code.
        start : str
            Start time for the test dataset in ISO 8601 format.
        end : str
            End time for the test dataset in ISO 8601 format.
        """
        tstart = UTCDateTime(start)
        # Always generate whole day files
        _tstart = UTCDateTime(year=tstart.year, julday=tstart.julday)
        tend = UTCDateTime(end)
        tr = test_signal(
            starttime=_tstart,
            sampling_rate=10,
            nsec=86400,
            gaps=True,
            network=net,
            station=site,
            location=loc,
            channel=comp,
        )
        while _tstart < tend:
            tr.stats.starttime = _tstart
            self.save2sds(tr, rootdir)
            _tstart += 86400

    def get_waveforms(self, net, site, loc, comp, startdate, enddate):
        """
        Generate synthetic daily files that start at midnight on the
        startdate and end at midnight on the enddate. Then return the requested
        timespan from these files.
        """
        self.generate_dataset(
            self.client.sds_root, net, site, loc, comp, startdate, enddate
        )
        st = self.client.get_waveforms(
            net, site, loc, comp, startdate, enddate, dtype="float64"
        )
        return st[0]
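
A short sketch of the mock client in a test, using a throwaway directory; the synthetic traces are generated at 10 Hz with gaps, as generate_dataset shows below (station and channel codes are arbitrary for the mock client):

import tempfile

from obspy import UTCDateTime

from zizou.data import MockSDSWaveforms

mock = MockSDSWaveforms(sds_dir=tempfile.mkdtemp())
tr = mock.get_waveforms(
    "NZ", "WIZ", "10", "HHZ",
    UTCDateTime("2023-01-01"), UTCDateTime("2023-01-02"),
)
print(tr.stats.sampling_rate)  # 10.0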

__init__(sds_dir)

Get seismic waveforms from a local SDS archive.

Source code in src/zizou/data.py
def __init__(self, sds_dir):
    """
    Get seismic waveforms from a local SDS archive.
    """
    os.makedirs(sds_dir, exist_ok=True)
    self.client = SDS_Client(sds_dir)

generate_dataset(rootdir, net, site, loc, comp, start, end)

Generate a test dataset for the integration tests.

Parameters

rootdir : str
    Parent directory for the test dataset.
net : str
    Network code.
site : str
    Station code.
loc : str
    Location code.
comp : str
    Channel code.
start : str
    Start time for the test dataset in ISO 8601 format.
end : str
    End time for the test dataset in ISO 8601 format.

Source code in src/zizou/data.py
def generate_dataset(
    self,
    rootdir: str,
    net: str,
    site: str,
    loc: str,
    comp: str,
    start: str,
    end: str,
) -> None:
    """
    Generate a test dataset for the integration tests.

    Parameters
    ----------
    rootdir : str
        Parent directory for the test dataset.
    net : str
        Network code.
    site : str
        Station code.
    loc : str
        Location code.
    comp : str
        Channel code.
    start : str
        Start time for the test dataset in ISO 8601 format.
    end : str
        End time for the test dataset in ISO 8601 format.
    """
    tstart = UTCDateTime(start)
    # Always generate whole day files
    _tstart = UTCDateTime(year=tstart.year, julday=tstart.julday)
    tend = UTCDateTime(end)
    tr = test_signal(
        starttime=_tstart,
        sampling_rate=10,
        nsec=86400,
        gaps=True,
        network=net,
        station=site,
        location=loc,
        channel=comp,
    )
    while _tstart < tend:
        tr.stats.starttime = _tstart
        self.save2sds(tr, rootdir)
        _tstart += 86400

get_waveforms(net, site, loc, comp, startdate, enddate)

Generate synthetic daily files that start at midnight on the startdate and end at midnight on the enddate. Then return the requested timespan from these files.

Source code in src/zizou/data.py
def get_waveforms(self, net, site, loc, comp, startdate, enddate):
    """
    Generate synthetic daily files that start at midnight on the
    startdate and end at midnight on the enddate. Then return the requested
    timespan from these files.
    """
    self.generate_dataset(
        self.client.sds_root, net, site, loc, comp, startdate, enddate
    )
    st = self.client.get_waveforms(
        net, site, loc, comp, startdate, enddate, dtype="float64"
    )
    return st[0]

save2sds(trace, rootdir)

Save a trace to a SDS directory structure.

Parameters

trace : obspy.Trace
    Seismic/acoustic trace to be written to disk.
rootdir : str
    Root directory for the SDS directory structure.

Source code in src/zizou/data.py
def save2sds(self, trace: Trace, rootdir: str):
    """
    Save a trace to a SDS directory structure.

    Parameters
    ----------
    trace : `obspy.Trace`
        Seismic/acoustic trace to be written to disk.
    rootdir : str
        Root directory for the SDS directory structure.
    """
    sds_fmtstr = os.path.join(
        "{year}",
        "{network}",
        "{station}",
        "{channel}.{sds_type}",
        "{network}.{station}.{location}.{channel}.{sds_type}.{year}.{doy:03d}",
    )

    fullpath = sds_fmtstr.format(
        year=trace.stats.starttime.year,
        doy=trace.stats.starttime.julday,
        sds_type="D",
        **trace.stats,
    )
    fullpath = os.path.join(rootdir, fullpath)
    dirname, filename = os.path.split(fullpath)
    os.makedirs(dirname, exist_ok=True)
    print("writing", fullpath)
    trace.write(fullpath, format="mseed")

PostProcess

Class for checking waveforms and station metadata for valid values, filling waveform data gaps and removing instrument sensitivity

Source code in src/zizou/data.py
class PostProcess:
    """
    Class for checking waveforms and station metadata for valid values,
    filling waveform data gaps and removing instrument sensitivity
    """

    def __init__(
        self,
        st=Stream(),
        inv=Inventory(),
        inv_dt=Inventory(),
        startdate=None,
        enddate=None,
        loc="*",
        comp="*Z",
        fill_value=np.nan,
    ):
        self.st = st
        self.inv = inv
        self.inv_dt = inv_dt
        self.startdate = startdate
        self.enddate = enddate
        self.loc = loc
        self.comp = comp
        self.fill_value = fill_value

        # Get check registry
        self.checklist = [
            x
            for x in inspect.getmembers(self)
            if inspect.ismethod(x[1]) and x[0].startswith("check")
        ]
        # Get matching post-processing functions
        self.pp_functions = []
        for check in self.checklist:
            self.pp_functions.append(
                [
                    x
                    for x in inspect.getmembers(self)
                    if x[0] == check[0].replace("check", "output")
                ][0]
            )

        # Run checks
        self.res = None

    def run_post_processing(self):
        self.res = self.run_checks()
        func = [a[1] for (a, b) in zip(self.pp_functions, self.res) if b][0]
        trace = func()
        return trace

    def run_checks(self):
        self.res = [check[1]() for check in self.checklist]
        if sum(self.res) == 0:
            raise NotImplementedError(
                "No post processing not set up for this case:\n" "{}".format(self.st)
            )
        elif sum(self.res) > 1:
            msg = (
                "More than one check is true: {}. Processing workflow "
                "is unclear.".format(
                    ", ".join([a[0] for (a, b) in zip(self.checklist, self.res) if b])
                )
            )
            raise PostProcessException(msg)
        else:
            return self.res

    def check_case_no_data(self):
        """
        Checks if either no stream, or no station metadata exist.
        """
        if len(self.st) < 1 or len(self.inv_dt) < 1:
            return True
        elif (
            len(self.inv_dt[0][0]) == 1
            and self.inv_dt[0][0][0].start_date == self.enddate
        ):
            return True
        elif (
            len(self.inv_dt[0][0]) == 1
            and self.inv_dt[0][0][0].end_date == self.startdate
        ):
            return True
        else:
            return False

    def output_case_no_data(self):
        raise PostProcessException

    def check_case_multiple_channel_periods(self):
        """
        Checks if there are multiple channel operation periods in a given
        time period. If so, different response removal is required.
        """
        if len(self.inv_dt) < 1 or len(self.st) < 1:
            return False
        elif len(self.inv_dt[0][0]) > 1:
            return True
        else:
            return False

    def output_case_multiple_channel_periods(self):
        """
        Stitches two response periods together to make a single merged
        trace.
        """
        if len(self.inv_dt[0][0]) > 2:
            raise NotImplementedError(
                "Current post processing function only valid for a "
                "maximum of two response periods."
            )
        st1 = self.st.copy()
        st1.merge(fill_value=self.fill_value)
        st2 = st1.copy()
        st1.trim(
            starttime=self.startdate,
            endtime=self.inv_dt[0][0][0].end_date,
            nearest_sample=False,
        )
        st2.trim(
            starttime=self.inv_dt[0][0][1].start_date,
            endtime=self.enddate,
            nearest_sample=False,
        )
        st1.attach_response(self.inv_dt)
        st1.remove_sensitivity()
        st2.attach_response(self.inv_dt)
        st2.remove_sensitivity()
        st1 += st2
        st1.merge(fill_value=self.fill_value)
        st1.trim(
            self.startdate,
            self.enddate,
            nearest_sample=False,
            pad=True,
            fill_value=self.fill_value,
        )
        return st1[0]

    def check_case_incomplete_station_metadata(self):
        """
        The case where a channel period starts or stops during the
        selected time window. Some parts of the trace in the stream
        might not correspond to a valid period according to station
        metadata.
        """
        if len(self.inv_dt) < 1:
            return False
        elif len(self.inv_dt[0][0]) != 1 or len(self.st) < 1:
            return False
        elif (self.startdate < self.inv_dt[0][0][0].start_date < self.enddate) or (
            self.startdate < self.inv_dt[0][0][0].end_date < self.enddate
        ):
            return True
        else:
            return False

    def output_case_incomplete_station_metadata(self):
        if self.inv_dt[0][0][0].start_date > self.startdate:
            self.st.trim(
                self.inv_dt[0][0][0].start_date, self.enddate, nearest_sample=False
            )
            self.st.attach_response(self.inv_dt)
            self.st.remove_sensitivity()
            self.st.trim(
                self.startdate,
                self.enddate,
                nearest_sample=False,
                pad=True,
                fill_value=self.fill_value,
            )
            return self.st[0]
        elif self.inv_dt[0][0][0].end_date < self.enddate:
            self.st.trim(
                self.startdate, self.inv_dt[0][0][0].end_date, nearest_sample=False
            )
            self.st.attach_response(self.inv_dt)
            self.st.remove_sensitivity()
            self.st.trim(
                self.startdate,
                self.enddate,
                nearest_sample=False,
                pad=True,
                fill_value=self.fill_value,
            )
            return self.st[0]
        else:
            raise PostProcessException(
                "Post-processing function not equipped to handle:\n{}".format(self.st)
            )

    def check_case_no_station_issues(self):
        if len(self.inv_dt) < 1:
            return False
        elif len(self.inv_dt[0][0]) != 1 or len(self.st) < 1:
            return False
        elif (
            len(self.inv_dt[0][0]) == 1
            and self.inv_dt[0][0][0].end_date == self.startdate
        ):
            return False
        elif (
            len(self.inv_dt[0][0]) == 1
            and self.inv_dt[0][0][0].start_date == self.enddate
        ):
            return False
        elif (self.startdate < self.inv_dt[0][0][0].start_date < self.enddate) or (
            self.startdate < self.inv_dt[0][0][0].end_date < self.enddate
        ):
            return False
        else:
            return True

    def output_case_no_station_issues(self):
        self.st.merge(fill_value=self.fill_value)
        self.st.trim(
            starttime=self.startdate,
            endtime=self.enddate,
            nearest_sample=False,
            pad=True,
            fill_value=self.fill_value,
        )
        self.st.attach_response(self.inv_dt)
        self.st.remove_sensitivity()
        return self.st[0]
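
PostProcess pairs each check_* predicate with the output_* handler of the same name (check_case_no_data with output_case_no_data, and so on); run_checks requires exactly one predicate to be true, and run_post_processing calls the matching handler. The pattern in isolation, as a simplified sketch rather than the class itself:

import inspect

class Dispatcher:
    """Toy version of the check_*/output_* pairing used by PostProcess."""

    def check_case_a(self):
        return True

    def output_case_a(self):
        return "handled case a"

    def run(self):
        # Collect all check_* methods, call them, and dispatch to the
        # output_* method whose name matches the single true check.
        checks = [
            (name, meth)
            for name, meth in inspect.getmembers(self, inspect.ismethod)
            if name.startswith("check")
        ]
        matches = [name for name, meth in checks if meth()]
        assert len(matches) == 1, "exactly one check must apply"
        return getattr(self, matches[0].replace("check", "output"))()

print(Dispatcher().run())  # -> handled case a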

check_case_incomplete_station_metadata()

The case where a channel period starts or stops during the selected time window. Some parts of the trace in the stream might not correspond to a valid period according to station metadata.

Source code in src/zizou/data.py
def check_case_incomplete_station_metadata(self):
    """
    The case where a channel period starts or stops during the
    selected time window. Some parts of the trace in the stream
    might not correspond to a valid period according to station
    metadata.
    """
    if len(self.inv_dt) < 1:
        return False
    elif len(self.inv_dt[0][0]) != 1 or len(self.st) < 1:
        return False
    elif (self.startdate < self.inv_dt[0][0][0].start_date < self.enddate) or (
        self.startdate < self.inv_dt[0][0][0].end_date < self.enddate
    ):
        return True
    else:
        return False

check_case_multiple_channel_periods()

Checks if there are multiple channel operation periods in a given time period. If so, different response removal is required.

Source code in src/zizou/data.py
def check_case_multiple_channel_periods(self):
    """
    Checks if there are multiple channel operation periods in a given
    time period. If so, different response removal is required.
    """
    if len(self.inv_dt) < 1 or len(self.st) < 1:
        return False
    elif len(self.inv_dt[0][0]) > 1:
        return True
    else:
        return False

check_case_no_data()

Checks if either no stream, or no station metadata exist.

Source code in src/zizou/data.py
def check_case_no_data(self):
    """
    Checks if either no stream, or no station metadata exist.
    """
    if len(self.st) < 1 or len(self.inv_dt) < 1:
        return True
    elif (
        len(self.inv_dt[0][0]) == 1
        and self.inv_dt[0][0][0].start_date == self.enddate
    ):
        return True
    elif (
        len(self.inv_dt[0][0]) == 1
        and self.inv_dt[0][0][0].end_date == self.startdate
    ):
        return True
    else:
        return False

output_case_multiple_channel_periods()

Stitches two response periods together to make a single merged trace.

Source code in src/zizou/data.py
def output_case_multiple_channel_periods(self):
    """
    Stitches two response periods together to make a single merged
    trace.
    """
    if len(self.inv_dt[0][0]) > 2:
        raise NotImplementedError(
            "Current post processing function only valid for a "
            "maximum of two response periods."
        )
    st1 = self.st.copy()
    st1.merge(fill_value=self.fill_value)
    st2 = st1.copy()
    st1.trim(
        starttime=self.startdate,
        endtime=self.inv_dt[0][0][0].end_date,
        nearest_sample=False,
    )
    st2.trim(
        starttime=self.inv_dt[0][0][1].start_date,
        endtime=self.enddate,
        nearest_sample=False,
    )
    st1.attach_response(self.inv_dt)
    st1.remove_sensitivity()
    st2.attach_response(self.inv_dt)
    st2.remove_sensitivity()
    st1 += st2
    st1.merge(fill_value=self.fill_value)
    st1.trim(
        self.startdate,
        self.enddate,
        nearest_sample=False,
        pad=True,
        fill_value=self.fill_value,
    )
    return st1[0]

S3Waveforms

Bases: WaveformBaseclass

Source code in src/zizou/data.py
class S3Waveforms(WaveformBaseclass):
    def __init__(self, s3bucket, fdsn_urls, staxml_dir, debug=False, fill_value=np.nan):
        config = Config(signature_version=UNSIGNED)
        self.client = boto3.client("s3", config=config)
        self.s3_bucket = s3bucket
        self.fdsn_urls = fdsn_urls
        self.staxml_dir = staxml_dir
        self.debug = debug
        self.fill_value = fill_value

    def get_waveforms(self, net, site, loc, comp, start, end):
        """
        Get seismic waveforms from GeoNet's open data archive on AWS S3.
        """
        PATH_FORMAT = "waveforms/miniseed/{year}/{year}.{julday:03d}/{station}."
        PATH_FORMAT += "{network}/{year}.{julday:03d}.{station}."
        PATH_FORMAT += "{location}-{channel}.{network}.D"
        t_start = start
        t_end = UTCDateTime(
            year=t_start.year,
            julday=t_start.julday + 1,
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
        )
        t_end = min(t_end, end)
        st = Stream()
        while t_start < end:
            s3_file_path = PATH_FORMAT.format(
                year=t_start.year,
                julday=t_start.julday,
                station=site,
                network=net,
                location=loc,
                channel=comp,
            )
            data = io.BytesIO()
            if self.debug:
                logger.debug("Requesting {}".format(s3_file_path))
            _ = self.client.download_fileobj(
                Bucket=self.s3_bucket, Key=s3_file_path, Fileobj=data
            )
            data.seek(0)
            _st = read(data, dtype="float64")
            data.close()
            _st.trim(t_start, t_end, nearest_sample=False)
            st += _st
            t_start = t_end
            t_end = t_start + timedelta(days=1)
            t_end = min(t_end, end)

        st.merge(fill_value=self.fill_value)
        inv = get_instrument_response(
            net, site, loc, staxml_dir=self.staxml_dir, fdsn_urls=self.fdsn_urls
        )
        inv_dt = inv.select(
            location=loc, channel=comp, starttime=t_start, endtime=t_end
        )

        # Post-process to remove sensitivity and account for gaps
        pp = PostProcess(
            st=st,
            inv=inv,
            inv_dt=inv_dt,
            startdate=start,
            enddate=end,
            loc=loc,
            comp=comp,
            fill_value=self.fill_value,
        )
        tr = pp.run_post_processing()
        return tr
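
A minimal sketch; the bucket name is an assumption (GeoNet's open-data archive on AWS S3, assumed here to be 'geonet-open-data'), and the FDSN URL and directories are example values:

from obspy import UTCDateTime

from zizou.data import S3Waveforms

s3 = S3Waveforms(
    s3bucket="geonet-open-data",  # assumed bucket name
    fdsn_urls=("https://service.geonet.org.nz",),
    staxml_dir="./instrument_response",
)
tr = s3.get_waveforms(
    "NZ", "WIZ", "10", "HHZ",
    UTCDateTime("2023-01-01"), UTCDateTime("2023-01-02"),
)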

get_waveforms(net, site, loc, comp, start, end)

Get seismic waveforms from GeoNet's open data archive on AWS S3.

Source code in src/zizou/data.py
def get_waveforms(self, net, site, loc, comp, start, end):
    """
    Get seismic waveforms from GeoNet's open data archive on AWS S3.
    """
    PATH_FORMAT = "waveforms/miniseed/{year}/{year}.{julday:03d}/{station}."
    PATH_FORMAT += "{network}/{year}.{julday:03d}.{station}."
    PATH_FORMAT += "{location}-{channel}.{network}.D"
    t_start = start
    t_end = UTCDateTime(
        year=t_start.year,
        julday=t_start.julday + 1,
        hour=0,
        minute=0,
        second=0,
        microsecond=0,
    )
    t_end = min(t_end, end)
    st = Stream()
    while t_start < end:
        s3_file_path = PATH_FORMAT.format(
            year=t_start.year,
            julday=t_start.julday,
            station=site,
            network=net,
            location=loc,
            channel=comp,
        )
        data = io.BytesIO()
        if self.debug:
            logger.debug("Requesting {}".format(s3_file_path))
        _ = self.client.download_fileobj(
            Bucket=self.s3_bucket, Key=s3_file_path, Fileobj=data
        )
        data.seek(0)
        _st = read(data, dtype="float64")
        data.close()
        _st.trim(t_start, t_end, nearest_sample=False)
        st += _st
        t_start = t_end
        t_end = t_start + timedelta(days=1)
        t_end = min(t_end, end)

    st.merge(fill_value=self.fill_value)
    inv = get_instrument_response(
        net, site, loc, staxml_dir=self.staxml_dir, fdsn_urls=self.fdsn_urls
    )
    inv_dt = inv.select(
        location=loc, channel=comp, starttime=t_start, endtime=t_end
    )

    # Post-process to remove sensitivity and account for gaps
    pp = PostProcess(
        st=st,
        inv=inv,
        inv_dt=inv_dt,
        startdate=start,
        enddate=end,
        loc=loc,
        comp=comp,
        fill_value=self.fill_value,
    )
    tr = pp.run_post_processing()
    return tr

SDSWaveforms

Bases: WaveformBaseclass

Source code in src/zizou/data.py
class SDSWaveforms(WaveformBaseclass):
    def __init__(self, sds_dir, fdsn_urls, staxml_dir, fill_value=np.nan):
        """
        Get seismic waveforms from a local SDS archive.
        """
        self.client = SDS_Client(sds_dir)
        self.fdsn_urls = fdsn_urls
        self.staxml_dir = staxml_dir
        self.fill_value = fill_value

    def get_waveforms(self, net, site, loc, comp, startdate, enddate):
        """
        :param fill_value: Default value used to fill gaps and pad traces to
                           cover the whole time period defined by `startdate`
                           and `enddate`. If 'interpolate' is chosen, the
                           traces will not be padded.
        :type fill_value: int, float, 'interpolate', np.nan, or None
        """
        inv = get_instrument_response(
            net, site, loc, staxml_dir=self.staxml_dir, fdsn_urls=self.fdsn_urls
        )

        st = self.client.get_waveforms(
            net, site, loc, comp, startdate, enddate, dtype="float64"
        )

        inv_dt = inv.select(
            location=loc, channel=comp, starttime=startdate, endtime=enddate - 1
        )

        # Post-process to remove sensitivity and account for gaps
        pp = PostProcess(
            st=st,
            inv=inv,
            inv_dt=inv_dt,
            startdate=startdate,
            enddate=enddate,
            loc=loc,
            comp=comp,
            fill_value=self.fill_value,
        )
        tr = pp.run_post_processing()
        return tr
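
A minimal sketch, assuming an existing SDS archive under /data/sds and GeoNet's FDSN service for the response metadata (both example values):

from obspy import UTCDateTime

from zizou.data import SDSWaveforms

sds = SDSWaveforms(
    sds_dir="/data/sds",
    fdsn_urls=("https://service.geonet.org.nz",),
    staxml_dir="./instrument_response",
)
tr = sds.get_waveforms(
    "NZ", "WIZ", "10", "HHZ",
    UTCDateTime("2023-01-01"), UTCDateTime("2023-01-02"),
)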

__init__(sds_dir, fdsn_urls, staxml_dir, fill_value=np.nan)

Get seismic waveforms from a local SDS archive.

Source code in src/zizou/data.py
def __init__(self, sds_dir, fdsn_urls, staxml_dir, fill_value=np.nan):
    """
    Get seismic waveforms from a local SDS archive.
    """
    self.client = SDS_Client(sds_dir)
    self.fdsn_urls = fdsn_urls
    self.staxml_dir = staxml_dir
    self.fill_value = fill_value

get_waveforms(net, site, loc, comp, startdate, enddate)

Parameters:
  • fill_value (int, float, 'interpolate', np.nan, or None) –

    Default value used to fill gaps and pad traces to cover the whole time period defined by startdate and enddate. If 'interpolate' is chosen, the traces will not be padded.

Source code in src/zizou/data.py
def get_waveforms(self, net, site, loc, comp, startdate, enddate):
    """
    :param fill_value: Default value used to fill gaps and pad traces to
                       cover the whole time period defined by `startdate`
                       and `enddate`. If 'interpolate' is chosen, the
                       traces will not be padded.
    :type fill_value: int, float, 'interpolate', np.nan, or None
    """
    inv = get_instrument_response(
        net, site, loc, staxml_dir=self.staxml_dir, fdsn_urls=self.fdsn_urls
    )

    st = self.client.get_waveforms(
        net, site, loc, comp, startdate, enddate, dtype="float64"
    )

    inv_dt = inv.select(
        location=loc, channel=comp, starttime=startdate, endtime=enddate - 1
    )

    # Post-process to remove sensitivity and account for gaps
    pp = PostProcess(
        st=st,
        inv=inv,
        inv_dt=inv_dt,
        startdate=startdate,
        enddate=enddate,
        loc=loc,
        comp=comp,
        fill_value=self.fill_value,
    )
    tr = pp.run_post_processing()
    return tr

VolcanoMetadata

Read volcano/station/channel metadata

Source code in src/zizou/data.py
class VolcanoMetadata:
    """
    Read volcano/station/channel metadata
    """

    def __init__(self, configfile):
        try:
            with open(configfile, "r") as fh:
                cfg = yaml.safe_load(fh)
        except OSError:
            cfg = yaml.safe_load(configfile)
        self.data = cfg["metadata"]

    def get_available_volcanoes(self):
        return [vol["name"] for vol in self.data["volcano"]]

    def get_available_streams(self, volcano):
        """
        Get available sensor streams for a given volcano.

        Parameters
        ----------
        volcano : str
            Volcano name

        Returns
        -------
        list
            List of streams in the format 'NET.STA.LOC.CHAN'
        """
        _streams = []
        for _v in self.data["volcano"]:
            if _v["name"].lower() == volcano.lower():
                for _n in _v["network"]:
                    net_code = _n["net_code"]
                    for _s in _n["stations"]:
                        sta_code = _s["sta_code"]
                        location = _s["location"]
                        for _c in _s["channel"]:
                            chan_code = _c["code"]
                            _stream = ".".join(
                                (net_code, sta_code, location, chan_code)
                            )
                            _streams.append(_stream)
        return _streams

    def get_site_information(self, sitename):
        """
        Get site information for a given site. Note
        that if the same site name exists at multiple
        volcanoes, the first match will be returned.

        Parameters
        ----------
        sitename : str
            Three-letter site code.

        Returns
        -------
        dict
            Dictionary with site information as follows:
            {'volcano': str, 'site': str, 'channels': list,
             'latitude': float, 'longitude': float, 'starttime': datetime}
        """
        for _v in self.data["volcano"]:
            for _n in _v["network"]:
                for _s in _n["stations"]:
                    if _s["sta_code"] == sitename:
                        try:
                            starttime = datetime.strptime(
                                _s["starttime"], "%Y-%m-%dT%H:%M:%SZ"
                            )
                        except KeyError:
                            starttime = None

                        try:
                            channels = [c["code"] for c in _s["channel"]]
                        except KeyError:
                            channels = []
                        return dict(
                            volcano=_v["name"],
                            site=_s["sta_code"],
                            channels=channels,
                            latitude=float(_s["latitude"]),
                            longitude=float(_s["longitude"]),
                            starttime=starttime,
                        )
        return None

    def get_eruption_dates(self, volcano):
        """
        Get eruption dates for a given volcano.

        Parameters
        ----------
        volcano : str
            Volcano name.

        Returns
        -------
        list
            List of datetime.date objects representing eruption dates.
        """
        eruption_dates = []
        metadata = [v for v in self.data["volcano"] if v["name"] == volcano]
        er = metadata[0]["eruptions"]
        if er:
            for e in er:
                d, m, y = e.split("-")
                eruption_dates.append(date(int(y), int(m), int(d)))
        return eruption_dates

    def get_unrest_periods(self, volcano):
        """
        Get unrest periods for a given volcano.

        Parameters
        ----------
        volcano : str
            Volcano name.

        Returns
        -------
        list
            List of datetime.date objects representing start and end dates.
        """
        unrest_dates = []
        metadata = [v for v in self.data["volcano"] if v["name"] == volcano]
        ur = metadata[0]["unrest periods"]
        if ur:
            for u in ur:
                ds, ms, ys = u["starttime"].split("-")
                de, me, ye = u["endtime"].split("-")
                unrest_dates.append(
                    [date(int(ys), int(ms), int(ds)), date(int(ye), int(me), int(de))]
                )
        return unrest_dates
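
Because of the OSError fallback in __init__, the constructor accepts either a path to a YAML file or a YAML string. A minimal sketch with an illustrative config (the schema follows the accessors above; all values are examples):

from zizou.data import VolcanoMetadata

yaml_cfg = """
metadata:
  volcano:
    - name: Whakaari
      eruptions: ['09-12-2019']
      unrest periods:
      network:
        - net_code: NZ
          stations:
            - sta_code: WIZ
              location: '10'
              latitude: -37.52
              longitude: 177.19
              channel:
                - code: HHZ
"""
vm = VolcanoMetadata(yaml_cfg)
print(vm.get_available_volcanoes())          # ['Whakaari']
print(vm.get_available_streams("whakaari"))  # ['NZ.WIZ.10.HHZ']
print(vm.get_eruption_dates("Whakaari"))     # [datetime.date(2019, 12, 9)]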

get_available_streams(volcano)

Get available sensor streams for a given volcano.

Parameters

volcano : str
    Volcano name.

Returns

list
    List of streams in the format 'NET.STA.LOC.CHAN'.

Source code in src/zizou/data.py
def get_available_streams(self, volcano):
    """
    Get available sensor streams for a given volcano.

    Parameters
    ----------
    volcano : str
        Volcano name

    Returns
    -------
    list
        List of streams in the format 'NET.STA.LOC.CHAN'
    """
    _streams = []
    for _v in self.data["volcano"]:
        if _v["name"].lower() == volcano.lower():
            for _n in _v["network"]:
                net_code = _n["net_code"]
                for _s in _n["stations"]:
                    sta_code = _s["sta_code"]
                    location = _s["location"]
                    for _c in _s["channel"]:
                        chan_code = _c["code"]
                        _stream = ".".join(
                            (net_code, sta_code, location, chan_code)
                        )
                        _streams.append(_stream)
    return _streams

get_eruption_dates(volcano)

Get eruption dates for a given volcano.

Parameters

volcano : str
    Volcano name.

Returns

list
    List of datetime.date objects representing eruption dates.

Source code in src/zizou/data.py
def get_eruption_dates(self, volcano):
    """
    Get eruption dates for a given volcano.

    Parameters
    ----------
    volcano : str
        Volcano name.

    Returns
    -------
    list
        List of datetime.date objects representing eruption dates.
    """
    eruption_dates = []
    metadata = [v for v in self.data["volcano"] if v["name"] == volcano]
    er = metadata[0]["eruptions"]
    if er:
        for e in er:
            d, m, y = e.split("-")
            eruption_dates.append(date(int(y), int(m), int(d)))
    return eruption_dates

get_site_information(sitename)

Get site information for a given site. Note that if the same site name exists at multiple volcanoes, the first match will be returned.

Parameters

sitename : str
    Three-letter site code.

Returns

dict
    Dictionary with site information as follows:
    {'volcano': str, 'site': str, 'channels': list,
     'latitude': float, 'longitude': float, 'starttime': datetime}

Source code in src/zizou/data.py
def get_site_information(self, sitename):
    """
    Get site information for a given site. Note
    that if the same site name exists at multiple
    volcanoes, the first match will be returned.

    Parameters
    ----------
    sitename : str
        Three-letter site code.

    Returns
    -------
    dict
        Dictionary with site information as follows:
        {'volcano': str, 'site': str, 'channels': list,
         'latitude': float, 'longitude': float, 'starttime': datetime}
    """
    for _v in self.data["volcano"]:
        for _n in _v["network"]:
            for _s in _n["stations"]:
                if _s["sta_code"] == sitename:
                    try:
                        starttime = datetime.strptime(
                            _s["starttime"], "%Y-%m-%dT%H:%M:%SZ"
                        )
                    except KeyError:
                        starttime = None

                    try:
                        channels = [c["code"] for c in _s["channel"]]
                    except KeyError:
                        channels = []
                    return dict(
                        volcano=_v["name"],
                        site=_s["sta_code"],
                        channels=channels,
                        latitude=float(_s["latitude"]),
                        longitude=float(_s["longitude"]),
                        starttime=starttime,
                    )
    return None

get_unrest_periods(volcano)

Get unrest periods for a given volcano.

Parameters

volcano : str
    Volcano name.

Returns

list
    List of datetime.date objects representing start and end dates.

Source code in src/zizou/data.py
def get_unrest_periods(self, volcano):
    """
    Get unrest periods for a given volcano.

    Parameters
    ----------
    volcano : str
        Volcano name.

    Returns
    -------
    list
        List of datetime.date objects representing start and end dates.
    """
    unrest_dates = []
    metadata = [v for v in self.data["volcano"] if v["name"] == volcano]
    ur = metadata[0]["unrest periods"]
    if ur:
        for u in ur:
            ds, ms, ys = u["starttime"].split("-")
            de, me, ye = u["endtime"].split("-")
            unrest_dates.append(
                [date(int(ys), int(ms), int(ds)), date(int(ye), int(me), int(de))]
            )
    return unrest_dates

get_instrument_response(net, site, loc, staxml_dir='./instrument_response', fdsn_urls=('https://service.geonet.org.nz',))

Retrieve instrument response file in STATIONXML format.

Source code in src/zizou/data.py
def get_instrument_response(
    net,
    site,
    loc,
    staxml_dir="./instrument_response",
    fdsn_urls=("https://service.geonet.org.nz"),
):
    """
    Retrieve instrument response file in STATIONXML format.
    """
    fn = "{:s}.xml".format(site)
    fn = os.path.join(staxml_dir, fn)
    try:
        inv = read_inventory(fn, format="STATIONXML")
    except FileNotFoundError:
        try:
            os.makedirs(staxml_dir)
        except FileExistsError:
            pass
        for url in fdsn_urls:
            try:
                client = FDSN_Client(base_url=url)
                inv = client.get_stations(
                    network=net,
                    location=loc,
                    station=site,
                    channel="*",
                    level="response",
                )
            except Exception as e:
                logger.info(e)
            else:
                break
        inv.write(fn, format="STATIONXML")
    return inv
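
A short sketch; on first use the inventory is fetched from the FDSN service and cached as <staxml_dir>/<site>.xml, and later calls read the cached file (station code and URL are example values):

from zizou.data import get_instrument_response

inv = get_instrument_response(
    "NZ", "WIZ", "10",
    staxml_dir="./instrument_response",
    fdsn_urls=("https://service.geonet.org.nz",),
)
print(inv)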

tilde_request(domain, name, station, method, sensor, aspect, startdate, enddate)

Request data from the tilde API (https://tilde.geonet.org.nz/v3/api-docs/). See the tilde discovery tool for more information: https://tilde.geonet.org.nz/ui/data-discovery/

Parameters

domain : str
    The domain of the data (e.g. 'manualcollect').
name : str
    The name of the data (e.g. 'plume-SO2-gasflux').
station : str
    The station code (e.g. 'WI000').
method : str
    The method of the data (e.g. 'contouring').
sensor : str
    The sensor of the data (e.g. 'MC01').
aspect : str
    The aspect of the data (e.g. 'nil').
startdate : datetime
    The start date of the data.
enddate : datetime
    The end date of the data.

Returns

pd.DataFrame
    A pandas DataFrame with the requested data.

Source code in src/zizou/data.py
def tilde_request(
    domain: str,
    name: str,
    station: str,
    method: str,
    sensor: str,
    aspect: str,
    startdate: datetime,
    enddate: datetime,
) -> pd.DataFrame:
    """
    Request data from the tilde API (https://tilde.geonet.org.nz/v3/api-docs/).
    See the tilde discovery tool for more information:
    https://tilde.geonet.org.nz/ui/data-discovery/

    Parameters
    ----------
    domain : str
        The domain of the data (e.g. 'manualcollect')
    name : str
        The name of the data (e.g. 'plume-SO2-gasflux')
    station : str
        The station code (e.g. 'WI000')
    method : str
        The method of the data (e.g. 'contouring')
    sensor : str
        The sensor of the data (e.g. 'MC01')
    aspect : str
        The aspect of the data (e.g. 'nil')
    startdate : datetime
        The start date of the data
    enddate : datetime
        The end date of the data

    Returns
    -------
    pd.DataFrame
        A pandas dataframe with the requested data
    """
    tilde_url = "https://tilde.geonet.org.nz/v3/data"
    # split the request into historic and latest data
    get_historic = True
    get_latest = False
    startdate = startdate.astimezone(timezone.utc)
    enddate = enddate.astimezone(timezone.utc)
    _tstart = str(startdate.date())
    _tend = str(enddate.date())
    _today = datetime.now(timezone.utc).date()
    if enddate.date() > (_today - timedelta(days=29)):
        _tend = str((enddate.date() - timedelta(days=29)))
        get_latest = True
    if startdate.date() > (_today - timedelta(days=29)):
        get_historic = False

    assert get_historic or get_latest, "Check start and end dates."

    if get_latest:
        latest = f"{tilde_url}/{domain}/{station}/{name}/{sensor}/{method}/{aspect}/latest/30d"
        rval = requests.get(latest, headers={"Accept": "text/csv"})
        if rval.status_code != 200:
            msg = f"Download of {name} for {station} failed with status code {rval.status_code}"
            msg += f" and url {latest}"
            raise ValueError(msg)
        df_latest = pd.read_csv(
            io.StringIO(rval.text),
            index_col="timestamp",
            parse_dates=["timestamp"],
            usecols=["timestamp", "value", "error"],
            date_format="ISO8601",
        )
    if get_historic:
        historic = f"{tilde_url}/{domain}/{station}/{name}/{sensor}/{method}/{aspect}/"
        historic += f"{_tstart}/{_tend}"
        rval = requests.get(historic, headers={"Accept": "text/csv"})
        if rval.status_code != 200:
            msg = f"Download of {name} for {station} failed with status code {rval.status_code}"
            msg += f" and url {historic}"
            raise ValueError(msg)
        data = io.StringIO(rval.text)
        df_historic = pd.read_csv(
            data,
            index_col="timestamp",
            parse_dates=["timestamp"],
            usecols=["timestamp", "value", "error"],
            date_format="ISO8601",
        )
        if get_latest and len(df_latest) > 0:
            df = df_historic.combine_first(df_latest)
        else:
            df = df_historic
    else:
        df = df_latest
    df.rename(columns={"value": "obs", "error": "err"}, inplace=True)
    df.index.name = "dt"
    return df.loc[startdate:enddate]
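
A minimal sketch using the example values from the parameter list above; timezone-aware datetimes avoid ambiguity because the function converts both dates to UTC:

from datetime import datetime, timezone

from zizou.data import tilde_request

df = tilde_request(
    domain="manualcollect",
    name="plume-SO2-gasflux",
    station="WI000",
    method="contouring",
    sensor="MC01",
    aspect="nil",
    startdate=datetime(2022, 1, 1, tzinfo=timezone.utc),
    enddate=datetime(2022, 6, 1, tzinfo=timezone.utc),
)
print(df.head())  # columns 'obs' and 'err', indexed by 'dt'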

Compute the Real-time Seismic-Amplitude Measurement (RSAM).

EnergyExplainedByRSAM

Bases: RSAM

The proportion of signal energy in the RSAM bandwidth, relative to a 0.5-10 Hz bandwidth.

Source code in src/zizou/rsam.py
class EnergyExplainedByRSAM(RSAM):
    """
    The proportion of signal energy in the RSAM
    bandwidth, relative to a 0.5-10 Hz bandwidth.
    """

    features = ["rsam_energy_prop"]

    def __init__(
        self,
        interval=600.0,
        filtertype=None,
        filterfreq=(None, None),
        filtertype_wb="bandpass",
        filterfreq_wb=(0.5, 10.0),
        configfile=None,
    ):
        super(EnergyExplainedByRSAM, self).__init__(
            interval=interval,
            filtertype=filtertype,
            filterfreq=filterfreq,
            configfile=configfile,
        )
        self.filtertype_wb = filtertype_wb
        self.filterfreq_wb = filterfreq_wb

        # base params already read from configfile during superclass init
        # reopen and extract wideband filter params if any
        if configfile is not None:
            try:
                with open(configfile, "r") as fh:
                    c = yaml.safe_load(fh)
            except OSError:
                c = yaml.safe_load(configfile)
            cr = c.get("rsam")
            if cr is not None:
                self.filtertype_wb = cr.get("filtertype_wb", filtertype_wb)
                freq = cr.get("filterfreq_wb")
                if freq is not None:
                    self.filterfreq_wb = (freq["low"], freq["high"])

    def compute(self, trace):
        """
        :param trace: The seismic waveform data
        :type trace: :class:`obspy.Trace`
        """
        if len(trace) < 1:
            msg = "Trace is empty."
            raise ValueError(msg)

        logger.info(
            "Computing Energy explained by RSAM for {} between {} and {}.".format(
                ".".join(
                    (
                        trace.stats.network,
                        trace.stats.station,
                        trace.stats.location,
                        trace.stats.channel,
                    )
                ),
                trace.stats.starttime.isoformat(),
                trace.stats.endtime.isoformat(),
            )
        )

        tr = trace.copy()
        starttime = tr.stats.starttime
        endtime = tr.stats.endtime

        # handle gaps (i.e. NaNs)
        mdata = np.ma.masked_invalid(tr.data)
        tr.data = mdata

        st_rsam = tr.split()
        st_wb = st_rsam.copy()

        apply_freq_filter(st_rsam, self.filtertype, self.filterfreq)
        st_rsam.merge(fill_value=np.nan)
        st_rsam.trim(starttime, endtime, pad=True, fill_value=np.nan)
        tr_rsam = st_rsam[0]

        # Get trace in wide bandwidth
        apply_freq_filter(st_wb, self.filtertype_wb, self.filterfreq_wb)
        st_wb.merge(fill_value=np.nan)
        st_wb.trim(starttime, endtime, pad=True, fill_value=np.nan)
        tr_wb = st_wb[0]

        # initialise dataframe
        feature_data = []
        feature_idx = []

        # loop through data in interval blocks
        for t0, t1 in trace_window_times(tr_rsam, self.interval, min_len=0.8):
            energy_ratio = rsam_wideband_energy_ratio(
                tr_rsam.slice(t0, t1, nearest_sample=False).data,
                tr_wb.slice(t0, t1, nearest_sample=False).data,
            )
            feature_data.append(energy_ratio)
            feature_idx.append(t0.strftime("%Y-%m-%dT%H:%M:%S"))

        xdf = pd.DataFrame(
            data=feature_data,
            index=pd.DatetimeIndex(feature_idx, name="datetime"),
            columns=["rsam_energy_prop"],
            dtype=float,
        ).to_xarray()

        xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
        xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
        xdf.attrs["station"] = trace.stats.station

        self.feature = xdf
        self.trace = tr
        return self.feature
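
A sketch that feeds the feature a synthetic one-day trace from the MockSDSWaveforms client documented in zizou.data (paths and stream codes are illustrative):

from obspy import UTCDateTime

from zizou.data import MockSDSWaveforms
from zizou.rsam import EnergyExplainedByRSAM

mock = MockSDSWaveforms(sds_dir="/tmp/sds")
tr = mock.get_waveforms(
    "NZ", "WIZ", "10", "HHZ",
    UTCDateTime("2023-01-01"), UTCDateTime("2023-01-02"),
)
feat = EnergyExplainedByRSAM(interval=600.0).compute(tr)
# One value of 'rsam_energy_prop' per 10-minute window, as an xarray Dataset.
print(feat["rsam_energy_prop"])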

compute(trace)

Parameters:
  • trace (obspy.Trace) –

    The seismic waveform data

Source code in src/zizou/rsam.py
def compute(self, trace):
    """
    :param trace: The seismic waveform data
    :type trace: :class:`obspy.Trace`
    """
    if len(trace) < 1:
        msg = "Trace is empty."
        raise ValueError(msg)

    logger.info(
        "Computing Energy explained by RSAM for {} between {} and {}.".format(
            ".".join(
                (
                    trace.stats.network,
                    trace.stats.station,
                    trace.stats.location,
                    trace.stats.channel,
                )
            ),
            trace.stats.starttime.isoformat(),
            trace.stats.endtime.isoformat(),
        )
    )

    tr = trace.copy()
    starttime = tr.stats.starttime
    endtime = tr.stats.endtime

    # handle gaps (i.e. NaNs)
    mdata = np.ma.masked_invalid(tr.data)
    tr.data = mdata

    st_rsam = tr.split()
    st_wb = st_rsam.copy()

    apply_freq_filter(st_rsam, self.filtertype, self.filterfreq)
    st_rsam.merge(fill_value=np.nan)
    st_rsam.trim(starttime, endtime, pad=True, fill_value=np.nan)
    tr_rsam = st_rsam[0]

    # Get trace in wide bandwidth
    apply_freq_filter(st_wb, self.filtertype_wb, self.filterfreq_wb)
    st_wb.merge(fill_value=np.nan)
    st_wb.trim(starttime, endtime, pad=True, fill_value=np.nan)
    tr_wb = st_wb[0]

    # initialise dataframe
    feature_data = []
    feature_idx = []

    # loop through data in interval blocks
    for t0, t1 in trace_window_times(tr_rsam, self.interval, min_len=0.8):
        energy_ratio = rsam_wideband_energy_ratio(
            tr_rsam.slice(t0, t1, nearest_sample=False).data,
            tr_wb.slice(t0, t1, nearest_sample=False).data,
        )
        feature_data.append(energy_ratio)
        feature_idx.append(t0.strftime("%Y-%m-%dT%H:%M:%S"))

    xdf = pd.DataFrame(
        data=feature_data,
        index=pd.DatetimeIndex(feature_idx, name="datetime"),
        columns=["rsam_energy_prop"],
        dtype=float,
    ).to_xarray()

    xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
    xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
    xdf.attrs["station"] = trace.stats.station

    self.feature = xdf
    self.trace = tr
    return self.feature
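
A minimal NumPy sketch of the per-window quantity computed above. The rsam_wideband_energy_ratio helper is not shown in this documentation, so the body below is an assumption about what such a ratio plausibly looks like, not the verbatim implementation:

import numpy as np

def energy_ratio_sketch(band_limited, wideband):
    # Fraction of the wide-band signal energy that falls in the RSAM band.
    # Both inputs are assumed to be aligned slices of the filtered traces;
    # NaN gaps are ignored (an assumption, not verified behaviour).
    num = np.nansum(np.asarray(band_limited, dtype=float) ** 2)
    den = np.nansum(np.asarray(wideband, dtype=float) ** 2)
    return num / den if den > 0 else np.nan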

RSAM

Bases: FeatureBaseClass

RSAM is the mean of the absolute value of a signal filtered between 2 and 5 Hz.

Parameters:
  • interval (int, optional, default: 600.0 ) –

    Length of the time interval in seconds over which to compute RSAM.

  • filtertype (str, optional, default: 'bandpass' ) –

    The type of filter applied before computing RSAM: 'bandpass', 'highpass', or 'lowpass'.

  • filterfreq (tuple, optional, default: (2, 5) ) –

    The low and high cutoff frequencies. If filtertype is 'highpass' or 'lowpass', only the first or last value is used, respectively.

Source code in src/zizou/rsam.py, lines 37-155
class RSAM(FeatureBaseClass):
    """
    RSAM is the mean of the absolute value of a signal filtered
    between 2 and 5 Hz.

    :param interval: Length of the time interval in seconds over
                     which to compute RSAM.
    :type interval: int, optional
    :param filtertype: The type of filtering that is applied before
                       computing RSAM. Can be either band-pass ('bp'),
                       high-pass ('hp'), or low-pass ('lp').
    :type filtertype: str, optional
    :param filterfreq: The low and high cutoff frequency. If filtertype is
                       highpass or lowpass, only the first or last value is
                       used, respectively.

    :type filterfreq: tuple, optional
    """

    features = ["rsam"]

    def __init__(
        self,
        interval=600.0,
        filtertype="bandpass",
        filterfreq=(2, 5),
        configfile=None,
        reindex=True,
    ):
        super(RSAM, self).__init__()
        self.interval = interval
        self.filtertype = filtertype
        self.filterfreq = filterfreq
        self.feature = None
        self.reindex = reindex

        if configfile is not None:
            try:
                with open(configfile, "r") as fh:
                    c = yaml.safe_load(fh)
            except OSError:
                c = yaml.safe_load(configfile)
            self.interval = c["default"].get("interval", interval)
            cr = c.get("rsam")
            if cr is not None:
                self.filtertype = cr.get("filtertype", filtertype)
                freq = cr.get("filterfreq")
                if freq is not None:
                    self.filterfreq = (freq["low"], freq["high"])
                self.reindex = cr.get("reindex", reindex)

    def compute(self, trace):
        """
        :param trace: The seismic waveform data
        :type trace: :class:`obspy.Trace`
        """
        if len(trace) < 1:
            raise ValueError("Trace is empty.")

        logger.info(
            "Computing RSAM for {} between {} and {}.".format(
                ".".join(
                    (
                        trace.stats.network,
                        trace.stats.station,
                        trace.stats.location,
                        trace.stats.channel,
                    )
                ),
                trace.stats.starttime.isoformat(),
                trace.stats.endtime.isoformat(),
            )
        )

        tr = trace.copy()
        starttime = tr.stats.starttime
        endtime = tr.stats.endtime

        # handle gaps (i.e. NaNs)
        mdata = np.ma.masked_invalid(tr.data)
        tr.data = mdata
        st = tr.split()

        apply_freq_filter(st, self.filtertype, self.filterfreq)
        st.merge(fill_value=np.nan)
        st.trim(starttime, endtime, pad=True, fill_value=np.nan)
        tr = st[0]

        # initialise dataframe
        feature_data = []
        feature_idx = []

        # loop through data in interval blocks
        for trint in trace_window_data(tr, self.interval, min_len=0.8):
            absolute = np.absolute(trint.data)  # absolute value
            trint.data = absolute  # assign back to trace
            mean = trint.data.mean()
            # convert to nanometres so dealing with whole numbers
            feature_data.append(mean / 1e-9)
            feature_idx.append(trint.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S"))

        xda = xr.DataArray(
            feature_data, coords=[pd.DatetimeIndex(feature_idx)], dims="datetime"
        )
        xdf = xr.Dataset({"rsam": xda})
        xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
        xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
        xdf.attrs["station"] = trace.stats.station
        if self.reindex:
            starttime = UTCDateTime(str(xda.datetime.data[0]))
            starttime = round_time(starttime, self.interval)
            endtime = UTCDateTime(str(xda.datetime.data[-1]))
            endtime = round_time(endtime, self.interval)
            new_index = pd.date_range(
                starttime.datetime, endtime.datetime, freq="%dS" % int(self.interval)
            )
            xdf = xdf.reindex(dict(datetime=new_index), method="nearest")
        self.feature = xdf
        return self.feature
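
The configfile branch in __init__ first tries to open its argument as a file and, failing that, passes it to yaml.safe_load directly, so a YAML string (or a path to a YAML file) works. A sketch of a configuration covering the keys read above; the values are illustrative only:

from zizou.rsam import RSAM

cfg = """
default:
  interval: 600
rsam:
  filtertype: bandpass
  filterfreq:
    low: 2
    high: 5
  reindex: true
"""
# open() raises OSError for the multi-line string, so it is parsed as YAML
rsam = RSAM(configfile=cfg)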

compute(trace)

Parameters:
  • trace (:class:`obspy.Trace`) –

    The seismic waveform data

Source code in src/zizou/rsam.py, lines 88-155
def compute(self, trace):
    """
    :param trace: The seismic waveform data
    :type trace: :class:`obspy.Trace`
    """
    if len(trace) < 1:
        raise ValueError("Trace is empty.")

    logger.info(
        "Computing RSAM for {} between {} and {}.".format(
            ".".join(
                (
                    trace.stats.network,
                    trace.stats.station,
                    trace.stats.location,
                    trace.stats.channel,
                )
            ),
            trace.stats.starttime.isoformat(),
            trace.stats.endtime.isoformat(),
        )
    )

    tr = trace.copy()
    starttime = tr.stats.starttime
    endtime = tr.stats.endtime

    # handle gaps (i.e. NaNs)
    mdata = np.ma.masked_invalid(tr.data)
    tr.data = mdata
    st = tr.split()

    apply_freq_filter(st, self.filtertype, self.filterfreq)
    st.merge(fill_value=np.nan)
    st.trim(starttime, endtime, pad=True, fill_value=np.nan)
    tr = st[0]

    # initialise dataframe
    feature_data = []
    feature_idx = []

    # loop through data in interval blocks
    for trint in trace_window_data(tr, self.interval, min_len=0.8):
        absolute = np.absolute(trint.data)  # absolute value
        trint.data = absolute  # assign back to trace
        mean = trint.data.mean()
        # convert to nanometres so dealing with whole numbers
        feature_data.append(mean / 1e-9)
        feature_idx.append(trint.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S"))

    xda = xr.DataArray(
        feature_data, coords=[pd.DatetimeIndex(feature_idx)], dims="datetime"
    )
    xdf = xr.Dataset({"rsam": xda})
    xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
    xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
    xdf.attrs["station"] = trace.stats.station
    if self.reindex:
        starttime = UTCDateTime(str(xda.datetime.data[0]))
        starttime = round_time(starttime, self.interval)
        endtime = UTCDateTime(str(xda.datetime.data[-1]))
        endtime = round_time(endtime, self.interval)
        new_index = pd.date_range(
            starttime.datetime, endtime.datetime, freq="%dS" % int(self.interval)
        )
        xdf = xdf.reindex(dict(datetime=new_index), method="nearest")
    self.feature = xdf
    return self.feature
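
A minimal usage sketch (the import path is assumed to mirror the source location; the trace is ObsPy's bundled example waveform, so the absolute numbers carry no physical meaning):

>>> from obspy import read
>>> from zizou.rsam import RSAM
>>> tr = read()[0]                     # ~30 s example trace
>>> rsam = RSAM(interval=10, reindex=False)
>>> feat = rsam.compute(tr)            # xarray.Dataset with an 'rsam' variable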

SSAM

Bases: FeatureBaseClass

SSAM is essentially a spectrogram computation. Data are split into interval-length segments and the spectrum of each segment is computed. Before computing the spectrum, the mean is removed and a Hann window is applied to each segment. The overlap between segments can be specified with per_lap, and Welch-type smoothing can be applied.

Parameters:
  • interval (int, default: 10 ) –

    Segment length in seconds.

  • per_lap (float | int, default: 0 ) –

    If the value is less than 1 it is treated as the fraction of segment overlap; otherwise it is the step size in sample points.

  • scale_by_freq (boolean, default: True ) –

    Divide by the sampling frequency so that the density function has units of dB/Hz and can be integrated over the frequency values.

  • smooth (int, default: None ) –

    The number of windows over which to average the spectrogram.

  • frequencies (:class:`numpy.ndarray`, default: linspace(0, 25, 251) ) –

    Frequencies at which to return the spectrogram. This uses linear interpolation to compute the spectrogram at the given frequencies from the original spectrogram.

  • timestamp (str, default: 'center' ) –

    Can be either 'center' or 'start'. If 'center', the timestamp of each spectrum is the center of the windows from which the spectrum was computed. If 'start' it is the timestamp of the first sample of the first window.

  • resample_int ((str, str), default: None ) –

    Intervals used to first upsample the dataset with linear interpolation (first interval) and then downsample it with the mean (second interval).

  • configfile (str | dict, default: None ) –

    Configuration as a YAML file or a dictionary.

    >>> from zizou.ssam import SSAM, test_signal
    >>> import numpy as np
    >>> tr = test_signal(gaps=True)
    >>> s = SSAM(interval=60, per_lap=.9, smooth=10,
    ...          frequencies=np.arange(0, 25.1, .1))

Source code in src/zizou/ssam.py, lines 16-394
class SSAM(FeatureBaseClass):
    """
    SSAM is basically the same as computing a spectrogram. Data are split into
    interval length segments and the spectrum of each section is
    computed. Before computing the spectrum, the mean is removed and
    a hanning window is applied to each segment. The percentage of
    overlap of each segment can be specified with per_lap and Welch-type
    smoothing can be applied.

    :param interval: Segment length in seconds.
    :type interval: int
    :param per_lap: If the value is less than 1 it is treated as the percentage
                    of segment overlap; else it is the step size in sample points
    :type per_lap: float or int
    :param scale_by_freq: Divide by the sampling frequency so that density
                          function has units of dB/Hz and can be integrated
                          by the frequency values.
    :type scale_by_freq: boolean
    :param smooth: The number of windows over which to
                   average the spectrogram.
    :type smooth: int
    :param frequencies: Frequencies at which to return the spectrogram. This
                        uses linear interpolation to compute the spectrogram
                        at the given frequencies from the original spectrogram.
    :type frequencies: :class:`numpy.ndarray`
    :param timestamp: Can be either 'center' or 'start'. If 'center', the
                      timestamp of each spectrum is the center of the windows
                      from which the spectrum was computed. If 'start' it is
                      the timestamp of the first sample of the first window.
    :type timestamp: str
    :param resample_int: Interval to upsample the dataset using linear
                         interpolation over the first interval and then
                         downsample using the mean over the second interval.
    :type resample_int: (str, str)
    :param configfile: Configuration as a YAML file or a dictionary.
    :type configfile: str or dict

    >>> from zizou.ssam import SSAM, test_signal
    >>> import numpy as np
    >>> tr = test_signal(gaps=True)
    >>> s = SSAM(interval=60, per_lap=.9, smooth=10,
                 frequencies=np.arange(0, 25.1, .1))

    """

    features = ["ssam", "filterbank", "sonogram"]

    def __init__(
        self,
        interval=10,
        per_lap=0,
        scale_by_freq=True,
        smooth=None,
        frequencies=np.linspace(0, 25, 251),
        fbfilters=16,
        nhob=8,
        timestamp="center",
        resample_int=None,
        configfile=None,
    ):
        super(SSAM, self).__init__()
        self.interval = interval
        self.per_lap = per_lap
        self.scale_by_freq = scale_by_freq
        self.smooth = smooth
        self.frequencies = frequencies
        self.timestamp = timestamp
        self.resample_int = resample_int
        self.fbfilters = fbfilters
        self.nhob = nhob

        self.feature = None

        if configfile is not None:
            try:
                with open(configfile, "r") as fh:
                    c = yaml.safe_load(fh)
            except OSError:
                c = yaml.safe_load(configfile)
            cs = c.get("ssam")
            if cs is not None:
                self.interval = cs.get("interval", interval)
                self.per_lap = cs.get("per_lap", per_lap)
                self.scale_by_freq = cs.get("scale_by_freq", scale_by_freq)
                self.smooth = cs.get("smooth", smooth)
                self.timestamp = cs.get("timestamp", timestamp)
                self.nhob = cs.get("nhob", nhob)
                freqs = cs.get("frequencies", frequencies)
                if freqs is not None:
                    self.frequencies = np.arange(
                        freqs["start"], freqs["end"], freqs["step"]
                    )

                self.fbfilters = cs.get("fbfilters", fbfilters)
                ri = cs.get("resample_int", resample_int)
                if ri is not None:
                    self.resample_int = (ri["upsample"], ri["downsample"])

    @staticmethod
    def _nearest_pow_2(x):
        """
        Find power of two nearest to x

        >>> _nearest_pow_2(3)
        2.0
        >>> _nearest_pow_2(15)
        16.0

        :param x: Number
        :type x: float
        :return: Nearest power of 2 to x
        :rtype: Int
        """
        a = M.pow(2, M.ceil(np.log2(x)))
        b = M.pow(2, M.floor(np.log2(x)))
        if abs(a - x) < abs(b - x):
            return a
        else:
            return b

    def _average(self, spec, t):
        """
        Average spectrogram over nwin windows to produce a smoother
        spectrogram. If used with overlapping windows it is known
        as the Welch algorithm

        :param spec: Input spectrogram
        :type spec: :class:`numpy.ndarray`
        :param t: Time axis of spectrogram.
        :type t: :class:`numpy.ndarray`
        :param nwin: Number of windows over which to average
        :type nwin: int
        :return: smoothed_spec, new-time
        :rtype: tuple of :class:`~numpy.ndarray`
        """
        start = 0
        end = 0
        ntimes = spec.shape[1]
        av_mat = []
        nt = []
        while end < ntimes:
            av_arr = np.zeros(spec.shape[1])
            end = start + self.smooth
            if end > ntimes:
                end = ntimes
            av_arr[start:end] = 1.0
            av_mat.append(av_arr)
            if self.timestamp == "center":
                nt.append(t[start : end : self.smooth - 1].mean())
            elif self.timestamp == "start":
                nt.append(t[start])
            start = end

        av_mat = np.array(av_mat).T
        nt = np.array(nt)
        # find all columns with NaNs
        colidx = np.unique(np.where(np.isnan(spec))[1])
        # set corresponding rows in averaging matrix to NaN
        av_mat[colidx, :] = 0
        factor = np.sum(av_mat, axis=0)
        av_mat = np.divide(av_mat, factor, where=av_mat > 0)
        mspec = np.ma.masked_invalid(spec)
        mav_mat = np.ma.masked_invalid(av_mat)
        nspec = np.ma.dot(mspec, mav_mat)
        return nspec.data, nt

    @staticmethod
    def _resample(xdf, resample_int=("1min", "10min")):
        """
        Resample spectrograms to the desired sampling interval using xarray
        functionality.

        :param xdf: An xarray dataset as returned by :class:`zizou.ssam.SSAM`
        :type xdf: :class:`xarray.Dataset`
        :param resample_int: Intervals to upsample the dataset using linear
                             interpolation over the first interval and then
                             downsample using the mean over the second
                             interval.
        :type resample_int: (str, str)
        :return: Stacked dataset.
        :rtype: :class:`~xarray.Dataset`
        """
        upsample_int, downsample_int = resample_int
        attrs = xdf.attrs
        if upsample_int is not None:
            xdf = xdf.resample(datetime=upsample_int).interpolate("linear")
        if downsample_int is not None:
            xdf = xdf.resample(datetime=downsample_int).mean()
        # Resample transposes the dimensions so we need to revert this
        xdf["ssam"] = xdf.ssam.transpose("frequency", "datetime")
        xdf["filterbank"] = xdf.filterbank.transpose("fbfrequency", "datetime")
        xdf["sonogram"] = xdf.sonogram.transpose("sonofrequency", "datetime")
        xdf.attrs = attrs
        return xdf

    @staticmethod
    def filterbank(nfilters, nfft, fs):
        """
        Computes the integral in an array of bandpass filters that
        separates the input signal into multiple components.
        Each component consists of a single frequency sub-band
        of the original signal.

        :param nfilters: Number of filters in the filter bank.
        :type nfilters: int
        :param nfft: Number of samples
        :type nfft: int
        :param fs: Sampling frequency
        :type fs: float

        """
        fl = fs / nfft
        fh = fs / 2
        freqs = np.logspace(np.log10(fl), np.log10(fh), nfilters + 2)
        bins = np.floor((nfft + 1) * freqs / fs)
        fbank = np.zeros((nfilters, int(np.floor(nfft / 2 + 1))))
        for m in range(1, nfilters + 1):
            f_m_minus = int(bins[m - 1])  # left
            f_m = int(bins[m])  # center
            f_m_plus = int(bins[m + 1])  # right

            for k in range(f_m_minus, f_m):
                fbank[m - 1, k] = (k - bins[m - 1]) / (bins[m] - bins[m - 1])
            for k in range(f_m, f_m_plus):
                fbank[m - 1, k] = (bins[m + 1] - k) / (bins[m + 1] - bins[m])

        return freqs[1:-1], fbank

    @staticmethod
    def sonogram(data, fs, fc1, nofb=8):
        """
        Computes the sonogram of the given data which can be windowed or not.
        The sonogram is determined by the power in half octave bands of the given
        data.

        If data are windowed the analytic signal and the envelope of each window
        is returned.

        :param data: Data to make envelope of.
        :type data: :class:`~numpy.ndarray`
        :param fs: Sampling frequency in Hz.
        :param fc1: Center frequency of lowest half octave band.
        :param nofb: Number of half octave bands.
        :param no_win: Number of data windows.
        :return: Central frequencies, Half octave bands.
        """
        fc = float(fc1) * 1.5 ** np.arange(nofb)
        fmin = fc / np.sqrt(5.0 / 3.0)
        fmax = fc * np.sqrt(5.0 / 3.0)
        z_tot = np.sum(np.abs(data) ** 2, axis=0)
        nfft = data.shape[0]
        start = np.around(fmin * nfft / fs, 0).astype(np.int_) - 1
        end = np.around(fmax * nfft / fs, 0).astype(np.int_)
        z = np.zeros([nofb, data.shape[1]])
        for i in range(nofb):
            z[i, :] = np.sum(np.abs(data[start[i] : end[i], :]) ** 2, axis=0)
        hob = np.log(z / z_tot[np.newaxis, :])
        return fc, hob

    def compute(self, trace):
        """
        Compute SSAM for the given trace.

        :param trace: The seismic waveform data.
        :type trace: :class:`obspy.Trace`
        :return: Dataset with 'ssam', 'filterbank' and 'sonogram' variables.
        :rtype: :class:`xarray.Dataset`
        """
        if len(trace) < 1:
            msg = "Trace is empty."
            raise ValueError(msg)

        logger.info(
            "Computing spectrograms for {} between {} and {}.".format(
                ".".join(
                    (
                        trace.stats.network,
                        trace.stats.station,
                        trace.stats.location,
                        trace.stats.channel,
                    )
                ),
                trace.stats.starttime.isoformat(),
                trace.stats.endtime.isoformat(),
            )
        )

        tr = trace.copy()
        x = tr.data
        npts = tr.stats.npts
        Fs = tr.stats.sampling_rate
        interval = min(self.interval * Fs, npts)

        NFFT = int(self._nearest_pow_2(interval))
        if NFFT > npts:
            NFFT = int(self._nearest_pow_2(npts / 8.0))

        if self.per_lap < 1.0:
            noverlap = int(NFFT * float(self.per_lap))
        else:
            noverlap = NFFT - self.per_lap

        scaling_factor = 2.0
        result, windowVals = window_array(x, NFFT, noverlap)
        result = np.fft.rfft(result, n=NFFT, axis=0)
        freqs = np.fft.rfftfreq(NFFT, 1 / Fs)

        result = np.conj(result) * result
        # Also include scaling factors for one-sided densities and dividing by
        # the sampling frequency, if desired. Scale everything, except the DC
        # component and the NFFT/2 component:
        slc = slice(1, -1, None)

        result[slc] *= scaling_factor

        # MATLAB divides by the sampling frequency so that density function
        # has units of dB/Hz and can be integrated by the plotted frequency
        # values. Perform the same scaling here.
        if self.scale_by_freq:
            result /= Fs
            # Scale the spectrum by the norm of the window to compensate for
            # windowing loss; see Bendat & Piersol Sec 11.5.2.
            result /= (np.abs(windowVals) ** 2).sum()
        else:
            # In this case, preserve power in the segment, not amplitude
            result /= np.abs(windowVals).sum() ** 2

        result = result.real
        steplength = NFFT - noverlap
        steps = np.arange(result.shape[1]) * steplength
        if self.timestamp == "center":
            t = (steps + NFFT / 2) / Fs
        elif self.timestamp == "start":
            t = steps / Fs
        # smooth the spectrogram using the Welch algorithm
        if self.smooth is not None:
            result, t = self._average(result, t)

        features = {}
        dates = [(trace.stats.starttime + _t).datetime for _t in t]

        fb_freqs, fb = self.filterbank(self.fbfilters, NFFT, Fs)
        result_fb = np.dot(fb, result)
        filterbank = xr.DataArray(
            result_fb,
            coords=[fb_freqs, pd.to_datetime(dates)],
            dims=["fbfrequency", "datetime"],
        )
        features["filterbank"] = filterbank

        sono_freq, sono_amp = self.sonogram(result, Fs, 0.68, self.nhob)
        sono = xr.DataArray(
            sono_amp,
            coords=[sono_freq, pd.to_datetime(dates)],
            dims=["sonofrequency", "datetime"],
        )
        features["sonogram"] = sono

        # Linear interpolation onto a new set of frequencies
        def myinterp(a):
            return np.interp(self.frequencies, freqs, a)

        result = np.apply_along_axis(myinterp, 0, result)
        freqs = self.frequencies

        ssam = xr.DataArray(
            result,
            coords=[freqs, pd.to_datetime(dates)],
            dims=["frequency", "datetime"],
        )
        features["ssam"] = ssam
        self.feature = xr.Dataset(features)
        self.feature.attrs["starttime"] = trace.stats.starttime.isoformat()
        self.feature.attrs["endtime"] = trace.stats.endtime.isoformat()
        self.feature.attrs["station"] = trace.stats.station

        if self.resample_int is not None:
            self.feature = self._resample(self.feature, self.resample_int)
        return self.feature

compute(trace)

Compute SSAM for the given trace.

Parameters:
  • trace (:class:`obspy.Trace`) –

    The seismic waveform data.

Returns:
  • :class:`xarray.Dataset` –

    Dataset with 'ssam', 'filterbank' and 'sonogram' variables.

Source code in src/zizou/ssam.py, lines 275-394
def compute(self, trace):
    """
    Compute SSAM for the given trace.

    :param trace: The seismic waveform data.
    :type trace: :class:`obspy.Trace`
    :return: Dataset with 'ssam', 'filterbank' and 'sonogram' variables.
    :rtype: :class:`xarray.Dataset`
    """
    if len(trace) < 1:
        msg = "Trace is empty."
        raise ValueError(msg)

    logger.info(
        "Computing spectrograms for {} between {} and {}.".format(
            ".".join(
                (
                    trace.stats.network,
                    trace.stats.station,
                    trace.stats.location,
                    trace.stats.channel,
                )
            ),
            trace.stats.starttime.isoformat(),
            trace.stats.endtime.isoformat(),
        )
    )

    tr = trace.copy()
    x = tr.data
    npts = tr.stats.npts
    Fs = tr.stats.sampling_rate
    interval = min(self.interval * Fs, npts)

    NFFT = int(self._nearest_pow_2(interval))
    if NFFT > npts:
        NFFT = int(self._nearest_pow_2(npts / 8.0))

    if self.per_lap < 1.0:
        noverlap = int(NFFT * float(self.per_lap))
    else:
        noverlap = NFFT - self.per_lap

    scaling_factor = 2.0
    result, windowVals = window_array(x, NFFT, noverlap)
    result = np.fft.rfft(result, n=NFFT, axis=0)
    freqs = np.fft.rfftfreq(NFFT, 1 / Fs)

    result = np.conj(result) * result
    # Also include scaling factors for one-sided densities and dividing by
    # the sampling frequency, if desired. Scale everything, except the DC
    # component and the NFFT/2 component:
    slc = slice(1, -1, None)

    result[slc] *= scaling_factor

    # MATLAB divides by the sampling frequency so that density function
    # has units of dB/Hz and can be integrated by the plotted frequency
    # values. Perform the same scaling here.
    if self.scale_by_freq:
        result /= Fs
        # Scale the spectrum by the norm of the window to compensate for
        # windowing loss; see Bendat & Piersol Sec 11.5.2.
        result /= (np.abs(windowVals) ** 2).sum()
    else:
        # In this case, preserve power in the segment, not amplitude
        result /= np.abs(windowVals).sum() ** 2

    result = result.real
    steplength = NFFT - noverlap
    steps = np.arange(result.shape[1]) * steplength
    if self.timestamp == "center":
        t = (steps + NFFT / 2) / Fs
    elif self.timestamp == "start":
        t = steps / Fs
    # smooth the spectrogram using the Welch algorithm
    if self.smooth is not None:
        result, t = self._average(result, t)

    features = {}
    dates = [(trace.stats.starttime + _t).datetime for _t in t]

    fb_freqs, fb = self.filterbank(self.fbfilters, NFFT, Fs)
    result_fb = np.dot(fb, result)
    filterbank = xr.DataArray(
        result_fb,
        coords=[fb_freqs, pd.to_datetime(dates)],
        dims=["fbfrequency", "datetime"],
    )
    features["filterbank"] = filterbank

    sono_freq, sono_amp = self.sonogram(result, Fs, 0.68, self.nhob)
    sono = xr.DataArray(
        sono_amp,
        coords=[sono_freq, pd.to_datetime(dates)],
        dims=["sonofrequency", "datetime"],
    )
    features["sonogram"] = sono

    # Linear interpolation onto a new set of frequencies
    def myinterp(a):
        return np.interp(self.frequencies, freqs, a)

    result = np.apply_along_axis(myinterp, 0, result)
    freqs = self.frequencies

    ssam = xr.DataArray(
        result,
        coords=[freqs, pd.to_datetime(dates)],
        dims=["frequency", "datetime"],
    )
    features["ssam"] = ssam
    self.feature = xr.Dataset(features)
    self.feature.attrs["starttime"] = trace.stats.starttime.isoformat()
    self.feature.attrs["endtime"] = trace.stats.endtime.isoformat()
    self.feature.attrs["station"] = trace.stats.station

    if self.resample_int is not None:
        self.feature = self._resample(self.feature, self.resample_int)
    return self.feature
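
A usage sketch extending the class docstring example with resampling (test_signal is the helper already used in that example; the resample_int tuple format matches the _resample default above):

>>> import numpy as np
>>> from zizou.ssam import SSAM, test_signal
>>> tr = test_signal(gaps=True)
>>> s = SSAM(interval=60, per_lap=0.9, smooth=10,
...          frequencies=np.arange(0, 25.1, 0.1),
...          resample_int=("1min", "10min"))
>>> ds = s.compute(tr)   # Dataset with 'ssam', 'filterbank' and 'sonogram'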

filterbank(nfilters, nfft, fs) staticmethod

Computes a bank of triangular bandpass filters that separates the input signal into multiple components. Each component consists of a single frequency sub-band of the original signal.

Parameters:
  • nfilters (int) –

    Number of filters in the filter bank.

  • nfft (int) –

    Number of samples

  • fs (float) –

    Sampling frequency

Source code in src/zizou/ssam.py, lines 211-242
@staticmethod
def filterbank(nfilters, nfft, fs):
    """
    Computes the integral in an array of bandpass filters that
    separates the input signal into multiple components.
    Each component consists of a single frequency sub-band
    of the original signal.

    :param nfilters: Number of filters in the filter bank.
    :type nfilters: int
    :param nfft: Number of samples
    :type nfft: int
    :param fs: Sampling frquency
    :type fs: float

    """
    fl = fs / nfft
    fh = fs / 2
    freqs = np.logspace(np.log10(fl), np.log10(fh), nfilters + 2)
    bins = np.floor((nfft + 1) * freqs / fs)
    fbank = np.zeros((nfilters, int(np.floor(nfft / 2 + 1))))
    for m in range(1, nfilters + 1):
        f_m_minus = int(bins[m - 1])  # left
        f_m = int(bins[m])  # center
        f_m_plus = int(bins[m + 1])  # right

        for k in range(f_m_minus, f_m):
            fbank[m - 1, k] = (k - bins[m - 1]) / (bins[m] - bins[m - 1])
        for k in range(f_m, f_m_plus):
            fbank[m - 1, k] = (bins[m + 1] - k) / (bins[m + 1] - bins[m])

    return freqs[1:-1], fbank
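
A quick shape check for filterbank under assumed values for the FFT length and sampling rate; the bank has one row per filter and one column per rfft frequency bin:

>>> from zizou.ssam import SSAM
>>> freqs, fbank = SSAM.filterbank(nfilters=16, nfft=1024, fs=100.0)
>>> freqs.shape, fbank.shape
((16,), (16, 513))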

sonogram(data, fs, fc1, nofb=8) staticmethod

Computes the sonogram of the given data. The sonogram is determined by the power in half-octave bands of the given data.

Parameters:
  • data (:class:`~numpy.ndarray`) –

    Spectral data, with frequency bins along the first axis and windows along the second.

  • fs (float) –

    Sampling frequency in Hz.

  • fc1 (float) –

    Center frequency of the lowest half-octave band.

  • nofb (int, default: 8 ) –

    Number of half-octave bands.

Returns:
  • Central frequencies and half-octave band powers.

Source code in src/zizou/ssam.py, lines 244-273
@staticmethod
def sonogram(data, fs, fc1, nofb=8):
    """
    Computes the sonogram of the given data which can be windowed or not.
    The sonogram is determined by the power in half octave bands of the given
    data.

    If data are windowed the analytic signal and the envelope of each window
    is returned.

    :param data: Data to make envelope of.
    :type data: :class:`~numpy.ndarray`
    :param fs: Sampling frequency in Hz.
    :param fc1: Center frequency of lowest half octave band.
    :param nofb: Number of half octave bands.
    :param no_win: Number of data windows.
    :return: Central frequencies, Half octave bands.
    """
    fc = float(fc1) * 1.5 ** np.arange(nofb)
    fmin = fc / np.sqrt(5.0 / 3.0)
    fmax = fc * np.sqrt(5.0 / 3.0)
    z_tot = np.sum(np.abs(data) ** 2, axis=0)
    nfft = data.shape[0]
    start = np.around(fmin * nfft / fs, 0).astype(np.int_) - 1
    end = np.around(fmax * nfft / fs, 0).astype(np.int_)
    z = np.zeros([nofb, data.shape[1]])
    for i in range(nofb):
        z[i, :] = np.sum(np.abs(data[start[i] : end[i], :]) ** 2, axis=0)
    hob = np.log(z / z_tot[np.newaxis, :])
    return fc, hob
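
A sketch of calling sonogram on a synthetic power spectrogram (frequency bins along axis 0, time windows along axis 1); fc1=0.68 mirrors the value used in compute above:

>>> import numpy as np
>>> from zizou.ssam import SSAM
>>> spec = np.random.default_rng(42).random((513, 20))
>>> fc, hob = SSAM.sonogram(spec, fs=100.0, fc1=0.68)
>>> fc.shape, hob.shape
((8,), (8, 20))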

Compute the displacement seismic amplitude ratio (DSAR).

DSAR

Bases: FeatureBaseClass

DSAR is the ratio of low frequency to high frequency amplitudes, as suggested in: https://doi.org/10.1130/G46107.1

The time-domain displacement signal is bandpass filtered in two frequency bands, 4.5 to 8 Hz and 8 to 16 Hz, and DSAR is defined as the median ratio of the absolute signals. The quantity is interpreted in the publication as a change in seismic attenuation in the vicinity of the seismic station.

Source code in src/zizou/dsar.py, lines 19-178
class DSAR(FeatureBaseClass):
    """
    DSAR is the ratio of
    low frequency to high frequency amplitudes, as suggested in:
    https://doi.org/10.1130/G46107.1

    The time-domain displacement signal is bandpass filtered in two
    frequency bands, 4.5 to 8 Hz and 8 to 16 Hz, and DSAR is defined
    as the median ratio of the absolute signals. The quantity is
    interpreted in the publication as a change in seismic attenuation
    in the vicinity of the seismic station.
    """

    features = ["dsar"]

    def __init__(
        self,
        interval=600.0,
        filtertype="bandpass",
        lowerfreqband=(4.5, 8),
        higherfreqband=(8, 16),
        configfile=None,
        reindex=True,
    ):
        super(DSAR, self).__init__()
        self.interval = float(interval)
        self.filtertype = filtertype
        self.lowerfreqband = lowerfreqband
        self.higherfreqband = higherfreqband
        self.reindex = reindex

        if configfile is not None:
            try:
                with open(configfile, "r") as fh:
                    c = yaml.safe_load(fh)
            except OSError:
                c = yaml.safe_load(configfile)
            self.interval = c["default"].get("interval", interval)
            cd = c.get("dsar")
            if cd is not None:
                self.reindex = cd.get("reindex", reindex)
                self.filtertype = cd.get("filtertype", filtertype)
                lfreq = cd.get("lowerfreqband")
                if lfreq is not None:
                    self.lowerfreqband = (lfreq["low"], lfreq["high"])
                hfreq = cd.get("higherfreqband")
                if hfreq is not None:
                    self.higherfreqband = (hfreq["low"], hfreq["high"])

    def compute(self, trace):
        """
        Compute the displacement seismic amplitude ratio (DSAR).

        # TODO:
        Publication is quite vague and doesn't have enough detail
        to understand the method. It is unclear whether the median is
        taken every 10 minutes or every day. This affects the bootstrap
        calculation too, but that might not be necessary.

        :param trace: The seismic waveform data
        :type trace: :class:`obspy.Trace`
        """
        if len(trace) < 1:
            raise ValueError("Trace is empty.")

        logger.info(
            "Computing DSAR for {} between {} and {}.".format(
                ".".join(
                    (
                        trace.stats.network,
                        trace.stats.station,
                        trace.stats.location,
                        trace.stats.channel,
                    )
                ),
                trace.stats.starttime.isoformat(),
                trace.stats.endtime.isoformat(),
            )
        )

        tr_lf = trace.copy()
        # handle gaps (i.e. NaNs)
        mdata = np.ma.masked_invalid(tr_lf.data)
        tr_lf.data = mdata
        # Get second trace for alternative filtering
        tr_hf = tr_lf.copy()

        # Filter settings
        lf_f1, lf_f2 = self.lowerfreqband
        hf_f1, hf_f2 = self.higherfreqband
        st_lf = tr_lf.split()
        st_hf = tr_hf.split()
        # Integrate to displacement
        st_lf.integrate("cumtrapz")
        st_hf.integrate("cumtrapz")

        st_lf.filter(
            self.filtertype, freqmin=lf_f1, freqmax=lf_f2, corners=4, zerophase=False
        )
        st_hf.filter(
            self.filtertype, freqmin=hf_f1, freqmax=hf_f2, corners=4, zerophase=False
        )

        starttime = trace.stats.starttime
        endtime = trace.stats.endtime
        st_lf.merge(fill_value=np.nan)
        st_lf.trim(starttime, endtime, pad=True, fill_value=np.nan)
        st_hf.merge(fill_value=np.nan)
        st_hf.trim(starttime, endtime, pad=True, fill_value=np.nan)
        tr_lf = st_lf[0]
        tr_hf = st_hf[0]

        # DSAR calculation
        feature_data = []
        feature_idx = []

        # loop through data in interval blocks
        for t0, t1 in trace_window_times(tr_lf, self.interval, min_len=0.8):
            tr_lf_int = tr_lf.slice(t0, t1)
            tr_hf_int = tr_hf.slice(t0, t1)

            absolute_lf = np.absolute(tr_lf_int.data)
            absolute_hf = np.absolute(tr_hf_int.data)

            # calculate ratio
            if np.isnan(absolute_lf).any() or np.isnan(absolute_hf).any():
                median = np.nan
            else:
                m = absolute_hf > 0
                ratio = absolute_lf[m] / absolute_hf[m]
                median = np.median(ratio)

            feature_data.append(median)
            feature_idx.append(t0.strftime("%Y-%m-%dT%H:%M:%S"))

        xda = xr.DataArray(
            feature_data, coords=[pd.DatetimeIndex(feature_idx)], dims="datetime"
        )
        xdf = xr.Dataset({"dsar": xda})
        xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
        xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
        xdf.attrs["station"] = trace.stats.station
        if self.reindex:
            starttime = UTCDateTime(str(xda.datetime.data[0]))
            starttime = round_time(starttime, self.interval)
            endtime = UTCDateTime(str(xda.datetime.data[-1]))
            endtime = round_time(endtime, self.interval)
            new_index = pd.date_range(
                starttime.datetime, endtime.datetime, freq="%dS" % int(self.interval)
            )
            xdf = xdf.reindex(dict(datetime=new_index), method="nearest")
        self.feature = xdf
        return self.feature

compute(trace)

Compute the displacement seismic amplitude ratio (DSAR).

TODO:

Publication is quite vague and doesn't have enough detail to understand the method. It is unclear whether the median is taken every 10 minutes or every day. This affects the bootstrap calculation too, but that might not be necessary.

Parameters:
  • trace (:class:`obspy.Trace`) –

    The seismic waveform data

Source code in src/zizou/dsar.py, lines 68-178
def compute(self, trace):
    """
    Compute the displacement seismic amplitude ratio (DSAR).

    # TODO:
    Publication is quite vague and doesn't have enough detail
    to understand the method. It is unclear whether the median is
    taken every 10 minutes or every day. This affects the bootstrap
    calculation too, but that might not be necessary.

    :param trace: The seismic waveform data
    :type trace: :class:`obspy.Trace`
    """
    if len(trace) < 1:
        raise ValueError("Trace is empty.")

    logger.info(
        "Computing DSAR for {} between {} and {}.".format(
            ".".join(
                (
                    trace.stats.network,
                    trace.stats.station,
                    trace.stats.location,
                    trace.stats.channel,
                )
            ),
            trace.stats.starttime.isoformat(),
            trace.stats.endtime.isoformat(),
        )
    )

    tr_lf = trace.copy()
    # handle gaps (i.e. NaNs)
    mdata = np.ma.masked_invalid(tr_lf.data)
    tr_lf.data = mdata
    # Get second trace for alternative filtering
    tr_hf = tr_lf.copy()

    # Filter settings
    lf_f1, lf_f2 = self.lowerfreqband
    hf_f1, hf_f2 = self.higherfreqband
    st_lf = tr_lf.split()
    st_hf = tr_hf.split()
    # Integrate to displacement
    st_lf.integrate("cumtrapz")
    st_hf.integrate("cumtrapz")

    st_lf.filter(
        self.filtertype, freqmin=lf_f1, freqmax=lf_f2, corners=4, zerophase=False
    )
    st_hf.filter(
        self.filtertype, freqmin=hf_f1, freqmax=hf_f2, corners=4, zerophase=False
    )

    starttime = trace.stats.starttime
    endtime = trace.stats.endtime
    st_lf.merge(fill_value=np.nan)
    st_lf.trim(starttime, endtime, pad=True, fill_value=np.nan)
    st_hf.merge(fill_value=np.nan)
    st_hf.trim(starttime, endtime, pad=True, fill_value=np.nan)
    tr_lf = st_lf[0]
    tr_hf = st_hf[0]

    # DSAR calculation
    feature_data = []
    feature_idx = []

    # loop through data in interval blocks
    for t0, t1 in trace_window_times(tr_lf, self.interval, min_len=0.8):
        tr_lf_int = tr_lf.slice(t0, t1)
        tr_hf_int = tr_hf.slice(t0, t1)

        absolute_lf = np.absolute(tr_lf_int.data)
        absolute_hf = np.absolute(tr_hf_int.data)

        # calculate ratio
        if np.isnan(absolute_lf).any() or np.isnan(absolute_hf).any():
            median = np.nan
        else:
            m = absolute_hf > 0
            ratio = absolute_lf[m] / absolute_hf[m]
            median = np.median(ratio)

        feature_data.append(median)
        feature_idx.append(t0.strftime("%Y-%m-%dT%H:%M:%S"))

    xda = xr.DataArray(
        feature_data, coords=[pd.DatetimeIndex(feature_idx)], dims="datetime"
    )
    xdf = xr.Dataset({"dsar": xda})
    xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
    xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
    xdf.attrs["station"] = trace.stats.station
    if self.reindex:
        starttime = UTCDateTime(str(xda.datetime.data[0]))
        starttime = round_time(starttime, self.interval)
        endtime = UTCDateTime(str(xda.datetime.data[-1]))
        endtime = round_time(endtime, self.interval)
        new_index = pd.date_range(
            starttime.datetime, endtime.datetime, freq="%dS" % int(self.interval)
        )
        xdf = xdf.reindex(dict(datetime=new_index), method="nearest")
    self.feature = xdf
    return self.feature
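
A minimal DSAR usage sketch with ObsPy's bundled example trace (import path assumed from the source location; the default frequency bands apply):

>>> from obspy import read
>>> from zizou.dsar import DSAR
>>> tr = read()[0]
>>> dsar = DSAR(interval=10, reindex=False)
>>> ds = dsar.compute(tr)   # Dataset with a 'dsar' variable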

SpectralFeatures

Bases: FeatureBaseClass

Source code in src/zizou/spectral_features.py, lines 18-182
class SpectralFeatures(FeatureBaseClass):
    features = ["central_freq", "bandwidth", "predom_freq"]

    def __init__(
        self,
        interval=600.0,
        filtertype="highpass",
        filterfreq=(0.5, None),
        ncep=8,
        configfile=None,
    ):
        super(SpectralFeatures, self).__init__()
        self.interval = float(interval)
        self.filtertype = filtertype
        self.filterfreq = filterfreq
        self.ncep = int(ncep)
        if configfile is not None:
            try:
                with open(configfile, "r") as fh:
                    c = yaml.safe_load(fh)
            except OSError:
                c = yaml.safe_load(configfile)
            self.interval = c["default"].get("interval", interval)
            cs = c.get("spectral_features")
            if cs is not None:
                self.filtertype = cs.get("filtertype", filtertype)
                freqs = cs.get("filterfreq")
                if freqs is not None:
                    self.filterfreq = (freqs["low"], freqs["high"])
                self.ncep = cs.get("num_cepstral", ncep)

    def compute(self, trace):
        """
        :param trace: The seismic waveform data
        :type trace: :class:`obspy.Trace`
        :return: Dataset containing
            - central frequency (acceleration) in Hz
            - bandwidth parameter (acceleration)
            - predominant frequency (acceleration) in Hz
            (cepstral coefficients are currently disabled)
        """
        if len(trace) < 1:
            msg = "Trace is empty."
            raise ValueError(msg)

        logger.info(
            "Computing spectral features for {} between {} and {}.".format(
                ".".join(
                    (
                        trace.stats.network,
                        trace.stats.station,
                        trace.stats.location,
                        trace.stats.channel,
                    )
                ),
                trace.stats.starttime.isoformat(),
                trace.stats.endtime.isoformat(),
            )
        )

        fs = trace.stats.sampling_rate

        keys = ["central_freq", "bandwidth", "predom_freq"]
        # keys += [f'cepstral_coef_{ind}' for ind in np.arange(self.ncep)]

        # initialise dataframe
        feature_data = []
        feature_idx = []

        # loop through data in interval blocks
        for tr_int in trace_window_data(trace, self.interval, min_len=0.8):
            if np.any(np.isnan(tr_int.data)):
                vals = np.full(len(keys), np.nan)
            else:
                tr_int.detrend("constant")
                tr_int.detrend("linear")
                apply_freq_filter(tr_int, self.filtertype, self.filterfreq)
                tr_int_diff = tr_int.differentiate()

                # Calculate psdf
                nfft = obspy.signal.util.next_pow_2(tr_int_diff.stats.npts)
                frequency = np.linspace(0, fs, nfft + 1)
                frequency = frequency[0 : nfft // 2]
                data_f = scipy.fftpack.fft(tr_int_diff.data, nfft)
                data_psd = np.abs(data_f[0 : nfft // 2]) ** 2

                # Interpolate onto logspace frequency vector
                f_ls = np.logspace(-2, 2, 401)  # These constants should be accessible
                data_psd_raw = np.interp(f_ls, frequency, data_psd)
                data_psd_ls = obspy.signal.util.smooth(data_psd_raw, 3)

                central_freq = self.centralFrequency(data_psd_ls, f_ls)
                bandwidth = self.bandwidth(data_psd_ls, f_ls)
                predom_freq = self.predominantFrequency(data_psd_ls.data, f_ls)
                # cepstral_coef = self.cepstralCoefficients(
                #     tr_int.data, fs, self.ncep
                # )
                vals = [central_freq, bandwidth, predom_freq]
                # vals = np.concatenate((vals, cepstral_coef, hob), axis=None)

            feature_data.append(vals)
            feature_idx.append(tr_int.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S"))

        xdf = pd.DataFrame(
            data=feature_data,
            index=pd.DatetimeIndex(feature_idx, name="datetime"),
            columns=keys,
            dtype=float,
        ).to_xarray()
        xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
        xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
        xdf.attrs["station"] = trace.stats.station

        self.feature = xdf
        self.trace = trace
        return self.feature

    @staticmethod
    def predominantFrequency(psd, freq):
        """
        The peak frequency of the acceleration power spectral
        density function, in Hz.
        """
        predom_freq = freq[np.argmax(psd)]
        return predom_freq

    @staticmethod
    def centralFrequency(psd, freq):
        """
        The 'central' frequency of the acceleration power
        spectral density function (Hz). Defined as the
        square root of the ratio between the second and
        zero-th spectral moments. The spectral moments
        are calculated with log-spaced frequencies.
        """
        m0 = 2 * scipy.integrate.simpson(freq**0 * psd)
        m2 = 2 * scipy.integrate.simpson(freq**2 * psd)
        return np.sqrt(m2 / m0)

    @staticmethod
    def bandwidth(psd, freq):
        """
        The Vanmarcke (1970) bandwidth parameter, q where
        0 <= q <= 1. Calculated using the spectral moments
        of the acceleration power spectral density function.
        0 is a Dirac delta PSDF (sine wave), 1 is a flat PSDF
        (white signal).
        """
        m0 = 2 * scipy.integrate.simpson(freq**0 * psd)
        m1 = 2 * scipy.integrate.simpson(freq**1 * psd)
        m2 = 2 * scipy.integrate.simpson(freq**2 * psd)
        return np.sqrt(1 - m1**2 / m2 / m0)

    @staticmethod
    def cepstralCoefficients(data, fs, ncep=8, nfilters=10, window="Hamming"):
        cep = obspy.signal.freqattributes.log_cepstrum(
            data=np.expand_dims(data, axis=0),
            fs=fs,
            nc=ncep,
            p=nfilters,
            n=None,
            w=window,
        )
        return cep

bandwidth(psd, freq) staticmethod

The Vanmarcke (1970) bandwidth parameter, q where 0 <= q <= 1. Calculated using the spectral moments of the acceleration power spectral density function. 0 is a Dirac delta PSDF (sine wave), 1 is a flat PSDF (white signal).

Source code in src/zizou/spectral_features.py, lines 158-170
@staticmethod
def bandwidth(psd, freq):
    """
    The Vanmarcke (1970) bandwidth parameter, q where
    0 <= q <= 1. Calculated using the spectral moments
    of the acceleration power spectral density function.
    0 is a Dirac delta PSDF (sine wave), 1 is a flat PSDF
    (white signal).
    """
    m0 = 2 * scipy.integrate.simpson(freq**0 * psd)
    m1 = 2 * scipy.integrate.simpson(freq**1 * psd)
    m2 = 2 * scipy.integrate.simpson(freq**2 * psd)
    return np.sqrt(1 - m1**2 / m2 / m0)
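
An illustrative check under an assumed import path: the Vanmarcke q is smaller for a narrow spectral peak than for a flat spectrum (the moments above integrate over sample index rather than frequency, so treat the values as qualitative):

>>> import numpy as np
>>> from zizou.spectral_features import SpectralFeatures
>>> f = np.logspace(-2, 2, 401)
>>> peaked = np.exp(-0.5 * ((f - 5.0) / 0.2) ** 2)   # narrow peak near 5 Hz
>>> SpectralFeatures.bandwidth(peaked, f) < SpectralFeatures.bandwidth(np.ones_like(f), f)
True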

centralFrequency(psd, freq) staticmethod

The 'central' frequency of the acceleration power spectral density function (Hz). Defined as the square root of the ratio between the second and zero-th spectral moments. The spectral moments are calculated with log-spaced frequencies.

Source code in src/zizou/spectral_features.py, lines 145-156
@staticmethod
def centralFrequency(psd, freq):
    """
    The 'central' frequency of the acceleration power
    spectral density function (Hz). Defined as the
    square root of the ratio between the second and
    zero-th spectral moments. The spectral moments
    are calculated with log-spaced frequencies.
    """
    m0 = 2 * scipy.integrate.simpson(freq**0 * psd)
    m2 = 2 * scipy.integrate.simpson(freq**2 * psd)
    return np.sqrt(m2 / m0)
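
For a spectrum concentrated near a single frequency, the central frequency lands at that peak (same assumed import path, same caveat about index-based moments):

>>> import numpy as np
>>> from zizou.spectral_features import SpectralFeatures
>>> f = np.logspace(-2, 2, 401)
>>> psd = np.exp(-0.5 * ((f - 5.0) / 0.2) ** 2)
>>> round(float(SpectralFeatures.centralFrequency(psd, f)), 1)
5.0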

compute(trace)

Parameters:
  • trace (:class:`obspy.Trace`) –

    The seismic waveform data

Returns:
  • Dataset containing

    - central frequency (acceleration) in Hz
    - bandwidth parameter (acceleration)
    - predominant frequency (acceleration) in Hz

    Cepstral coefficients are currently disabled in compute.

Source code in src/zizou/spectral_features.py, lines 49-134
def compute(self, trace):
    """
    :param trace: The seismic waveform data
    :type trace: :class:`obspy.Trace`
    :return:
    Dataset containing
    - central frequency (acceleration) in Hz
    - bandwidth parameter (acceleration)
    - predominant frequency (acceleration) in Hz
    - eight cepstral coefficients
    """
    if len(trace) < 1:
        msg = "Trace is empty."
        raise ValueError(msg)

    logger.info(
        "Computing spectral features for {} between {} and {}.".format(
            ".".join(
                (
                    trace.stats.network,
                    trace.stats.station,
                    trace.stats.location,
                    trace.stats.channel,
                )
            ),
            trace.stats.starttime.isoformat(),
            trace.stats.endtime.isoformat(),
        )
    )

    fs = trace.stats.sampling_rate

    keys = ["central_freq", "bandwidth", "predom_freq"]
    # keys += [f'cepstral_coef_{ind}' for ind in np.arange(self.ncep)]

    # initialise dataframe
    feature_data = []
    feature_idx = []

    # loop through data in interval blocks
    for tr_int in trace_window_data(trace, self.interval, min_len=0.8):
        if np.any(np.isnan(tr_int.data)):
            vals = np.full(len(keys), np.nan)
        else:
            tr_int.detrend("constant")
            tr_int.detrend("linear")
            apply_freq_filter(tr_int, self.filtertype, self.filterfreq)
            tr_int_diff = tr_int.differentiate()

            # Calculate psdf
            nfft = obspy.signal.util.next_pow_2(tr_int_diff.stats.npts)
            frequency = np.linspace(0, fs, nfft + 1)
            frequency = frequency[0 : nfft // 2]
            data_f = scipy.fftpack.fft(tr_int_diff.data, nfft)
            data_psd = np.abs(data_f[0 : nfft // 2]) ** 2

            # Interpolate onto logspace frequency vector
            f_ls = np.logspace(-2, 2, 401)  # These constants should be accessible
            data_psd_raw = np.interp(f_ls, frequency, data_psd)
            data_psd_ls = obspy.signal.util.smooth(data_psd_raw, 3)

            central_freq = self.centralFrequency(data_psd_ls, f_ls)
            bandwidth = self.bandwidth(data_psd_ls, f_ls)
            predom_freq = self.predominantFrequency(data_psd_ls.data, f_ls)
            # cepstral_coef = self.cepstralCoefficients(
            #     tr_int.data, fs, self.ncep
            # )
            vals = [central_freq, bandwidth, predom_freq]
            # vals = np.concatenate((vals, cepstral_coef, hob), axis=None)

        feature_data.append(vals)
        feature_idx.append(tr_int.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S"))

    xdf = pd.DataFrame(
        data=feature_data,
        index=pd.DatetimeIndex(feature_idx, name="datetime"),
        columns=keys,
        dtype=float,
    ).to_xarray()
    xdf.attrs["starttime"] = trace.stats.starttime.isoformat()
    xdf.attrs["endtime"] = trace.stats.endtime.isoformat()
    xdf.attrs["station"] = trace.stats.station

    self.feature = xdf
    self.trace = trace
    return self.feature
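A minimal usage sketch; the class name SpectralFeatures, its import path, and default construction are assumed here for illustration (only the compute() call is taken from the source above). test_signal comes from zizou.util, documented further below.

from zizou.spectral_features import SpectralFeatures  # class name assumed
from zizou.util import test_signal

sf = SpectralFeatures()            # assumed default construction
feats = sf.compute(test_signal())  # xarray Dataset
print(feats["central_freq"])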

predominantFrequency(psd, freq) staticmethod

The peak frequency of the acceleration power spectral density function, in Hz.

Source code in src/zizou/spectral_features.py
@staticmethod
def predominantFrequency(psd, freq):
    """
    The peak frequency of the acceleration power spectral
    density function, in Hz.
    """
    predom_freq = freq[np.argmax(psd)]
    return predom_freq
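For instance, for a PSD peaking near 3 Hz the argmax lookup returns roughly 3 Hz (illustrative values):

import numpy as np

freq = np.logspace(-2, 2, 401)
psd = np.exp(-0.5 * ((freq - 3.0) / 0.2) ** 2)
print(freq[np.argmax(psd)])  # approximately 3 Hz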

The main PCA script

PCA

Bases: AnomalyDetectionBaseClass

Source code in src/zizou/pca.py
class PCA(AnomalyDetectionBaseClass):

    def __init__(self, modelfile='pcaModel.pkl', pca_features=None, interval=600,
                 n_components=4):
        self.pca_features = pca_features
        self.modelfile = modelfile
        self.interval = interval
        self.n_components = n_components
        self.__pca_model = None
        self.__mean = None
        self.__std = None
        self.__feature = None
        self.__training_run = False

    @property
    def pca_model(self):
        return self.__pca_model

    @property
    def mean(self):
        return self.__mean

    @property
    def std(self):
        return self.__std

    @property
    def feature(self):
        return self.__feature

    def fit(self, data):
        if os.path.isfile(self.modelfile):
            self._read_model_parameters(self.modelfile)
        else:
            pca = sklearn.decomposition.PCA(n_components=self.n_components)
            # Standardise the training data and keep the scaling
            # parameters together with the fitted model.
            mns = data.mean(axis=0)
            stds = data.std(axis=0)
            pca.fit((data - mns) / stds)
            self.__pca_model = pca
            self.__mean = mns
            self.__std = stds
            self._write_model_parameters(self.modelfile)
        self.__training_run = True

    def _write_model_parameters(self, modelfile):
        dirName = os.path.dirname(modelfile)
        if not os.path.exists(dirName):
            os.makedirs(dirName)
        with open(modelfile, 'wb') as fh:
            pickle.dump([self.__pca_model, self.__mean, self.__std], fh)

    def _read_model_parameters(self, modelfile):
        with open(modelfile, 'rb') as fh:
            self.__pca_model, self.__mean, self.__std = pickle.load(fh)

    def infer(self, fq, savedir=None, overwrite=False):
        '''
        :param fq: Object to request features.
        :type fq: :class:`FeatureRequest`
        :param savedir: Directory to save the model output to.
        :param overwrite: If True, overwrite existing output.
        :return: Dataset holding the PCA components.
        '''
        if not self.__training_run:
            msg = "Run 'training' first."
            raise ModelException(msg)

        feats, mns, stds = self.featureStack(fq)
        vals = self.pca_model.transform(feats.values.T)
        keys = ['pc%d' % i for i in range(self.n_components)]
        foo = xr.DataArray(vals, coords=[feats.datetime.values,
                                         keys],
                           dims=['datetime', 'pca_component'])
        xdf = xr.Dataset({'pca': foo})
        xdf.attrs['starttime'] = fq.starttime.isoformat()
        xdf.attrs['endtime'] = fq.endtime.isoformat()
        xdf.attrs['station'] = fq.site
        if savedir is not None:
            self.save_model_output(xdf, savedir, overwrite=overwrite)
        return xdf
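The model-file round trip that fit() relies on can be sketched in isolation: the fitted PCA model plus the standardisation parameters are pickled as a three-element list and reloaded on subsequent runs. The file name and data below are illustrative only.

import pickle

import numpy as np
import sklearn.decomposition

rng = np.random.default_rng(42)
data = rng.normal(size=(1000, 6))               # stand-in feature matrix
mns, stds = data.mean(axis=0), data.std(axis=0)

pca = sklearn.decomposition.PCA(n_components=4)
pca.fit((data - mns) / stds)

with open("pcaModel.pkl", "wb") as fh:          # mirrors _write_model_parameters
    pickle.dump([pca, mns, stds], fh)
with open("pcaModel.pkl", "rb") as fh:          # mirrors _read_model_parameters
    model, mean, std = pickle.load(fh)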

infer(fq, savedir=None, overwrite=False)

Parameters:
  • fq (:class:`FeatureRequest`) –

    Object to request features.

  • savedir –

    Directory to save the model output to.

  • overwrite –

    If True, overwrite existing output.

Returns:
  • Dataset holding the PCA components.

Source code in src/zizou/pca.py
def infer(self, fq, savedir=None, overwrite=False):
    '''
    :param fq: Object to request features.
    :type fq: :class:`FeatureRequest`
    :param savedir: Directory to save the model output to.
    :param overwrite: If True, overwrite existing output.
    :return: Dataset holding the PCA components.
    '''
    if not self.__training_run:
        msg = "Run 'training' first."
        raise ModelException(msg)

    feats, mns, stds = self.featureStack(fq)
    vals = self.pca_model.transform(feats.values.T)
    keys = ['pc%d' % i for i in range(self.n_components)]
    foo = xr.DataArray(vals, coords=[feats.datetime.values,
                                     keys],
                       dims=['datetime', 'pca_component'])
    xdf = xr.Dataset({'pca': foo})
    xdf.attrs['starttime'] = fq.starttime.isoformat()
    xdf.attrs['endtime'] = fq.endtime.isoformat()
    xdf.attrs['station'] = fq.site
    if savedir is not None:
        self.save_model_output(xdf, savedir, overwrite=overwrite)
    return xdf

Train an autoencoder to reduce dimensionality of input features.

TiedAutoEncoder

Bases: Module

Applies a tied-weight autoencoder to the incoming data. From https://gist.github.com/northanapon/375e17fb395391c144deff20914e51df

Parameters

in_features : int
    Size of each input sample.
h_features : List[int]
    A list of the size of each layer for the encoder, and the reverse for the decoder.
activation : function, optional
    An activation function to apply for each layer (the default is F.relu).

Source code in src/zizou/autoencoder.py
class TiedAutoEncoder(nn.Module):
    """
    Applies a tied-weight autoencoder to the incoming data.
    From https://gist.github.com/northanapon/375e17fb395391c144deff20914e51df
    Parameters
    -------------
    in_features : int
        size of each input sample.
    h_features : List[int]
        a list of size of each layer for the encoder and
        the reverse for the decoder.
    activation : function, optional
        an activation function to apply for each layer
        (the default is F.relu).
    """

    def __init__(self, in_features: int, h_features: List[int], activation=F.relu):
        """Create an autoencoder."""
        super().__init__()
        self.in_features = in_features
        self.h_features = h_features
        self.encoded_features = h_features[-1]
        self.activation = activation
        self.weights = nn.ParameterList([])
        self.enc_biases = nn.ParameterList([])
        self.dec_biases = nn.ParameterList([])
        in_dim = in_features
        for h in h_features:
            self.weights.append(nn.Parameter(torch.DoubleTensor(h, in_dim)))
            self.enc_biases.append(nn.Parameter(torch.DoubleTensor(h)))
            in_dim = h
        for h in reversed(h_features[:-1]):
            self.dec_biases.append(nn.Parameter(torch.DoubleTensor(h)))
        self.dec_biases.append(nn.Parameter(torch.DoubleTensor(in_features)))
        self.reset_parameters()
        self.loss = torch.nn.MSELoss(reduction="none")

    def forward(self, x: torch.DoubleTensor):
        """Return result of encoding and decoding."""
        dec = self.encode(x)
        return self.decode(dec)

    def encode(self, x: torch.DoubleTensor):
        """Return encoded data."""
        o = x
        for w, b in zip(self.weights[:-1], self.enc_biases[:-1]):
            o = F.linear(o, w, b)
            o = self.activation(o)
        return F.linear(o, self.weights[-1], self.enc_biases[-1])

    def decode(self, o: torch.DoubleTensor):
        """Return decoded data."""
        r_weights = list(reversed(self.weights))
        for w, b in zip(r_weights[:-1], self.dec_biases[:-1]):
            o = F.linear(o, w.t(), b)
            o = self.activation(o)
        return F.linear(o, r_weights[-1].t(), self.dec_biases[-1])

    def reset_parameters(self):
        """Reset linear module parameters (from nn.Linear)."""
        for w, b in zip(self.weights, self.enc_biases):
            nn.init.kaiming_uniform_(w, a=math.sqrt(5))
            if b is not None:
                fan_in, _ = nn.init._calculate_fan_in_and_fan_out(w)
                bound = 1 / math.sqrt(fan_in)
                nn.init.uniform_(b, -bound, bound)
        for w, b in zip(self.weights, self.dec_biases):
            if b is not None:
                fan_in, _ = nn.init._calculate_fan_in_and_fan_out(w.t())
                bound = 1 / math.sqrt(fan_in)
                nn.init.uniform_(b, -bound, bound)
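A minimal usage sketch (the input sizes are illustrative); note the module works in double precision, so inputs must be float64:

import torch
from zizou.autoencoder import TiedAutoEncoder

ae = TiedAutoEncoder(in_features=12, h_features=[8, 4, 2])
x = torch.randn(5, 12, dtype=torch.float64)  # batch of 5 samples

z = ae.encode(x)       # bottleneck representation, shape (5, 2)
x_hat = ae(x)          # reconstruction, shape (5, 12)
loss = ae.loss(x_hat, x).mean()
loss.backward()        # gradients flow through the shared weights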

__init__(in_features, h_features, activation=F.relu)

Create an autoencoder.

Source code in src/zizou/autoencoder.py
def __init__(self, in_features: int, h_features: List[int], activation=F.relu):
    """Create an autoencoder."""
    super().__init__()
    self.in_features = in_features
    self.h_features = h_features
    self.encoded_features = h_features[-1]
    self.activation = activation
    self.weights = nn.ParameterList([])
    self.enc_biases = nn.ParameterList([])
    self.dec_biases = nn.ParameterList([])
    in_dim = in_features
    for h in h_features:
        self.weights.append(nn.Parameter(torch.DoubleTensor(h, in_dim)))
        self.enc_biases.append(nn.Parameter(torch.DoubleTensor(h)))
        in_dim = h
    for h in reversed(h_features[:-1]):
        self.dec_biases.append(nn.Parameter(torch.DoubleTensor(h)))
    self.dec_biases.append(nn.Parameter(torch.DoubleTensor(in_features)))
    self.reset_parameters()
    self.loss = torch.nn.MSELoss(reduction="none")

decode(o)

Return decoded data.

Source code in src/zizou/autoencoder.py
def decode(self, o: torch.DoubleTensor):
    """Return decoded data."""
    r_weights = list(reversed(self.weights))
    for w, b in zip(r_weights[:-1], self.dec_biases[:-1]):
        o = F.linear(o, w.t(), b)
        o = self.activation(o)
    return F.linear(o, r_weights[-1].t(), self.dec_biases[-1])

encode(x)

Return encoded data.

Source code in src/zizou/autoencoder.py
def encode(self, x: torch.DoubleTensor):
    """Return encoded data."""
    o = x
    for w, b in zip(self.weights[:-1], self.enc_biases[:-1]):
        o = F.linear(o, w, b)
        o = self.activation(o)
    return F.linear(o, self.weights[-1], self.enc_biases[-1])

forward(x)

Return result of encoding and decoding.

Source code in src/zizou/autoencoder.py
def forward(self, x: torch.DoubleTensor):
    """Return result of encoding and decoding."""
    dec = self.encode(x)
    return self.decode(dec)

reset_parameters()

Reset linear module parameters (from nn.Linear).

Source code in src/zizou/autoencoder.py
def reset_parameters(self):
    """Reset linear module parameters (from nn.Linear)."""
    for w, b in zip(self.weights, self.enc_biases):
        nn.init.kaiming_uniform_(w, a=math.sqrt(5))
        if b is not None:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(w)
            bound = 1 / math.sqrt(fan_in)
            nn.init.uniform_(b, -bound, bound)
    for w, b in zip(self.weights, self.dec_biases):
        if b is not None:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(w.t())
            bound = 1 / math.sqrt(fan_in)
            nn.init.uniform_(b, -bound, bound)

get_trace(data, colour, kws_ssam)

Return a plotly graphics object from a feature.

Parameters:
  • data (:class:`xarray.DataArray`) –

    Feature data to plot.

  • colour (str) –

    Colour for line plots.

Returns:
  • :class:`plotly.graph_objs.Scatter` | :class:`plotly.graph_objs.Figure`

    A 1-D line plot or a 2D heatmap.

Source code in src/zizou/visualise.py
def get_trace(data, colour, kws_ssam):
    """
    Return a plotly graphics object from a feature.

    :param data: Feature data to plot.
    :type data: :class:`xarray.DataArray`
    :param colour: Colour for line plots.
    :type colour: str
    :returns: A 1-D line plot or a 2D heatmap.
    :rtype: :class:`plotly.graph_objs.Scatter` or
            :class:`plotly.graph_objs.Figure`
    """
    if data.name == "ssam":
        return plot_ssam_plotly(data, new_fig=False, dbscale=True, **kws_ssam)
    elif data.name == "filterbank":
        return plot_ssam_plotly(
            data, new_fig=False, dbscale=True, canvas_dim=None, **kws_ssam
        )
    elif data.name == "sonogram":
        return plot_ssam_plotly(
            data, new_fig=False, dbscale=False, canvas_dim=None, **kws_ssam
        )
    else:
        # data = data.fillna(data.mean())
        dates = data.datetime.data
        dates = pd.to_datetime(dates).to_pydatetime()
        featureData = data.to_dataframe()[data.name].interpolate("index").values
        return go.Scatter(
            x=dates,
            y=featureData,
            opacity=0.8,
            line_color=colour,
            name="<b>{}</b>".format(str(data.name)),
        )
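A usage sketch with the synthetic feature generator from zizou.util (documented below):

from zizou.util import generate_test_data
from zizou.visualise import get_trace

rsam = generate_test_data(dim=1)["rsam"]       # 1-D feature -> Scatter
scatter = get_trace(rsam, "deepskyblue", kws_ssam={})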

multi_feature_plot(feature1, feature2, equal_yrange=False, rangeslider=False, log=False, kws_ssam=None)

Return a combined time-series plot of two features.

Parameters:
  • feature1 (str) –

    First feature to plot with y-scale on the left.

  • feature2 (str) –

    Second feature to plot with y-scale on the right.

  • equal_yrange (bool, default: False ) –

    If True align yscales.

  • rangeslider (bool, default: False ) –

    If True show secondary panel for zooming.

  • kws_ssam (dict, default: None ) –

    Keywords to be passed to :class:`zizou.visualise.plot_ssam_plotly`

Returns:
  • :class:`plotly.graph_objs.Figure`

    plotly figure that can be displayed in a Jupyter notebook or in a dash app.

Source code in src/zizou/visualise.py
def multi_feature_plot(
    feature1, feature2, equal_yrange=False, rangeslider=False, log=False, kws_ssam=None
):
    """
    Return a combined time-series plot of two features.

    :param feature1: First feature to plot with y-scale on
                     the left.
    :type feature1: str
    :param feature2: Second feature to plot with y-scale on
                     the right.
    :type feature2: str
    :param equal_yrange: If True align yscales.
    :type equal_yrange: bool
    :param rangeslider: If True show secondary panel for zooming.
    :type rangeslider: bool
    :param kws_ssam: Keywords to be passed to
                     :class:`zizou.visualise.plot_ssam_plotly`
    :type kws_ssam: dict
    :returns: plotly figure that can be displayed in a Jupyter
              notebook or in a dash app.
    :rtype: :class:`plotly.graph_objs.Figure`
    """
    if kws_ssam is None:
        kws_ssam = dict()

    if feature2.name in ["ssam", "sonogram", "filterbank"]:
        # Always plot 2D features first
        feature_tmp = feature1
        feature1 = feature2
        feature2 = feature_tmp

    fig = make_subplots(specs=[[{"secondary_y": True}]])

    trace = get_trace(feature1, "deepskyblue", kws_ssam)
    fig.add_trace(trace, secondary_y=False)

    fig.update_yaxes(
        tickfont=dict(color="deepskyblue"), title_font=dict(color="deepskyblue")
    )
    if feature2.name not in ["ssam", "sonogram", "filterbank"]:
        if feature1.name in ["ssam", "sonogram", "filterbank"]:
            trace = get_trace(feature2, "darkred", kws_ssam)
            fig.add_trace(trace, secondary_y=True)
            fig.update_yaxes(
                secondary_y=True,
                tickfont=dict(color="darkred"),
                title_font=dict(color="darkred"),
            )
            fig.update_yaxes(
                tickfont=dict(color="#4d4db3"),
                title_font=dict(color="#4d4db3"),
                secondary_y=False,
            )
        else:
            trace = get_trace(feature2, "dimgray", kws_ssam)
            fig.add_trace(trace, secondary_y=True)
            fig.update_yaxes(
                secondary_y=True,
                tickfont=dict(color="dimgray"),
                title_font=dict(color="dimgray"),
            )

    fig.update_layout(
        title={
            "text": "Features time series",
            "y": 0.9,
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top",
        },
        height=500,
        xaxis_rangeslider_visible=rangeslider,
    )

    # Set y-axes titles
    fig.update_yaxes(
        title_text="<b>{}</b>".format(str(feature1.name)), secondary_y=False
    )
    fig.update_yaxes(
        title_text="<b>{}</b>".format(str(feature2.name)), secondary_y=True
    )

    if equal_yrange:
        if feature1.name in ["sonogram", "filterbank", "ssam"]:
            # Smallest positive number
            y = fig["data"][0].y
            y_min = y[y > 0][0]
            y_max = fig["data"][0].y.max()
            y_range = np.array([y_min, y_max])
            if log:
                y_range = np.log10(y_range)
            fig.update_yaxes(range=y_range)
        else:
            y_min = min(float(feature1.min()), float(feature2.min()))
            y_max = max(float(feature1.max()), float(feature2.max()))
            if y_max > y_min:
                fig.update_yaxes(range=[y_min, y_max])

    if log:
        fig.update_yaxes(type="log")

    return fig
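A usage sketch pairing a 2-D feature with a 1-D feature, using synthetic data from zizou.util:

from zizou.util import generate_test_data
from zizou.visualise import multi_feature_plot

ssam = generate_test_data(dim=2)["ssam"]   # plotted as a heatmap
rsam = generate_test_data(dim=1)["rsam"]   # plotted as a line, right y-axis
fig = multi_feature_plot(ssam, rsam, rangeslider=True)
fig.show()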

multi_feature_plot_mpl(feature1, feature2, figsize=(12, 5), **kwargs)

Plot two features in one figure. 2D features have to be plotted first.

Parameters

feature1 : xarray.DataArray
    First feature to plot. Can be 1D or 2D.
feature2 : xarray.DataArray
    Second feature to plot. Has to be 1D.
figsize : tuple, optional
    Figure size, by default (12, 5).

Returns

matplotlib.figure.Figure
    Figure with two y-axes.

Source code in src/zizou/visualise.py
def multi_feature_plot_mpl(feature1, feature2, figsize=(12, 5), **kwargs):
    """
    Plot two features in one figure. 2D features have to be plotted first.

    Parameters
    ----------
    feature1 : xarray.DataArray
        First feature to plot. Can be 1D or 2D.
    feature2 : xarray.DataArray
        Second feature to plot. Has to be 1D.
    figsize : tuple, optional
        Figure size, by default (12, 5)

    Returns
    -------
    matplotlib.figure.Figure
        Figure with two y-axes.

    """
    fig, ax1 = plt.subplots(figsize=figsize)
    if len(feature1.values.shape) == 2:
        ax1 = plot_ssam_mpl(feature1, axes=ax1)
    else:
        ax1.plot(mdates.date2num(feature1.datetime), feature1.data, color="deepskyblue")
        ax1.set_ylabel(feature1.name, color="deepskyblue")
    ax2 = ax1.twinx()
    ax2.plot(mdates.date2num(feature2.datetime), feature2.data, color="black")
    date_format = mdates.DateFormatter("%Y-%m-%d %H:%M:%S")
    ax1.xaxis.set_major_formatter(date_format)
    fig.autofmt_xdate()
    fig.colorbar(ax1.get_images()[0], ax=ax1)
    return fig
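A usage sketch; as noted above, the 2-D feature has to come first:

from zizou.util import generate_test_data
from zizou.visualise import multi_feature_plot_mpl

ssam = generate_test_data(dim=2)["ssam"]
rsam = generate_test_data(dim=1)["rsam"]
fig = multi_feature_plot_mpl(ssam, rsam)
fig.savefig("features.png")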

plot_ssam_mpl(xdf, axes=None, cmap=cm.viridis, figsize=(12, 6), **kwargs)

Plot spectrogram data using matplotlib.

Parameters:
  • xdf (:class:`xarray.Dataset`) –

    An xarray dataset as returned by :class:`zizou.ssam.SSAM`

  • log (bool) –

    Logarithmic frequency axis if True, linear frequency axis otherwise.

  • axes (:class:`matplotlib.axes.Axes`, default: None ) –

    Plot into given axes.

  • dbscale (bool) –

    If True 10 * log10 of color values is taken, if False the sqrt is taken.

  • cmap (:class:`matplotlib.colors.Colormap`, default: viridis ) –

    Specify a custom colormap instance.

  • clip ([float, float]) –

    adjust colormap to clip at lower and/or upper end. The given percentages of the amplitude range (linear or logarithmic depending on option dbscale) are clipped.

  • figsize ([float, float], default: (12, 6) ) –

    Horizontal and vertical dimension of :class:`matplotlib.figure.Figure`

Example:

>>> import numpy as np
>>> from zizou.ssam import SSAM, test_signal
>>> from zizou.visualise import plot_ssam_mpl
>>> ft = SSAM(interval=60, per_lap=.9, smooth=10,
...           frequencies=np.arange(0., 25.1, .1),
...           timestamp='start')
>>> _ = ft.compute(test_signal())
>>> fig = plot_ssam_mpl(ft.feature)

Source code in src/zizou/visualise.py
def plot_ssam_mpl(xdf, axes=None, cmap=cm.viridis, figsize=(12, 6), **kwargs):
    """
    Plot spectrogram data using matplotlib.

    :param xdf: An xarray dataset as returned by :class:`zizou.ssam.SSAM`
    :type xdf: :class:`xarray.Dataset`
    :param log: Logarithmic frequency axis if True, linear frequency axis
                otherwise.
    :type log: bool
    :param axes: Plot into given axes.
    :type axes: :class:`matplotlib.axes.Axes`
    :param dbscale: If True 10 * log10 of color values is taken, if False the
                    sqrt is taken.
    :type dbscale: bool
    :param cmap: Specify a custom colormap instance.
    :type cmap: :class:`matplotlib.colors.Colormap`
    :param clip: adjust colormap to clip at lower and/or upper end. The given
                 percentages of the amplitude range (linear or logarithmic depending
                 on option `dbscale`) are clipped.
    :type clip: [float, float]
    :param figsize: Horizontal and vertical dimension 
                    of :class:`matplotlib.figure.Figure`
    :type figsize: [float, float]

    >>> import numpy as np
    >>> from zizou.ssam import SSAM, test_signal
    >>> from zizou.visualise import plot_ssam_mpl
    >>> ft = SSAM(interval=60, per_lap=.9, smooth=10, \
                  frequencies=np.arange(0., 25.1, .1), \
                  timestamp='start')
    >>> _ = ft.compute(test_signal())
    >>> fig = plot_ssam_mpl(ft.feature)

    """

    dates, freq, spec = spectrogram_preprocessing(xdf, **kwargs)
    dates = mdates.date2num(dates)

    # db scale and remove zero/offset for amplitude
    clip = kwargs.get("clip", [0.0, 1.0])
    vmin, vmax = clip
    norm = Normalize(vmin, vmax, clip=True)
    # ignore warnings due to NaNs
    if not axes:
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111)
    else:
        ax = axes

    # calculate half bin width
    halfbin_time = (dates[1] - dates[0]) / 2.0
    halfbin_freq = (freq[1] - freq[0]) / 2.0

    if kwargs.get("log", False):
        # pcolor expects one bin more at the right end
        freq = np.concatenate((freq, [freq[-1] + 2 * halfbin_freq]))
        dates = np.concatenate((dates, [dates[-1] + 2 * halfbin_time]))
        # center bin
        dates -= halfbin_time
        freq -= halfbin_freq
        # Log scaling for frequency values (y-axis)
        ax.set_yscale("log")
        # Plot times
        ps = ax.pcolormesh(dates, freq, spec, cmap=cmap)
    else:
        # this method is much much faster!
        spec = np.flipud(spec)
        # center bin
        extent = (
            dates[0] - halfbin_time,
            dates[-1] + halfbin_time,
            freq[0] - halfbin_freq,
            freq[-1] + halfbin_freq,
        )
        ps = ax.imshow(spec, interpolation="nearest", extent=extent, cmap=cmap)

    # set correct way of axis, whitespace before and after with window
    # length
    ax.axis("tight")
    ax.grid(False)

    if axes:
        return ax

    fig.colorbar(ps)
    ax.set_ylabel("Frequency [Hz]")
    date_format = mdates.DateFormatter("%Y-%m-%d %H:%M:%S")
    ax.xaxis.set_major_formatter(date_format)
    # x_ticks = np.datetime64(dates[0]) + ax.get_xticks().astype('timedelta64[s]')
    # x_tick_labels = x_ticks.astype('datetime64[h]')
    # ax.set_xticklabels(x_tick_labels, rotation=45)
    fig.autofmt_xdate()

    return fig

plot_ssam_plotly(xdf, cmap='Ice_r', new_fig=True, **kwargs)

Plot spectrogram data using plotly.

Parameters:
  • xdf (:class:`xarray.Dataset`) –

    An xarray dataset as returned by :class:`zizou.ssam.SSAM`

  • log (bool) –

    Logarithmic frequency axis if True, linear frequency axis otherwise.

  • dbscale (bool) –

    If True 10 * log10 of color values is taken, if False the sqrt is taken.

  • cmap (list, default: 'Ice_r' ) –

    Specify a custom colormap instance from plotly.colors.sequential.

  • clip ([float, float]) –

    adjust colormap to clip at lower and/or upper end. The given percentages of the amplitude range (linear or logarithmic depending on option dbscale) are clipped.

  • new_fig (bool, default: True ) –

    Whether to return a figure or just a graph object.

  • max_tpoints (int) –

    Maximum number of time intervals. If the dataset exceeds that number it will be resampled to max_tpoints.

Example:

>>> import numpy as np
>>> from zizou.ssam import SSAM, test_signal
>>> from zizou.visualise import plot_ssam_plotly
>>> ft = SSAM(interval=60, per_lap=.9, smooth=10,
...           frequencies=np.arange(0., 25.1, .1),
...           timestamp='start')
>>> _ = ft.compute(test_signal())
>>> fig = plot_ssam_plotly(ft.feature)

Source code in src/zizou/visualise.py
def plot_ssam_plotly(xdf, cmap="Ice_r", new_fig=True, **kwargs):
    """
    Plot spectrogram data using plotly.

    :param xdf: An xarray dataset as returned by :class:`zizou.ssam.SSAM`
    :type xdf: :class:`xarray.Dataset`
    :param log: Logarithmic frequency axis if True, linear frequency axis
                otherwise.
    :type log: bool
    :param dbscale: If True 10 * log10 of color values is taken, if False the
                    sqrt is taken.
    :type dbscale: bool
    :param cmap: Specify a custom colormap instance from
                 `plotly.colors.sequential`.
    :type cmap: list
    :param clip: adjust colormap to clip at lower and/or upper end. The given
                 percentages of the amplitude range (linear or logarithmic
                 depending on option `dbscale`) are clipped.
    :type clip: [float, float]
    :param new_fig: Whether to return a figure or just a graph object.
    :type new_fig: bool
    :param max_tpoints: Maximum number of time intervals. If the dataset
                        exceeds that number it will be resampled
                        to max_tpoints.
    :type max_tpoints: int

    >>> import numpy as np
    >>> from zizou.ssam import SSAM, test_signal
    >>> from zizou.visualise import plot_ssam_plotly
    >>> ft = SSAM(interval=60, per_lap=.9, smooth=10, \
                  frequencies=np.arange(0., 25.1, .1), \
                  timestamp='start')
    >>> _ = ft.compute(test_signal())
    >>> fig = plot_ssam_plotly(ft.feature)
    """
    dates, freq, spec = spectrogram_preprocessing(xdf, **kwargs)
    if not new_fig:
        return go.Heatmap(z=spec, x=dates, y=freq, colorscale=cmap)

    fig = go.Figure(data=go.Heatmap(z=spec, x=dates, y=freq, colorscale=cmap))

    fig.update_layout(xaxis_nticks=20, yaxis_title="Frequency [Hz]")

    if kwargs.get("log", False):
        fig.update_yaxes(type="log")

    return fig

AnomalyDetectionBaseClass

Bases: object

Source code in src/zizou/anomaly_base.py
class AnomalyDetectionBaseClass(object):
    def __init__(self, features, stacks=None, transforms=None):
        self.features = features
        self.stacks = stacks if stacks is not None else dict()
        self.transforms = transforms if transforms is not None else dict()

    def fit(self, starttime, endtime):
        """
        Train the model.
        """
        msg = "'training' is not implemented yet"
        raise NotImplementedError(msg)

    def infer(self, starttime, endtime):
        """
        Use the model to infer.
        """
        msg = "'inference' is not implemented yet"
        raise NotImplementedError(msg)

    def write_model_parameters(self):
        """
        Write principal components or
        model weights to file
        """
        msg = "'write_model_parameters' is not implemented yet"
        raise NotImplementedError(msg)

    def read_model_parameters(self):
        """
        Read principal components or
        model weights from file
        """
        msg = "'read_model_parameters' is not implemented yet"
        raise NotImplementedError(msg)

    def write_hyperparameters(self):
        """
        Store things like random seed,
        learning rate, number of principal
        components etc.
        """
        msg = "'write_hyperparameters' is not implemented yet"
        raise NotImplementedError(msg)

    def compute_anomaly_index(self):
        """
        A scalar value between 0 and 1 indicating
        the degree of anomaly (1=most anomalous).
        """
        msg = "'compute_anomaly_index' is not implemented yet"
        raise NotImplementedError(msg)
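A concrete detector subclasses this base and overrides only the hooks it needs; anything left untouched keeps raising NotImplementedError. A minimal sketch (the subclass name and behaviour are illustrative only):

# Toy subclass: trains nothing and never flags an anomaly.
class ConstantDetector(AnomalyDetectionBaseClass):
    def fit(self, starttime, endtime):
        pass  # nothing to train

    def compute_anomaly_index(self):
        return 0.0  # 0 = not anomalous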

compute_anomaly_index()

A scalar value between 0 and 1 indicating the degree of anomaly (1=most anomalous).

Source code in src/zizou/anomaly_base.py
def compute_anomaly_index(self):
    """
    A scalar value between 0 and 1 indicating
    the degree of anomaly (1=most anomalous).
    """
    msg = "'compute_anomaly_index' is not implemented yet"
    raise NotImplementedError(msg)

fit(starttime, endtime)

Train the model.

Source code in src/zizou/anomaly_base.py
def fit(self, starttime, endtime):
    """
    Train the model.
    """
    msg = "'training' is not implemented yet"
    raise NotImplementedError(msg)

infer(starttime, endtime)

Use the model to infer.

Source code in src/zizou/anomaly_base.py
def infer(self, starttime, endtime):
    """
    Use the model to infer.
    """
    msg = "'inference' is not implemented yet"
    raise NotImplementedError(msg)

read_model_parameters()

Read principal components or model weights from file

Source code in src/zizou/anomaly_base.py
def read_model_parameters(self):
    """
    Read principal components or
    model weights from file
    """
    msg = "'read_model_parameters' is not implemented yet"
    raise NotImplementedError(msg)

write_hyperparameters()

Store things like random seed, learning rate, number of principal components etc.

Source code in src/zizou/anomaly_base.py
def write_hyperparameters(self):
    """
    Store things like random seed,
    learning rate, number of principal
    components etc.
    """
    msg = "'write_hyperparameters' is not implemented yet"
    raise NotImplementedError(msg)

write_model_parameters()

Write principal components or model weights to file

Source code in src/zizou/anomaly_base.py
def write_model_parameters(self):
    """
    Write principal components or
    model weights to file
    """
    msg = "'write_model_parameters' is not implemented yet"
    raise NotImplementedError(msg)

Functionality common to several modules.

apply_freq_filter(data, filtertype, filterfreq, corners=4, zerophase=False, **options)

Apply frequency filter to seismic data.

The filter type and corner frequencies are given by the filtertype and filterfreq arguments.

Parameters:
  • data (:class:`obspy.core.trace.Trace` | :class:`obspy.core.stream.Stream`) –

    The seismic data to be filtered.

  • filtertype (str) –

    One of 'lp'/'lowpass', 'hp'/'highpass' or 'bp'/'bandpass'; None applies no filter.

  • filterfreq ((float, float)) –

    Lower and upper corner frequency in Hz; entries not needed by the chosen filter may be None.

  • corners (int, default: 4 ) –

    Filter corners / order, defaults to 4.

  • zerophase (bool, default: False ) –

    If True, apply filter once forwards and once backwards. Defaults to False.

  • options

    Passed to underlying obspy filter methods.

Source code in src/zizou/util.py
def apply_freq_filter(
    data, filtertype, filterfreq, corners=4, zerophase=False, **options
):
    """Apply frequency filter to seismic data.

    The filter type and corner frequencies are given by the
    `filtertype` and `filterfreq` arguments.

    :param data: The seismic data to be filtered.
    :type data: :class:`obspy.core.trace.Trace` or
        :class:`obspy.core.stream.Stream`
    :param filtertype: One of 'lp'/'lowpass', 'hp'/'highpass' or
        'bp'/'bandpass'; None applies no filter.
    :type filtertype: str
    :param filterfreq: Lower and upper corner frequency in Hz; entries
        not needed by the chosen filter may be None.
    :type filterfreq: (float, float)
    :param corners: Filter corners / order, defaults to 4.
    :type corners: int
    :param zerophase: If True, apply filter once forwards and once
        backwards. Defaults to False.
    :type zerophase: bool
    :param options: Passed to underlying obspy filter methods.
    """
    f_min, f_max = filterfreq
    options.update({"zerophase": zerophase, "corners": corners})

    if filtertype in ["lp", "lowpass"]:
        if f_max is None:
            raise ValueError("Define upper frequency cutoff for low-pass filter.")
        data.filter("lowpass", freq=f_max, **options)

    elif filtertype in ["hp", "highpass"]:
        if f_min is None:
            raise ValueError("Define lower frequency cutoff for high-pass filter.")
        data.filter("highpass", freq=f_min, **options)

    elif filtertype in ["bp", "bandpass"]:
        if f_min is None or f_max is None:
            raise ValueError(
                "Define upper and lower frequency cutoff for band-pass filter."
            )
        data.filter("bandpass", freqmin=f_min, freqmax=f_max, **options)

    elif filtertype is not None:
        raise ValueError(f"Unrecognised filtertype '{filtertype}'")
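A usage sketch with the synthetic trace generator from zizou.util:

from zizou.util import apply_freq_filter, test_signal

tr = test_signal()
# Band-pass between 1 Hz and 10 Hz, zero-phase; filtering is in place
apply_freq_filter(tr, "bandpass", (1.0, 10.0), zerophase=True)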

apply_hanning(x, return_window=None)

Apply a Hanning window to the given 1D or 2D array along the first axis.

Parameters:
  • x (:class:`numpy.ndarray`) –

    1-D or 2-D array or sequence containing the data

  • return_window (bool, default: None ) –

    If true, also return the 1D values of the window that was applied.

Returns:
  • **wtr[, wvals]** – the input multiplied with the Hanning window function and, if return_window is true, the window values themselves.

Source code in src/zizou/util.py
def apply_hanning(x, return_window=None):
    """
    Apply a Hanning window to the given 1D or 2D array along
    the first axis.

    :param x: 1-D or 2-D array or sequence containing the data
    :type x: :class:`numpy.ndarray`
    :param return_window: If true, also return the 1D values of the window
                          that was applied.
    :type return_window: bool
    :return: **wtr[, wvals]** input multiplied with the hanning
             window function, hanning window function
    """
    x = np.asarray(x)

    if x.ndim < 1 or x.ndim > 2:
        raise ValueError("only 1D or 2D arrays can be used")

    xshapetarg = x.shape[0]
    windowVals = np.hanning(xshapetarg) * np.ones(xshapetarg, dtype=x.dtype)

    if x.ndim == 1:
        if return_window:
            return windowVals * x, windowVals
        else:
            return windowVals * x

    windowVals = windowVals[:, np.newaxis]

    if return_window:
        return windowVals * x, windowVals
    else:
        return windowVals * x
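For a 1-D input the result is simply the Hanning window applied sample-wise:

import numpy as np
from zizou.util import apply_hanning

x = np.ones(8)
wx, w = apply_hanning(x, return_window=True)
assert np.allclose(wx, np.hanning(8))  # window applied to a flat signal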

demean(x, axis=None)

Return x minus the mean(x).

Parameters:
  • x (:class:`~numpy.ndarray`) –

    array or sequence containing the data can have any dimensionality

  • axis (int, default: None ) –

    The axis along which to take the mean. See numpy.mean for a description of this argument.

Returns:
  • :class:`~numpy.ndarray`

    input minus the arithmetic mean along the specified axis.

Source code in src/zizou/util.py
def demean(x, axis=None):
    """
    Return x minus the mean(x).

    :param x: array or sequence containing the data can
              have any dimensionality
    :type x: :class:`~numpy.ndarray`
    :param axis: The axis along which to take the mean. See numpy.mean
                 for a description of this argument.
    :type axis: int
    :return: input minus the arithmetic mean along the specified axis.
    :rtype: :class:`~numpy.ndarray`
    """
    x = np.asarray(x)

    if axis is not None and axis + 1 > x.ndim:
        raise ValueError("axis(=%s) out of bounds" % axis)

    return x - np.nanmean(x, axis=axis, keepdims=True)
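For example, removing the column means of a 2-D array:

import numpy as np
from zizou.util import demean

a = np.array([[1.0, 2.0], [3.0, 4.0]])
print(demean(a, axis=0))  # each column now averages to zero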

generate_test_data(dim=1, ndays=30, nfreqs=10, tstart=datetime.utcnow(), feature_name=None, freq_name=None)

Generate a 1D or 2D feature for testing.

Source code in src/zizou/util.py
def generate_test_data(
    dim=1,
    ndays=30,
    nfreqs=10,
    tstart=datetime.utcnow(),
    feature_name=None,
    freq_name=None,
):
    """
    Generate a 1D or 2D feature for testing.
    """
    assert dim < 3
    assert dim > 0

    nints = ndays * 6 * 24
    dates = pd.date_range(tstart.strftime("%Y-%m-%d"), freq="10min", periods=nints)
    rs = np.random.default_rng(42)
    # Random walk as test signal
    data = np.abs(np.cumsum(rs.normal(0, 8.0, len(dates))))
    if dim == 2:
        data = np.tile(data, (nfreqs, 1))
    # Add 10% NaNs
    idx_nan = rs.integers(0, nints - 1, int(0.1 * nints))
    if dim == 1:
        data[idx_nan] = np.nan
        if feature_name is None:
            feature_name = "rsam"
        xrd = xr.Dataset(
            {feature_name: xr.DataArray(data, coords=[dates], dims=["datetime"])}
        )
    if dim == 2:
        data[:, idx_nan] = np.nan
        freqs = np.arange(nfreqs)
        if feature_name is None:
            feature_name = "ssam"
        if freq_name is None:
            freq_name = "frequency"
        xrd = xr.Dataset(
            {
                feature_name: xr.DataArray(
                    data, coords=[freqs, dates], dims=[freq_name, "datetime"]
                )
            }
        )
    xrd.attrs["starttime"] = UTCDateTime(dates[0]).isoformat()
    xrd.attrs["endtime"] = UTCDateTime(dates[-1]).isoformat()
    xrd.attrs["station"] = "MDR"
    return xrd
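The generated dataset is a random walk sampled at 10-minute intervals with roughly 10% NaNs; for example:

from zizou.util import generate_test_data

ds = generate_test_data(dim=2, ndays=2, nfreqs=5)
print(ds["ssam"].shape)  # (5, 288): 5 frequencies x 2 days of 10-min samples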

round_time(time, interval)

Find closest multiple of interval to time.

Parameters:
  • time (:class:`obspy.UTCDateTime`) –

    Time to round

  • interval (int) –

    Interval length in seconds

Returns:
  • :class:`obspy.UTCDateTime`

    The new time that is a multiple of interval

Source code in src/zizou/util.py
def round_time(time, interval):
    """
    Find closest multiple of interval to time.

    :param time: Time to round
    :type time: :class:`obspy.UTCDateTime`
    :param interval: Interval length in seconds
    :type interval: int
    :returns: The new time that is a multiple of interval
    :rtype: :class:`obspy.UTCDateTime`
    """
    tmp = (time + 0.5 * interval).timestamp % interval
    ntime = (time + 0.5 * interval).timestamp - tmp
    return UTCDateTime(ntime)
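For example, with a 10-minute interval:

from obspy import UTCDateTime
from zizou.util import round_time

t = UTCDateTime(2023, 1, 1, 0, 7, 12)
print(round_time(t, 600))  # 2023-01-01T00:10:00, the nearest 10-minute mark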

stack_feature(xrdata, stack_length, interval='10min')

Stack features in a dataarray.

Parameters

xrdata : xarray.DataArray
    The data array to stack.
stack_length : str
    The length of the stack.
interval : str, optional
    The interval between stacks, by default "10min".

Source code in src/zizou/util.py
def stack_feature(xrdata, stack_length, interval="10min"):
    """
    Stack features in a dataarray.

    Parameters
    ----------
    xrdata : xarray.DataArray
        The data array to stack.
    stack_length : str
        The length of the stack.
    interval : str, optional
        The interval between stacks, by default "10min".
    """
    num_periods = None
    valid_stack_units = ["W", "D", "h", "T", "min", "S"]
    if re.match(r"\d*\s*(\w*)", stack_length).group(1) not in valid_stack_units:
        raise ValueError(
            "Stack length should be one of: {}".format(", ".join(valid_stack_units))
        )

    if pd.to_timedelta(stack_length) < pd.to_timedelta(interval):
        raise ValueError(
            "Stack length {} is less than interval {}".format(stack_length, interval)
        )

    num_periods = pd.to_timedelta(stack_length) / pd.to_timedelta(interval)
    if not num_periods.is_integer():
        raise ValueError(
            "Stack length {} / interval {} = {}, but it needs"
            " to be a whole number".format(stack_length, interval, num_periods)
        )

    # Stack features
    logger.debug("Stacking feature...")
    try:
        xdf = xrdata.rolling(
            datetime=int(num_periods), center=False, min_periods=1
        ).mean()
    except ValueError as e:
        logger.error(e)
        logger.error(
            "Stack length {} is not valid for feature {}".format(
                stack_length, xrdata.name
            )
        )
    else:
        return xdf
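For example, smoothing a 10-minute feature with a one-hour rolling mean:

from zizou.util import generate_test_data, stack_feature

rsam = generate_test_data(dim=1)["rsam"]
hourly = stack_feature(rsam, "1h", interval="10min")  # 6-sample rolling mean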

stride_windows(x, n, noverlap)

Get all windows of x with length n as a single array, using strides to avoid data duplication.

Parameters:
  • x (:class:`~numpy.ndarray`) –

    1D array or sequence containing the data.

  • n (int) –

    The number of data points in each window.

  • noverlap (int) –

    The overlap between adjacent windows. Default is 0 (no overlap)

Returns:
  • :class:`~numpy.ndarray`

    Array with the windowed data in columns.

Source code in src/zizou/util.py
def stride_windows(x, n, noverlap):
    """
    Get all windows of x with length n as a single array,
    using strides to avoid data duplication.

    :param x: 1D array or sequence containing the data.
    :type x: :class:`~numpy.ndarray`
    :param n: The number of data points in each window.
    :type n: int
    :param noverlap: The overlap between adjacent windows.
                     Default is 0 (no overlap)
    :type noverlap: int
    :return: Array with the windowed data in columns.
    :rtype: :class:`~numpy.ndarray`
    """

    if noverlap >= n:
        raise ValueError("noverlap must be less than n")
    if n < 1:
        raise ValueError("n cannot be less than 1")

    if n == 1 and noverlap == 0:
        return x[np.newaxis]
    if n > x.size:
        raise ValueError("n cannot be greater than the length of x")

    # np.lib.stride_tricks.as_strided easily leads to memory corruption for
    # non integer shape and strides, i.e. noverlap or n. See #3845.
    noverlap = int(noverlap)
    n = int(n)

    step = n - noverlap
    shape = (n, (x.shape[-1] - noverlap) // step)
    strides = (x.strides[0], step * x.strides[0])
    return np.lib.stride_tricks.as_strided(
        x, shape=shape, strides=strides, writeable=False
    )
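For example, length-4 windows with an overlap of 2 samples share their last two points with the next column:

import numpy as np
from zizou.util import stride_windows

x = np.arange(6.0)
print(stride_windows(x, n=4, noverlap=2))
# columns are [0, 1, 2, 3] and [2, 3, 4, 5]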

test_signal(nsec=3600, sampling_rate=100.0, frequencies=[0.1, 3.0, 10.0], amplitudes=[0.1, 1.0, 0.7], phases=[0.0, np.pi * 0.25, np.pi], offsets=[0.0, 0.0, 0.0], starttime=UTCDateTime(1970, 1, 1), gaps=False, noise=True, noise_std=0.5, sinusoid=True, addchirp=True, network='NZ', station='BLUB', location='', channel='HHZ')

Produce a test signal for which we know where the peaks are in the spectrogram.

Parameters:
  • nsec (int, default: 3600 ) –

    Length of the trace in seconds.

  • sampling_rate (float, default: 100.0 ) –

    Sampling rate of the signal in Hz.

  • starttime (:class:`~obspy.UTCDateTime`, default: UTCDateTime(1970, 1, 1) ) –

    Starttime of the trace.

  • gaps (boolean, default: False ) –

    If 'True' add gaps to the test signal.

  • noise_std (float, default: 0.5 ) –

    Standard deviation of the noise.

  • sinusoid (bool, default: True ) –

    Add a signal with a sinusoidal frequency change.

  • station (str, default: 'BLUB' ) –

    Station name of the returned trace.

  • location (str, default: '' ) –

    Location of the returned trace.

Returns:
  • :class:`~obspy.Trace`

    Time series of test data

Source code in src/zizou/util.py
def test_signal(
    nsec=3600,
    sampling_rate=100.0,
    frequencies=[0.1, 3.0, 10.0],
    amplitudes=[0.1, 1.0, 0.7],
    phases=[0.0, np.pi * 0.25, np.pi],
    offsets=[0.0, 0.0, 0.0],
    starttime=UTCDateTime(1970, 1, 1),
    gaps=False,
    noise=True,
    noise_std=0.5,
    sinusoid=True,
    addchirp=True,
    network="NZ",
    station="BLUB",
    location="",
    channel="HHZ",
):
    """
    Produce a test signal for which we know where the peaks
    are in the spectrogram.

    :param nsec: Length of the trace in seconds.
    :type nsec: int
    :param sampling_rate: Sampling rate of the signal in Hz.
    :type sampling_rate: float
    :param starttime: Starttime of the trace.
    :type starttime: :class:`~obspy.UTCDateTime`
    :param gaps: If 'True' add gaps to the test signal.
    :type gaps: boolean
    :param noise_std: Standard deviation of the noise.
    :type noise_std: float
    :param sinusoid: Add a signal with a sinusoidal frequency change.
    :type sinusoid: bool
    :param station: Station name of the returned trace.
    :type station: str
    :param location: Location of the returned trace.
    :type location: str
    :return: Time series of test data
    :rtype: :class:`~obspy.Trace`
    """
    t = np.arange(0, nsec, 1 / sampling_rate)
    signals = []
    # Some constant signals with different phases
    for f, A, ph, dt in zip(frequencies, amplitudes, phases, offsets):
        _s = np.zeros(t.size)
        dt_idx = int(dt * sampling_rate)
        _s[dt_idx:] = A * np.sin(2.0 * np.pi * f * t[dt_idx:] + ph)
        signals.append(_s)
    if addchirp:
        # Frequency-swept signals
        # Chirp
        s4 = 0.8 * chirp(t, 5, t[-1], 15, method="quadratic")
        signals.append(s4)

    if sinusoid:
        vals = [6]
        for k in range(0, 7):
            vals.append(
                2
                * (-1) ** k
                * np.power(2 * np.pi / nsec, 2 * k + 1)
                / M.factorial(2 * k + 1)
            )
            vals.append(0)
        p = np.poly1d(np.array(vals[::-1]))
        s5 = sweep_poly(t, p)
        signals.append(s5)

    # add some noise
    if noise:
        rs = np.random.default_rng(42)
        noise = rs.normal(loc=0.0, scale=noise_std, size=t.size)
        for s in signals:
            noise += s
        signal = noise
    else:
        signal = signals[0]
        for s in signals[1:]:
            signal += s
    stats = {
        "network": network,
        "station": station,
        "location": location,
        "channel": channel,
        "npts": len(signal),
        "sampling_rate": sampling_rate,
        "mseed": {"dataquality": "D"},
    }
    stats["starttime"] = starttime
    stats["endtime"] = stats["starttime"] + nsec
    if gaps:
        p1 = (0.27, 0.33)
        p2 = (0.69, 0.83)
        p3 = (0.95, 1)
        for pmin, pmax in [p1, p2, p3]:
            idx0 = int(pmin * nsec * sampling_rate)
            idx1 = int(pmax * nsec * sampling_rate)
            signal[idx0:idx1] = np.nan
    return Trace(data=signal, header=stats)
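For example, a 10-minute trace with gaps:

from zizou.util import test_signal

tr = test_signal(nsec=600, gaps=True)
print(tr)  # NZ.BLUB..HHZ trace; the gaps are filled with NaNs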

trace_window_data(trace, window_len, min_len=0.8)

Subdivide an obspy.core.trace.Trace into windows of window_len length and iterate over the windows.

:yield: A trace data window

Parameters:
  • trace (:class:`obspy.core.trace.Trace`) –

    The data to be windowed.

  • window_len (float) –

    The window length in seconds.

  • min_len (float, optional, default: 0.8 ) –

    Windows with length < min_length * window_len are skipped, defaults to 0.8

Source code in src/zizou/util.py
def trace_window_data(trace, window_len, min_len=0.8):
    """Subdivide an `obspy.core.trace.Trace` into windows of `window_len`
    length and iterate over windows.

    :param trace: The data to be windowed.
    :type trace: :class:`obspy.core.trace.Trace`
    :param window_len: The window length in seconds.
    :type window_len: float
    :param min_len: Windows with length < `min_length * window_len` are
        skipped, defaults to 0.8
    :type min_len: float, optional
    :yield: A trace data window
    :rtype: `obspy.core.trace.Trace`
    """

    for t0, t1 in trace_window_times(trace, window_len, min_len):
        yield trace.slice(t0, t1, nearest_sample=False)
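For example, iterating over 10-minute windows of an hour-long test trace:

from zizou.util import test_signal, trace_window_data

tr = test_signal(nsec=3600)
for window in trace_window_data(tr, window_len=600):
    print(window.stats.starttime, window.stats.npts)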

trace_window_times(trace, window_len, min_len=0.8)

Subdivide an obspy.core.trace.Trace into windows of window_len length and iterate over window start & end times.

:yield: The window start & end times

Parameters:
  • trace (:class:`obspy.core.trace.Trace`) –

    The data to be windowed.

  • window_len (float) –

    The window length in seconds.

  • min_len (float, optional, default: 0.8 ) –

    Windows with length < min_len * window_len are skipped, defaults to 0.8

Source code in src/zizou/util.py
def trace_window_times(trace, window_len, min_len=0.8):
    """Subdivide an `obspy.core.trace.Trace` into windows of `window_len`
    length and iterate over window start & end times.

    :param trace: The data to be windowed.
    :type trace: :class:`obspy.core.trace.Trace`
    :param window_len: The window length in seconds.
    :type window_len: float
    :param min_len: Windows with length < `min_len * window_len` are
        skipped, defaults to 0.8
    :type min_len: float, optional
    :yield: The window start & end times
    :rtype: :class:`obspy.core.utcdatetime.UTCDateTime`,
        :class:`obspy.core.utcdatetime.UTCDateTime`
    """

    min_len = float(min_len)
    if not 0 < min_len <= 1.0:
        raise ValueError("min_len <= 0 or > 1.")
    min_len = min_len * window_len

    if (trace.stats.endtime - trace.stats.starttime) < min_len:
        raise ValueError("window length > total trace data.")

    t0 = trace.stats.starttime
    while t0 < trace.stats.endtime:
        t1 = min(t0 + window_len, trace.stats.endtime)
        this_window_len = t1 - t0
        if this_window_len >= min_len:
            if this_window_len < window_len:
                t0 = t1 - window_len
            yield t0, t1

        t0 += window_len

window_array(x, nwin, noverlap, remove_mean=True, taper=True, padval=0, return_window=True)

Take a 1-D numpy array and divide it into (overlapping) windows.

Parameters:
  • x (:class:`~numpy.ndarray`) –

    1D array containing the data.

  • nwin (int) –

    The number of data points in each window. If this is not a power of 2, the length of each window will be the nearest power of 2.

  • noverlap (int) –

    The number of overlapping points between adjacent windows.

  • remove_mean (bool, default: True ) –

    Remove the individual mean from each window.

  • taper (bool, default: True ) –

    Apply a hanning taper to each window.

  • return_window (bool, default: True ) –

    If taper is 'True', return the values of the hanning taper.

  • padval (float, :class:`numpy.nan`, or None, default: 0 ) –

    If this is not 'None', padval will be used to pad the input array such that its length is a multiple of the window length.

Returns:
  • :class:`~numpy.ndarray`

    Array with the windowed data in columns.

Source code in src/zizou/util.py
def window_array(
    x, nwin, noverlap, remove_mean=True, taper=True, padval=0, return_window=True
):
    """
    Take a 1-D numpy array and divide it into (overlapping)
    windows.

    :param x: 1D array containing the data.
    :type x: :class:`~numpy.ndarray`
    :param nwin: The number of data points in each window. If this
                 is not a power of 2, the length of each window will be
                 the nearest power of 2.
    :type nwin: int
    :param noverlap: The number of overlapping points between
                     adjacent windows.
    :type noverlap: int
    :param remove_mean: Remove the individual mean from each
                        window.
    :type remove_mean: bool
    :param taper: Apply a hanning taper to each window.
    :type taper: bool
    :param return_window: If taper is 'True', return the values
                          of the hanning taper.
    :type return_window: bool
    :param padval: If this is not 'None', padval will be used to
                   pad the input array such that its length is
                   a multiple of the window length.
    :type padval: float, :class:`numpy.nan`, or None
    :return: Array with the windowed data in columns.
    :rtype: :class:`~numpy.ndarray`
    """
    x = x.astype("float")
    npts = x.size
    if nwin > npts:
        msg = "window length can't be greater than array length"
        raise ValueError(msg)

    # step length that the window is advanced by
    step = nwin - noverlap
    # compute the remainder of the trace after dividing it
    # into windows
    rest = (npts - nwin) % step

    # if the remainder is >0 pad with zeros
    if rest > 0:
        npts_pad = npts + (step - rest)
        x = np.resize(x, int(npts_pad))
        x[npts:] = padval

    result = stride_windows(x, nwin, noverlap)
    if remove_mean:
        result = demean(result, axis=0)
    if taper:
        result, windowVals = apply_hanning(result, return_window=True)
        if return_window:
            return result, windowVals
    return result
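For example, windowing 10 samples into demeaned, tapered columns of length 4:

import numpy as np
from zizou.util import window_array

x = np.arange(10.0)
wins, taper = window_array(x, nwin=4, noverlap=2, return_window=True)
print(wins.shape)  # (4, 4): window length x number of windows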