"""
Utility functions for seismic waveform downloading and inventory handling.
This module provides helper classes and functions used for:
* Waveform downloading
* MiniSEED file management
* Inventory filtering
* Station metadata extraction
* Stream preference selection
* Chunked time-window generation
The implementation is designed for ObsPy-based workflows.
"""
import os
import json
import time
import logging
import datetime as dt
from SeisMonitor.utils import printlog
from obspy.core.stream import Stream
from obspy.clients.fdsn.mass_downloader import Restrictions
from obspy.core.inventory.inventory import Inventory, read_inventory
from obspy.clients.fdsn.mass_downloader.domain import RectangularDomain
[docs]
class DownloadRestrictions:
"""
Container class defining waveform download restrictions.
Parameters
----------
mseed_storage : str
MiniSEED output path template.
chunklength_in_sec : int, optional
Length of each download chunk in seconds.
threshold : int, default=60
Minimum waveform duration threshold in seconds.
overlap_in_sec : int, default=0
Chunk overlap in seconds.
picker_args : dict, optional
Picker configuration dictionary.
groupby : str, default="{network}.{station}.{channel}"
Grouping rule for traces.
n_processor : int, optional
Number of parallel workers.
Attributes
----------
mseed_storage : str
MiniSEED output path template.
chunklength_in_sec : int or None
Chunk duration in seconds.
threshold : int
Minimum waveform duration threshold.
overlap_in_sec : int
Overlap between chunks.
picker_args : dict
Picker configuration.
groupby : str
Trace grouping rule.
n_processor : int or None
Number of parallel workers.
"""
def __init__(
self,
mseed_storage,
chunklength_in_sec=None,
threshold=60,
overlap_in_sec=0,
picker_args={},
groupby='{network}.{station}.{channel}',
n_processor=None):
"""
Initialize download restrictions.
"""
self.mseed_storage = mseed_storage
self.chunklength_in_sec = chunklength_in_sec
self.threshold = threshold
self.overlap_in_sec = overlap_in_sec
self.picker_args = picker_args
self.groupby = groupby
self.n_processor = n_processor
[docs]
def sanitize_provider_times(providers):
"""
Ensure all providers share the same time interval.
Parameters
----------
providers : list
Provider list.
Returns
-------
list
Validated provider list.
Raises
------
ValueError
If providers use different time intervals.
"""
provider_times = [(p.waveform_restrictions.starttime,
p.waveform_restrictions.endtime) for p in providers]
if provider_times.count(provider_times[0]) == len(provider_times):
return providers
raise Exception("Providers must have the same interval time")
[docs]
def get_max_allowed_batch_size(data_length, segment_length, overlap):
"""
Compute the maximum valid batch size.
Parameters
----------
data_length : float
Total waveform duration in seconds.
segment_length : float
Segment duration in seconds.
overlap : float
Fractional overlap between segments.
Returns
-------
int
Maximum allowed batch size.
"""
max_batch_size = (data_length - (overlap * segment_length)) / (segment_length * (1 - overlap))
max_batch_size = int(max_batch_size)
return max(1, max_batch_size)
[docs]
def write_stream(st, mseed_storage, threshold=None, picker_args={}, ppc_and_comment=[False, ""]):
"""
Write a waveform stream to MiniSEED format.
Parameters
----------
st : Stream
ObsPy stream object.
mseed_storage : str
Output storage template.
threshold : int, optional
Minimum waveform duration threshold.
picker_args : dict, optional
Picker validation parameters.
ppc_and_comment : sequence, optional
Tuple/list containing:
* Preprocessed flag
* Log comment
"""
ppc, comment = ppc_and_comment
tr = st[0]
mseed_filename = get_mseed_filename(mseed_storage, tr, ppc)
download = True
if threshold is not None:
length = abs(tr.stats.endtime - tr.stats.starttime)
if length < threshold:
comment = f"length:{length} < threshold:{threshold}"
printlog("info", "Downloader: False", f"{mseed_filename} {comment}")
download = False
if picker_args:
overlap = picker_args["overlap"]
batch_size = picker_args["batch_size"]
segment_length = picker_args["length"]
data_length = abs(tr.stats.endtime - tr.stats.starttime)
max_batch_size = get_max_allowed_batch_size(data_length, segment_length, overlap)
if max_batch_size < batch_size:
comment = (
f"This mseed only can be used with {max_batch_size} batches. "
f"In order to download the data, the batch size must be >= {batch_size}. "
f"Modify this condition changing 'picker_args':{picker_args}' parameter."
)
printlog("info", "Downloader: False", f"{mseed_filename} {comment}")
download = False
if not os.path.isfile(mseed_filename) and download:
mseed_dir = os.path.dirname(mseed_filename)
if not os.path.isdir(mseed_dir):
os.makedirs(mseed_dir)
st.write(mseed_filename, format="MSEED")
printlog("info", "Downloader: True", f"{mseed_filename} {comment}")
elif os.path.isfile(mseed_filename):
printlog("info", "Downloader: Exist", f"{mseed_filename} {comment}")
[docs]
def get_mseed_filename(_str, tr, ppc=False):
"""
Generate a MiniSEED filename from a template.
Parameters
----------
template : str
Output path template.
trace : Trace
ObsPy trace object.
ppc : bool, default=False
Append preprocessing suffix.
Returns
-------
str
Generated file path.
Raises
------
TypeError
If the generated path is invalid.
"""
strftime = "%Y%m%dT%H%M%SZ"
params = {
'network': tr.stats.network,
'station': tr.stats.station,
'location': tr.stats.location,
'channel': tr.stats.channel,
'starttime': tr.stats.starttime.strftime(strftime),
'endtime': tr.stats.endtime.strftime(strftime),
'year': tr.stats.starttime.year,
'month': tr.stats.starttime.month,
'day': tr.stats.endtime.day,
'julday': tr.stats.endtime.julday
}
path = _str.format(**params)
if ppc:
path += ".ppc"
if not isinstance(path, (str, bytes)):
raise TypeError(f"'{path}' is not a filepath.")
return path
[docs]
def get_chunktimes(starttime, endtime, chunklength_in_sec, overlap_in_sec=0):
"""
Generate chunked time intervals.
Parameters
----------
starttime : UTCDateTime
Start time.
endtime : UTCDateTime
End time.
chunklength_in_sec : int, optional
Chunk duration in seconds.
overlap_in_sec : int, optional
Chunk overlap in seconds.
Returns
-------
list
List of ``(starttime, endtime)`` tuples.
Raises
------
ValueError
If ``chunklength_in_sec`` equals zero.
"""
if chunklength_in_sec == 0:
raise Exception("chunklength_in_sec must be different than 0")
if chunklength_in_sec is None:
return [(starttime, endtime)]
if overlap_in_sec is None:
overlap_in_sec = 0
deltat = starttime
dtt = dt.timedelta(seconds=chunklength_in_sec)
overlap_dt = dt.timedelta(seconds=overlap_in_sec)
times = []
while deltat < endtime:
if deltat + dtt > endtime:
break
times.append((deltat, deltat + dtt))
deltat += dtt - overlap_dt
if deltat < endtime:
times.append((deltat, endtime))
return times
[docs]
def get_st_according2preference(st,location_list,channel_list):
"""
Suppose that your preference_type is "location" and your
location_list is ["00","20","10"], then this function first
filter the stream according to location and returns
a new stream only with location "00", if no exist "00" will
continue with the next preference "20", and otherwise, "10".
After that, it is going to take new stream and it will go to
filter according to channel_list preference if the new stream
has more than one channel type ("HH or BH).
Parameters
----------
st : Stream
ObsPy stream object.
location_list : list
Preferred locations ordered by priority.
channel_list : list
Preferred channel prefixes ordered by priority.
Returns
-------
Stream
Filtered stream.
"""
logger =logging.getLogger(f'Downloader: preference')
if len(st) >= 1000:
stats = st[0].stats
# logger.error(f"{stats.network}-{stats.station}: "+
# f"No filter by preference. gaps: {len(st)} ")
printlog("error",'Downloader: preference',
f"{stats.network}-{stats.station}"+
f"No filter by preference. gaps: {len(st)}")
return st
preference = list(map(lambda x: (x[2],x[3]),st._get_common_channels_info().keys() ))
if not location_list:
stats = st[0].stats
new_st = st
else:
locations = list(map(lambda x: x[2],st._get_common_channels_info().keys() ))
index = 0
loc_pref = None
# print(len(location_list))
while index < len(location_list):
loc_pref = location_list[index]
if loc_pref in locations:
index = len(location_list)
else:
loc_pref = None
index += 1
if loc_pref == None:
return st
stats = st[0].stats
new_st = st.select(network=stats.network, station = stats.station,
location=loc_pref)
if not channel_list:
pass
else:
## If the same location has two differents sensor
## example : 00.HH? or 00.BH?, then choose
common_channels = new_st._get_common_channels_info().keys()
common_channels = list(common_channels)
common_channels = list(map(lambda x: x[3][:2],common_channels))
index = 0
cha_pref = None
while index < len(channel_list):
cha_pref = channel_list[index]
if cha_pref in common_channels:
index = len(channel_list)
else:
cha_pref = None
index += 1
if cha_pref == None:
return st
else:
cha_pref = f'{cha_pref}?'
new_st = new_st.select(network=stats.network, station = stats.station,
location=loc_pref, channel=cha_pref)
common_channels = list(map(lambda x: (x[2],x[3]),new_st._get_common_channels_info().keys()))
printlog("debug",f"Downloader: {stats.network}-{stats.station}: ",
f"available:{preference},"+
f" selected {common_channels} according to preference")
# logger.info(f"{stats.network}-{stats.station}: "+
# f"available:{preference},"+
# f" selected {common_channels} according to preference")
return new_st
[docs]
def get_filenames(mseed, filter_net=[], filter_sta=[], filter_cha=[]):
"""
Retrieve MiniSEED filenames with optional filtering.
Parameters
----------
mseed : str
Directory containing MiniSEED files.
filter_net : list of str, optional
Network codes to exclude.
filter_sta : list of str, optional
Station codes to exclude.
filter_cha : list of str, optional
Channel codes to include.
Returns
-------
list of str
Filtered MiniSEED filenames.
Raises
------
TypeError
If any filter parameter is not a list.
Examples
--------
>>> files = get_filenames("/data/mseed")
>>> isinstance(files, list)
True
"""
for filter_param in (filter_net, filter_sta, filter_cha):
if not isinstance(filter_param, list):
raise Exception(f"{filter_param} must be a list")
filenames = []
file_list = [f for f in os.listdir(mseed) if f.lower().endswith('.mseed')]
for filename in file_list:
net, sta, loc, cha = filename.split('__')[0].split('.')
if net in filter_net or sta in filter_sta:
continue
if filter_cha and cha not in filter_cha:
continue
filenames.append(filename)
return filenames
[docs]
def select_inventory(inv, network, station, location, channel, starttime, endtime):
"""
Filter an inventory using waveform selection criteria.
Parameters
----------
inventory : Inventory
Input ObsPy inventory.
network : str
Comma-separated network codes.
station : str
Comma-separated station codes.
location : str
Comma-separated location codes.
channel : str
Comma-separated channel codes.
starttime : UTCDateTime
Selection start time.
endtime : UTCDateTime
Selection end time.
Returns
-------
Inventory
Filtered inventory object.
"""
networks, stations, locations, channels = (
network.split(','), station.split(','),
location.split(','), channel.split(',')
)
inventory = inv.select(
network=networks[0], station=stations[0],
location=locations[0], channel=channels[0],
starttime=starttime, endtime=endtime
)
for net in networks:
for sta in stations:
for loc in locations:
for cha in channels:
one_inv = inv.select(
network=net, station=sta, location=loc, channel=cha,
starttime=starttime, endtime=endtime
)
inventory = inventory.__add__(one_inv)
return inventory
[docs]
def get_merged_inv_and_json(providers):
"""
Merge inventories and station metadata from providers.
Parameters
----------
providers : list
List of provider objects.
Returns
-------
tuple
Tuple containing:
* Merged inventory
* Station metadata dictionary
* Updated providers
* Stations outside requested domains
"""
json_info = {}
stations_outside_domains = []
inventory = Inventory()
updated_providers = []
for provider in providers:
client = provider.client
restrictions = provider.waveform_restrictions
try:
inv = (read_inventory(provider.xml) if provider.xml else
client.get_stations(
network=restrictions.network, station=restrictions.station,
location=restrictions.location, channel=restrictions.channel,
starttime=restrictions.starttime, endtime=restrictions.endtime,
level='channel'
))
if provider.xml:
inv = select_inventory(
inv, restrictions.network, restrictions.station,
restrictions.location, restrictions.channel,
restrictions.starttime, restrictions.endtime
)
except Exception:
printlog("error", "Inventory", f"No get_stations with {restrictions.__dict__}")
inv = Inventory()
inv, jinf, sod = get_inv_and_json(
inv,
restrictions.filter_networks,
restrictions.filter_stations,
restrictions.filter_domain
)
bulk_info = list(set([(net.code, sta.code) for net in inv.networks for sta in net.stations]))
provider.waveform_restrictions.bulk_info = bulk_info
inventory = inventory.__add__(inv)
json_info.update(jinf)
stations_outside_domains.append(sod)
updated_providers.append(provider)
stations_outside_domains = [x for xs in stations_outside_domains for x in xs]
return inventory, json_info, updated_providers, stations_outside_domains
[docs]
def inside_the_polygon(p, pol_points):
"""
Determine whether a point lies inside a polygon.
Parameters
----------
point : tuple
Point coordinates ``(longitude, latitude)``.
polygon_points : list of tuple
Polygon vertices.
Returns
-------
bool
``True`` if the point is inside the polygon.
"""
V = tuple(pol_points) + (pol_points[0],)
cn = 0
for i in range(len(V) - 1):
if (V[i][1] <= p[1] < V[i+1][1] or V[i][1] > p[1] >= V[i+1][1]):
vt = (p[1] - V[i][1]) / float(V[i+1][1] - V[i][1])
if p[0] < V[i][0] + vt * (V[i+1][0] - V[i][0]):
cn += 1
return cn % 2 == 1
[docs]
def get_inv_and_json(inventory, filter_networks=[], filter_stations=[], filter_domain=[-180, 180, -90, 90]):
"""
Filter an inventory and generate station metadata.
Parameters
----------
inventory : Inventory
ObsPy inventory object.
filter_networks : list of str, optional
Networks to exclude.
filter_stations : list of str, optional
Stations to exclude.
filter_domain : list of float, optional
Geographic domain defined as:
``[minlon, maxlon, minlat, maxlat]``
Returns
-------
tuple
Tuple containing:
* Filtered inventory
* Station metadata dictionary
* Stations outside domain
"""
if not filter_domain:
filter_domain = [-180, 180, -90, 90]
polygon = [
(filter_domain[0], filter_domain[2]), (filter_domain[0], filter_domain[3]),
(filter_domain[1], filter_domain[3]), (filter_domain[1], filter_domain[2]),
(filter_domain[0], filter_domain[2])
]
station_list = {}
logger = logging.getLogger('json')
toprint = []
stations_outside_domain = []
for ev in inventory:
net = ev.code
if net not in filter_networks:
for st in ev:
sta = st.code
msg = f'{net}-{sta}'
logger.debug(msg)
if sta not in filter_stations:
lon, lat, elv = st.longitude, st.latitude, st.elevation
if not inside_the_polygon((lon, lat), polygon):
inventory = inventory.remove(network=net, station=sta)
stations_outside_domain.append(sta)
continue
channels = list(set(ch.code for ch in st.channels))
if channels and sta not in station_list:
sample_rates = [st.select(channel=ch).channels[0].sample_rate for ch in channels]
station_list[sta] = {
"network": net,
"channels": channels,
"coords": [lat, lon, elv],
"sampling_rate": sample_rates
}
toprint.append(msg)
else:
inventory = inventory.remove(network=net, station=sta)
else:
inventory = inventory.remove(network=net)
logger.info(str(toprint) + ' ok')
return inventory, station_list, stations_outside_domain
# if __name__ == "__main__":
# from obspy.clients.fdsn import Client as FDSN_Client
# from obspy.core.utcdatetime import UTCDateTime
# from .restrictions import PreprocRestrictions
# IRIS_client = FDSN_Client(base_url="IRIS", user='gaprietogo@unal.edu.co',password="DaCgmn3hNjg")
# st = IRIS_client.get_waveforms(network="YU",station="FC04,FC01",location="",channel="*",starttime=UTCDateTime("2016-04-22T00:00:00.0"),endtime=UTCDateTime("2016-04-23T00:00:00.0"))
# st = get_st_according2preference(st,["","00","20","10"],["HH","BH"])
# prove = get_all_sdswaveforms(client=IRIS_client,network="YU",station="FC04,FC01",location="",channel="HH?",starttime=UTCDateTime("2016-04-22T00:00:00.0"),endtime=UTCDateTime("2016-04-23T00:00:00.0"))
# print(prove)
# client = FDSN_Client('http://sismo.sgc.gov.co:8080')
# inv = client.get_stations(network="CM",
# station="BAR2",
# location="*",
# channel="*",
# starttime=UTCDateTime("2019-04-23T00:00:00.0"),
# endtime=UTCDateTime("2019-04-23T00:02:00.0"))
# st = client.get_waveforms(network="CM",
# station="BAR2",
# location="*",
# channel="*",
# starttime=UTCDateTime("2019-04-23T00:00:00.0"),
# endtime=UTCDateTime("2019-04-23T00:02:00.0"))
# ppc_restrictions = PreprocRestrictions(["CM.BAR2"],detrend={'type':'simple'})
# preproc_stream(st,ppc_restrictions)
######### inventory
# json_path = "/home/ecastillo/repositories/AIpicker_modules/onejson.json"
# client_baseurl = "http://sismo.sgc.gov.co:8080"
# restrictions = DownloadRestrictions(network="CM",
# station="BAR2",
# location="*",
# channel="*",
# starttime=UTCDateTime("2019-04-23T00:22:34.5"),
# endtime=UTCDateTime("2019-04-25T00:23:39.5"),
# chunklength_in_sec=5000,
# overlap_in_sec=None,
# groupby='{network}.{station}.{channel}')
# xml = "/home/ecastillo/repositories/AIpicker_modules/CM.xml"
# makeStationList(json_path,client_baseurl,restrictions,from_xml=xml)
######## get stations
# json_path = "/home/ecastillo/repositories/AIpicker_modules/onejson.json"
# client_baseurl = "IRIS"
# restrictions = DownloadRestrictions(network="CI",
# station="BAK,ARV",
# location="*",
# channel="BH*",
# starttime=UTCDateTime("2020-09-01 00:00:00.00"),
# endtime=UTCDateTime("2020-09-02 00:00:00.00"),
# chunklength_in_sec=3600,
# overlap_in_sec=None,
# groupby='{network}.{station}.{channel}')
# makeStationList(json_path=json_path,client_baseurl="IRIS",restrictions=restrictions)