Source code for SeisMonitor.monitor.magnitude.mag

"""
Magnitude calculation utilities for local magnitude (Ml) and moment
magnitude (Mw) estimation.

This module provides tools for:

* Selecting waveform streams according to channel/location preferences.
* Calculating local magnitude (Ml).
* Calculating moment magnitude (Mw).
* Managing waveform retrieval and response selection.

The implementation is based partially on:

    https://github.com/krischer/moment_magnitude_calculator

:author:
    Emmanuel Castillo

:copyright:
    Lion Krischer (2012)

:license:
    Original concepts adapted from ObsPy-based implementations.

Example
-------
>>> from obspy.core.event import read_events
>>> catalog = read_events("catalog.xml")
>>> magnitude = Magnitude(providers, catalog, "./output")
"""

######## 
# Moment magnitude adapted from 
# https://github.com/krischer/moment_magnitude_calculator/tree/master/scripts
# :copyright:
# Lion Krischer (krischer@geophysik.uni-muenchen.de), 2012
from obspy.core.event.resourceid import ResourceIdentifier
from obspy.core.event.base import TimeWindow,WaveformStreamID
from obspy.core.inventory.inventory import Inventory
from obspy.core.stream import Stream
from obspy.io.xseed.parser import Parser
from obspy.core.event import Magnitude as Mag
from obspy.core.event import Catalog
from obspy.core.event import read_events, Comment
from SeisMonitor.utils import isfile
from SeisMonitor.core import utils as scut
from obspy.core.event.base import CreationInfo
import concurrent.futures as cf
from obspy import UTCDateTime
# import mtspec
import numpy as np
import scipy
import os
from SeisMonitor.utils import printlog
from . import utils as ut

