# Source code for SeisMonitor.monitor.downloader.seismonitor

"""
Concurrent seismic waveform downloader.

This module provides the :class:`MseedDownloader` class, which manages
parallel waveform downloads from multiple seismic data providers.

The downloader supports:

* Inventory and metadata generation
* Concurrent waveform downloads
* Chunked time-window requests
* Optional waveform processing pipelines

The implementation is compatible with ObsPy-based FDSN clients.

Example
-------
>>> from obspy.clients.fdsn import Client
>>> client = Client("IRIS")
>>> downloader = MseedDownloader([client])
>>> isinstance(downloader.providers, list)
True
"""

import os
import json
import time
import logging
import concurrent.futures as cf
from datetime import timedelta
from SeisMonitor.utils import printlog, isfile
from . import utils as ut


class MseedDownloader:
    """
    Concurrent mass downloader for seismic waveform data.

    This class manages metadata retrieval and waveform downloads from
    multiple providers using concurrent (thread-based) execution.

    Parameters
    ----------
    providers : list
        List of provider objects. ``download`` reads ``client``,
        ``waveform_restrictions`` and ``processing`` from each provider and
        calls its ``copy()`` method.

    Attributes
    ----------
    providers : list
        Provider list as returned by ``ut.sanitize_provider_times``; replaced
        by the processed list once ``make_inv_and_json`` has run.
    providers_are_processed : bool
        Whether ``make_inv_and_json`` has already processed the providers.
    _stations_outside_domains : set or None
        Stations reported by ``ut.get_merged_inv_and_json`` as falling outside
        the requested domains; ``None`` until metadata has been generated.
    """

    def __init__(self, providers):
        """
        Initialize the downloader.

        Parameters
        ----------
        providers : list
            List of provider instances.
        """
        self.providers = ut.sanitize_provider_times(providers)
        self.providers_are_processed = False
        self._stations_outside_domains = None

    def make_inv_and_json(self, out_folder=None):
        """
        Create the merged inventory and the station metadata dictionary.

        Side effects: ``self.providers`` is replaced by the processed provider
        list, ``providers_are_processed`` is set to True and the stations
        outside the requested domains are stored.

        Parameters
        ----------
        out_folder : str, optional
            Output directory. When given, ``inv.xml`` (StationXML) and
            ``stations.json`` are written there.

        Returns
        -------
        tuple
            ``(inventory, json_info)`` where ``inventory`` is the merged
            inventory object and ``json_info`` the station metadata dict.
        """
        tic = time.time()
        printlog("info", "metadata", "running to create inventory and json files")

        # Work on a shallow copy so the helper cannot mutate our list in place;
        # the processed list it returns becomes the new provider list.
        inv, json_info, self.providers, sod = ut.get_merged_inv_and_json(
            self.providers.copy()
        )
        self.providers_are_processed = True
        self._stations_outside_domains = sod

        if out_folder:
            json_out = os.path.join(out_folder, "stations.json")
            inv_out = os.path.join(out_folder, "inv.xml")

            # NOTE(review): `isfile` presumably prepares the output path
            # (e.g. creates parent directories) — confirm in SeisMonitor.utils.
            isfile(inv_out)
            inv.write(inv_out, format="STATIONXML")

            isfile(json_out)
            with open(json_out, 'w') as fp:
                json.dump(json_info, fp)

        exetime = timedelta(seconds=time.time() - tic)
        printlog(
            "info", "metadata",
            f"Total time of execution: {exetime.total_seconds()} seconds"
        )
        return inv, json_info

    def download(
            self, mseed_storage,
            chunklength_in_sec=None,
            threshold=60,
            overlap_in_sec=0,
            picker_args=None,
            groupby='{network}.{station}.{channel}',
            n_processor=None):
        """
        Download waveform data from all configured providers.

        Metadata is generated first (via ``make_inv_and_json``) if it has not
        been generated yet.

        Parameters
        ----------
        mseed_storage : str
            Storage path template for MiniSEED files.
        chunklength_in_sec : int, optional
            Length of each download chunk in seconds.
        threshold : int, default=60
            Minimum waveform length threshold in seconds.
        overlap_in_sec : int, default=0
            Overlap between adjacent chunks in seconds.
        picker_args : dict, optional
            Picker configuration dictionary. ``None`` (default) means an
            empty dict.
        groupby : str, default="{network}.{station}.{channel}"
            Trace grouping pattern.
        n_processor : int, optional
            Number of worker threads. ``1`` downloads sequentially; ``None``
            lets ``ThreadPoolExecutor`` choose its default worker count.

        Notes
        -----
        Supported path template variables include ``{network}``,
        ``{station}``, ``{location}``, ``{channel}``, ``{year}``,
        ``{month}``, ``{day}``, ``{julday}``, ``{starttime}`` and
        ``{endtime}``.
        """
        # Avoid a shared mutable default argument.
        if picker_args is None:
            picker_args = {}

        if not self.providers_are_processed:
            self.make_inv_and_json()

        for provider in self.providers:
            provider = provider.copy()
            download_restrictions = ut.DownloadRestrictions(
                mseed_storage, chunklength_in_sec, threshold,
                overlap_in_sec, picker_args, groupby, n_processor
            )
            self._run_download(
                provider.client,
                provider.waveform_restrictions,
                download_restrictions,
                provider.processing,
            )

    def _run_download(self, client, waveform_restrictions,
                      download_restrictions, processing=None):
        """
        Execute waveform downloads for a single provider.

        The provider's time span is split into chunks with
        ``ut.get_chunktimes``; every chunk is downloaded for all
        network-station pairs in ``waveform_restrictions.bulk_info``.
        Per-chunk and total durations are logged.

        Parameters
        ----------
        client : object
            ObsPy-compatible client instance.
        waveform_restrictions : object
            Waveform restriction configuration (time span, location,
            channel, ``bulk_info``).
        download_restrictions : object
            Download restriction configuration (chunking, ``n_processor``).
        processing : list of callable, optional
            Processing pipeline applied after download.
        """
        tic = time.time()
        times = ut.get_chunktimes(
            starttime=waveform_restrictions.starttime,
            endtime=waveform_restrictions.endtime,
            chunklength_in_sec=download_restrictions.chunklength_in_sec,
            overlap_in_sec=download_restrictions.overlap_in_sec
        )
        logger_chunktimes = logging.getLogger('Downloader: chunktime')
        n_chunks = len(times)
        logger_chunktimes.info(f'Total chunktime list: {n_chunks}')

        for chunkt, (starttime, endtime) in enumerate(times):
            chunktic = time.time()
            self._download_chunk(
                client, waveform_restrictions, download_restrictions,
                processing, starttime, endtime, logger_chunktimes
            )
            wav_exetime = timedelta(seconds=time.time() - chunktic)
            logger_chunktimes.info(
                f'Chunktime {chunkt + 1}/{n_chunks} '
                f'[{starttime} - {endtime}]: '
                f'{wav_exetime.total_seconds()} seconds'
            )

        exetime = timedelta(seconds=time.time() - tic)
        logger_chunktimes.info(
            f'Total time of execution: {exetime.total_seconds()} seconds'
        )

    @staticmethod
    def _download_chunk(client, waveform_restrictions, download_restrictions,
                        processing, starttime, endtime, logger):
        """
        Download one time window for every network-station pair.

        A failure for one pair is logged and does not abort the others.
        This behaviour is the same in sequential mode and in threaded mode.
        Previously the threaded mode silently discarded worker exceptions
        because the ``executor.map`` results were never consumed.
        """

        def fetch(netsta):
            """Download waveforms for a single network-station pair."""
            bulk = (
                netsta[0], netsta[1],
                waveform_restrictions.location,
                waveform_restrictions.channel,
                starttime, endtime
            )
            ut.write_client_waveforms(
                client, bulk, waveform_restrictions,
                download_restrictions, processing
            )

        def report(netsta, exc):
            """Log a failed download for one network-station pair."""
            logger.error(
                f'Download failed for {netsta[0]}.{netsta[1]} '
                f'[{starttime} - {endtime}]',
                exc_info=exc
            )

        bulk_info = waveform_restrictions.bulk_info

        if download_restrictions.n_processor == 1:
            for netsta in bulk_info:
                try:
                    fetch(netsta)
                except Exception as exc:  # best-effort: keep other stations going
                    report(netsta, exc)
            return

        with cf.ThreadPoolExecutor(download_restrictions.n_processor) as executor:
            futures = {executor.submit(fetch, netsta): netsta for netsta in bulk_info}
            # Consume every future so worker exceptions are surfaced in the log
            # instead of being silently dropped.
            for future in cf.as_completed(futures):
                exc = future.exception()
                if exc is not None:
                    report(futures[future], exc)
if __name__ == "__main__":
    # Manual smoke run against a local FDSN server; not part of the library API.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s [%(levelname)s] [%(name)s] %(message)s',
        datefmt='%m-%d %H:%M'
    )

    from obspy.clients.fdsn import Client as FDSN_Client
    from obspy.core.utcdatetime import UTCDateTime
    from restrictions import DownloadRestrictions

    client = FDSN_Client('http://10.100.100.232:8091')

    # NOTE(review): `MseedDownloader.download` does not consume this object.
    # It reads the waveform selection (time span, location, channel,
    # bulk_info) from each provider's `waveform_restrictions`. Kept here to
    # document the intended station/time selection; wire it into the
    # provider objects — TODO confirm the provider API.
    restrictions = DownloadRestrictions(
        network="CM",
        station="AGCC,EZNC,SNPBC,MORC,OCNC,SML1C,VMM*,BRR*,LL*,OCA,PAM,BAR2,PTB,ZAR,RUS,SPBC,NOR,HEL",
        starttime=UTCDateTime("2017-09-22T00:00:00.000000Z"),
        endtime=UTCDateTime("2017-09-23T00:00:00.000000Z"),
        chunklength_in_sec=86400,
        overlap_in_sec=None,
        groupby='{network}.{station}.{location}',
        threshold=60,
        location_preferences=["", "00", "20", "10", "40"],
        channel_preferences=["HH", "BH", "EH", "HN", "HL"],
        to_pick=(10, 0.3)
    )

    mseed_storage = (
        "/home/ecastillo/downloads/"
        "{network}/{station}/{network}.{station}.{location}.{channel}__{starttime}__{endtime}.mseed"
    )

    # NOTE(review): `download` expects provider objects exposing `.client`,
    # `.waveform_restrictions`, `.processing` and `.copy()`; a bare FDSN
    # client may not satisfy that — confirm what `ut.sanitize_provider_times`
    # accepts.
    md = MseedDownloader([client])

    # The storage template is the first positional argument; the remaining
    # settings are keyword arguments of `download`. (The old call passed the
    # restrictions object as `mseed_storage` and an unsupported
    # `concurrent_feature` keyword, which raised TypeError.)
    md.download(
        mseed_storage,
        chunklength_in_sec=86400,
        threshold=60,
        overlap_in_sec=0,
        groupby='{network}.{station}.{location}',
        n_processor=16,
    )