[docs] class MwPhysicalMagParams(): """ Physical parameters used during Mw calculation. Parameters ---------- vp : float, optional P-wave velocity in m/s. vsp_factor : float, optional Vp/Vs ratio. density : float, optional Density in kg/m³. waterlevel : float, optional Water level for deconvolution stabilization. p_radiation_pattern : float, optional Radiation pattern coefficient for P waves. s_radiation_pattern : float, optional Radiation pattern coefficient for S waves. """ def __init__(self, vp=4800, vsp_factor=1.73, density=2650.0, waterlevel=10, p_radiation_pattern=0.52, s_radiation_pattern=0.63): self.vp = vp self.vsp_factor = vsp_factor self.vs = vp/vsp_factor self.density = density self.waterlevel = waterlevel self.p_radiation_pattern = p_radiation_pattern self.s_radiation_pattern = s_radiation_pattern
[docs] class MwProcessingMagParams(): def __init__(self, time_before_pick = 5, time_after_pick = 30, padding = 40, only_proc_p_pick=False, only_proc_s_pick=False): """ Processing parameters used during Mw estimation. Parameters ---------- time_before_pick : float, optional Time before pick in seconds. time_after_pick : float, optional Time after pick in seconds. padding : float, optional Additional waveform padding in seconds. only_proc_p_pick : bool, optional Process only P-wave picks. only_proc_s_pick : bool, optional Process only S-wave picks. """ self.time_before_pick = time_before_pick self.time_after_pick = time_after_pick self.padding = padding self.only_proc_p_pick = only_proc_p_pick self.only_proc_s_pick = only_proc_s_pick
[docs] def get_st_according2preference(st,location_list,channel_list): """ Select a preferred stream according to location and channel priorities. The function filters streams first by location preference and then by channel preference. Parameters ---------- st : obspy.Stream Input waveform stream. location_list : list of str Preferred locations ordered by priority. channel_list : list of str Preferred channel codes ordered by priority. Returns ------- obspy.Stream or None Filtered stream according to preferences. Example ------- >>> new_st = get_st_according2preference( ... st, ... ["00", "20"], ... ["HH", "BH"] ... ) """ if (st == None) or (len(st)==0) : return None preference = list(map(lambda x: (x[2],x[3]),st._get_common_channels_info().keys() )) if not location_list: stats = st[0].stats new_st = st else: locations = list(map(lambda x: x[2],st._get_common_channels_info().keys() )) index = 0 loc_pref = None # print(len(location_list)) while index < len(location_list): loc_pref = location_list[index] if loc_pref in locations: index = len(location_list) else: loc_pref = None index += 1 if loc_pref == None: return st stats = st[0].stats new_st = st.select(network=stats.network, station = stats.station, location=loc_pref) if not channel_list: pass else: ## If the same location has two differents sensor ## example : 00.HH? or 00.BH?, then choose common_channels = new_st._get_common_channels_info().keys() common_channels = list(common_channels) common_channels = list(map(lambda x: x[3][:2],common_channels)) index = 0 cha_pref = None while index < len(channel_list): cha_pref = channel_list[index] if cha_pref in common_channels: index = len(channel_list) else: cha_pref = None index += 1 if cha_pref == None: return st else: cha_pref = f'{cha_pref}?' new_st = new_st.select(network=stats.network, station = stats.station, location=loc_pref, channel=cha_pref) common_channels = list(map(lambda x: (x[2],x[3]),new_st._get_common_channels_info().keys())) printlog("debug",f"Magnitude: {stats.network}-{stats.station}: ", f"available:{preference},"+ f" selected {common_channels} according to preference") # logger.info(f"{stats.network}-{stats.station}: "+ # f"available:{preference},"+ # f" selected {common_channels} according to preference") return new_st
[docs] class Magnitude(): """ Magnitude calculator for seismic events. Parameters ---------- providers : list List of waveform providers. catalog : str or obspy.Catalog Catalog object or path to QuakeML file. out_dir : str Output directory for generated magnitude files. """ def __init__(self,providers,catalog,out_dir) -> None: """ Initialize the Magnitude calculator. """ # self.client = client # super().__init__(sds_archive,sds_type,format) # self.response = response self.providers = providers if isinstance(catalog,Catalog): self.catalog = catalog else: self.catalog = read_events(catalog) self.agency = self.catalog.creation_info.agency_id self.out_dir = out_dir self.xml_ml_out_file = os.path.join(out_dir,"Ml_magnitude.xml") self.xml_mw_out_file = os.path.join(out_dir,"Mw_magnitude.xml")
[docs] def get_Ml(self, mag_type="RSNC", trimmedtime=50, padding=20,waterlevel=10, zone=None, out_format="QUAKEML"): """ Calculate local magnitude (Ml) for all events. Parameters ---------- mag_type : str, optional Magnitude relationship type. trimmedtime : float, optional Signal trimming window in seconds. padding : float, optional Waveform retrieval padding. waterlevel : float, optional Water level for response correction. zone : str, optional Regional magnitude zone. out_format : str, optional Output catalog format. Returns ------- obspy.Catalog Updated catalog with Ml magnitudes. """ events_mag = [] for n_ev,event in enumerate(self.catalog,1): if not event.origins: print ("No origin for event %s" % event.resource_id) continue ori_pref = event.preferred_origin() if ori_pref == None: event.preferred_origin_id = event.origins[0].resource_id.id ori_pref = event.preferred_origin() origin_time = ori_pref.time latitude = ori_pref.latitude longitude = ori_pref.longitude depth = ori_pref.depth print(f"Event No. {n_ev}/{len(self.catalog)}: ({origin_time}) ",event.resource_id) # depth = ori_pref.depth *1e3 if latitude ==None or longitude==None: continue amplitudes = [] station_magnitudes = [] Mls = [] def _get_maginfo_by_station(pick): # for pick in event.picks: if pick.phase_hint.upper() != "S": # continue return None st = self._get_corresponding_st(pick.waveform_id, pick.time, padding) if st == None: # continue return None stats = st[0].stats ev_params = {"picktime":pick.time, "latitude":latitude, "longitude":longitude} resp = Inventory() for provider in self.providers: if len(resp) > 0: continue response = provider.inventory # selected_response = response.select(network = pick.waveform_id.network_code, # station = pick.waveform_id.station_code, # location = "*", # channel = pick.waveform_id.channel_code[0:2]+"*") selected_response = response.select(network = stats.network, station = stats.station, location = stats.location, channel = stats.channel[:2]+"*" ) resp = selected_response.__add__(selected_response) ampl,epi_dist,tr_id = ut.get_Ml_magparams_by_station(st,resp, ev_params, trimmedtime, waterlevel) net,sta,loc,cha = tr_id.split(".") amp = ut.write_amplitude_values(ampl, time_window=TimeWindow(begin=0, end=trimmedtime, reference=ev_params["picktime"]), pick_id=pick.resource_id, waveform_id=WaveformStreamID(net,sta,loc,cha), magnitude_hint="Ml", agency=self.agency) amplitudes.append(amp) if (ampl == None) or (ampl==0): # continue return None Ml = ut.get_Ml(ampl,epi_dist,mag_type,zone) Mls.append(Ml) stamag = ut.write_magsta_values(value=Ml,uncertainty=None, mag_type="Ml", origin_id =ori_pref.resource_id, amplitude_id=amp.resource_id, method_id=ResourceIdentifier(id="https://docs.obspy.org/tutorial/advanced_exercise/advanced_exercise_solution_3b.html"), waveform_id=WaveformStreamID(net,sta,loc,cha), agency=self.agency ) station_magnitudes.append(stamag) staname = ".".join((stats.network, stats.station, stats.location ,stats.channel[:2]+"*")) print(f"\t-> Ml | {staname}-{pick.phase_hint.upper()} | {Ml} (Amp: {ampl} m, Epi_dist: {round(epi_dist,2)} km)") with cf.ThreadPoolExecutor() as executor: executor.map(_get_maginfo_by_station,event.picks) if not Mls: Ml = 0 Ml_std = 0 print(f"No magnitude for {event.resource_id}") else: Ml = scipy.stats.trim_mean(Mls,0.25) Ml_std = np.array(Mls).std() mag = ut.write_magnitude_values(Ml,Ml_std,len(Mls),"Ml", evaluation_mode = "automatic", evaluation_status = "preliminary", method_id=ResourceIdentifier(id="https://docs.obspy.org/tutorial/advanced_exercise/advanced_exercise_solution_3b.html"), origin_id=ori_pref.resource_id, agency=self.agency, comments=None) event.amplitudes = amplitudes event.station_magnitudes = station_magnitudes event.magnitudes.append(mag) event.preferred_magnitude_id = mag.resource_id events_mag.append(event) print(f"{event.resource_id} | Ml: {round(Ml,2)} | Ml_std {round(Ml_std,2)} | stations:{len(Mls)}\n\n") # _get_maginfo_by_station(pick) catalog = Catalog(events = events_mag ) catalog = scut.add_aditional_catalog_info(catalog,agency=self.agency) if self.xml_ml_out_file != None: print (f"Writing output file in {self.xml_ml_out_file}") isfile(self.xml_ml_out_file) catalog.write(self.xml_ml_out_file, out_format) return self.catalog
[docs] def get_Mw(self,physparams,procparams, out_format="QUAKEML"): """ Calculate moment magnitude (Mw). Parameters ---------- physparams : MwPhysicalMagParams Physical Mw parameters. procparams : MwProcessingMagParams Signal processing parameters. out_format : str, optional Output format. Returns ------- obspy.Catalog Updated catalog with Mw magnitudes. """ Mws = [] Mws_std = [] for n_ev,event in enumerate(self.catalog,1): print(f"Event No. {n_ev}:",event.resource_id,f"from {len(self.catalog)} events.") if not event.origins: print ("No origin for event %s" % event.resource_id) continue origin_time = event.origins[0].time latitude = event.origins[0].latitude longitude = event.origins[0].longitude depth = event.origins[0].depth moments = [] corner_frequencies = [] for pick in event.picks: if procparams.only_proc_s_pick: if pick.phase_hint.upper() == "P": continue elif procparams.only_proc_p_pick: if pick.phase_hint.upper() == "S": continue st = self._get_corresponding_st(pick.waveform_id, pick.time, procparams.padding ) if st == None: continue resp = Inventory() for provider in self.providers: if len(resp) > 0: continue if pick.waveform_id.channel_code == None: channel = "*" else: channel = pick.waveform_id.channel_code[0:2] +"*" response = provider.inventory selected_response = response.select(network = pick.waveform_id.network_code, station = pick.waveform_id.station_code, location = "*", channel = channel ) resp = selected_response.__add__(selected_response) st = ut.Mw_st_processing(st, resp, physparams.waterlevel, pick.time-procparams.padding) if st == None: continue traveltime = pick.time - origin_time M_0,corner_freqs = ut.get_M0_magnitude_by_pick(st,pick.time,traveltime, pick.phase_hint, physparams, procparams) if M_0 != None: staname = ".".join((pick.waveform_id.network_code, pick.waveform_id.station_code, pick.waveform_id.location_code ,channel)) Mw_sta = 2.0 / 3.0 * (np.log10(M_0) - 9.1) print(f"\t-> Mw | {staname}-{pick.phase_hint.upper()} | {Mw_sta}") moments.append(M_0) corner_frequencies.extend(corner_freqs) if not len(moments): print (f"No moments could be calculated for event: {event.resource_id}") continue # Calculate the seismic moment via basic statistics. moments = np.array(moments) # moment = moments.mean() moment = scipy.stats.trim_mean(moments,0.25) moment_std = moments.std() corner_frequencies = np.array(corner_frequencies) Mw = 2.0 / 3.0 * (np.log10(moment) - 9.1) Mw_std = 2.0 / 3.0 * moment_std / (moment * np.log(10)) Mws_std.append(Mw_std) Mws.append(Mw) print(f"{event.resource_id} | Mw: {round(Mw,2)} | Mw_std {round(Mw_std,2)} | stations:{len(moments)}\n\n") mag = ut.write_magnitude_values(Mw,Mw_std,len(moments),"Mw", evaluation_mode = "automatic", evaluation_status = "preliminary", method_id=ResourceIdentifier("smi:com.github/krischer/moment_magnitude_calculator/automatic/1"), origin_id=event.origins[0].resource_id, agency=self.agency, comments=None) event.magnitudes.append(mag) event.preferred_magnitude_id = mag.resource_id if self.xml_mw_out_file != None: print (f"Writing output file in {self.xml_mw_out_file}") isfile(self.xml_mw_out_file) self.catalog.write(self.xml_mw_out_file, out_format) return self.catalog
def _get_corresponding_st(self,waveform_id, pick_time, padding=20.0, ): """ Retrieve waveform stream corresponding to a pick. Parameters ---------- waveform_id : obspy.WaveformStreamID Waveform identifier. pick_time : obspy.UTCDateTime Pick time. padding : float, optional Time padding around the pick. Returns ------- obspy.Stream or None Retrieved waveform stream. """ start = pick_time - padding end = pick_time + padding if (waveform_id.network_code == None) or\ (waveform_id.network_code == ""): # net = "*" net = [ x.waveform_restrictions.network for x in self.providers] net = ",".join(net) else: net = waveform_id.network_code if (waveform_id.channel_code == None): cha = "" else: cha = waveform_id.channel_code[0:2] stream = Stream() for provider in self.providers: if len(stream) > 0: continue client = provider.client try: st= client.get_waveforms(net, waveform_id.station_code, "*", cha+"*",start,end ) st = get_st_according2preference(st, provider.waveform_restrictions.location_preferences, provider.waveform_restrictions.channel_preferences) stream += st except: pass if len(stream) > 0: return st else: print(f"\t-> Not found: {net}-{waveform_id.station_code}"+\ f"-*-{cha}* |"+\ f"{start} - {end} ") return None
# if __name__ =="__main__": # import math # from obspy.clients.fdsn import Client # from obspy.core.inventory.inventory import read_inventory # client = 'http://sismo.sgc.gov.co:8080' # client = Client(client) # # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022knomqj.xml" #3.7 # # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022cvykgs.xml" # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022kszqlt.xml" #2.2 143km # # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022ktsrvi.xml" #2 33km # # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022krnhiu.xml" #4.3 # # catalog = "/home/emmanuel/EDCT/SeisMonitor/data/events/SGC2022kszqlt.xml" #2.9 103km # # resp = "/home/emmanuel/Ecopetrol/SeisMonitor/data/metadata/CM.dataless" # resp = "/home/emmanuel/EDCT/SeisMonitor/data/events/public_CM.xml" # resp = read_inventory(resp) # out="./test_magnitude,xml" # mag = Magnitude(client,catalog,resp) # ## prof # physparams = MwPhysicalMagParams(vp=8200, # p_radiation_pattern=0.05, # ) # procparams = MwProcessingMagParams( # time_before_pick = 2, # time_after_pick = 15, # only_proc_p_pick=True) # ## sup # # physparams = MwPhysicalMagParams(vp=4800, # # ) # # procparams = MwProcessingMagParams( # # time_before_pick = 0.2, # # time_after_pick = 0.8, # # only_proc_p_pick=True) # # mag.get_Mw(physparams,procparams,out) # # ml_params = {"a":1.019,"b":0.0016,"r_ref":140} # # k = ml_params["a"]*math.log10(ml_params["r_ref"]/100) +\ # # ml_params["b"]* (ml_params["r_ref"]-100) +3 # # mt = lambda ampl,epi_dist: math.log10(ampl * 1000) + ml_params["a"] * math.log10(epi_dist/ml_params["r_ref"]) +\ # # ml_params["b"] * (epi_dist-ml_params["r_ref"]) + k # mag.get_Ml(mag_type="RSNC")