# import sys
# AIpicker_path = "/home/ecastillo/AIpicker"
# sys.path.insert(0,AIpicker_path)
import datetime as dt
import time
import os
import ast
import json
import pandas as pd
import concurrent.futures
from obspy import UTCDateTime
from obspy.core.event.base import CreationInfo
from obspy.core.event.resourceid import ResourceIdentifier
from obspy.core.event.catalog import read_events
from obspy.core.event import Catalog
[docs]
def add_aditional_origin_info(origin,
                              agency=None,
                              region=None,
                              method_id=None,
                              earth_model_id=None,
                              evaluation_mode="automatic",
                              evaluation_status="preliminary"):
    """
    Add region, method/model identifiers, evaluation flags and creation
    info to a seismic origin.

    The origin is modified in place and also returned.

    Parameters
    ----------
    origin : obspy.core.event.Origin
        The seismic origin to update.
    agency : str, optional
        The agency responsible for the origin. Stored as the creation
        info ``agency_id`` and wrapped in a ``ResourceIdentifier`` for
        ``agency_uri``.
    region : str, optional
        The geographic region of the event. Only written when given.
    method_id : str, optional
        Identifier for the location method used. Only written when given.
    earth_model_id : str, optional
        Identifier for the Earth model used. Only written when given.
    evaluation_mode : str, optional
        Mode of evaluation (default is "automatic").
    evaluation_status : str, optional
        Status of evaluation (default is "preliminary").

    Returns
    -------
    origin : obspy.core.event.Origin
        The updated origin object (same instance that was passed in).
    """
    # Optional attributes: leave whatever the origin already carries
    # untouched unless the caller explicitly supplied a value.
    if region is not None:
        origin.region = region
    if earth_model_id is not None:
        origin.earth_model_id = ResourceIdentifier(id=earth_model_id)
    if method_id is not None:
        origin.method_id = ResourceIdentifier(id=method_id)

    # These are always overwritten, including with the defaults.
    origin.evaluation_mode = evaluation_mode
    origin.evaluation_status = evaluation_status
    origin.creation_info = CreationInfo(
        agency_id=agency,
        agency_uri=ResourceIdentifier(id=agency),
        author="SeisMonitor",
        author_uri=ResourceIdentifier(id="SeisMonitor"),
        creation_time=UTCDateTime.now())
    return origin
[docs]
def add_aditional_event_info(event,
                             agency=None,
                             event_type="earthquake",
                             event_type_certainty="suspected"):
    """
    Add event type, type certainty and creation info to a seismic event.

    If the event has no preferred origin and contains at least one
    origin, the first origin in ``event.origins`` is made the preferred
    one. The event is modified in place and also returned.

    Parameters
    ----------
    event : obspy.core.event.Event
        The seismic event to update.
    agency : str, optional
        The agency responsible for the event. Stored as the creation
        info ``agency_id`` and wrapped in a ``ResourceIdentifier`` for
        ``agency_uri``.
    event_type : str, optional
        The type of seismic event (default is "earthquake").
    event_type_certainty : str, optional
        Certainty level of event classification (default is "suspected").

    Returns
    -------
    event : obspy.core.event.Event
        The updated event object (same instance that was passed in).
    """
    # Only promote the first origin when nothing is preferred yet; an
    # event without any origin is left without a preferred origin instead
    # of failing on the index lookup.
    if event.preferred_origin() is None and event.origins:
        event.preferred_origin_id = event.origins[0].resource_id.id

    event.event_type = event_type
    event.event_type_certainty = event_type_certainty
    event.creation_info = CreationInfo(
        agency_id=agency,
        agency_uri=ResourceIdentifier(id=agency),
        author="SeisMonitor",
        author_uri=ResourceIdentifier(id="SeisMonitor"),
        creation_time=UTCDateTime.now())
    return event
[docs]
def add_aditional_catalog_info(catalog,
                               agency=None):
    """
    Stamp a seismic catalog with creation info.

    The catalog is modified in place and also returned.

    Parameters
    ----------
    catalog : obspy.core.event.Catalog
        The seismic catalog to update.
    agency : str, optional
        The agency responsible for the catalog. Stored as the creation
        info ``agency_id`` and wrapped in a ``ResourceIdentifier`` for
        ``agency_uri``.

    Returns
    -------
    catalog : obspy.core.event.Catalog
        The updated catalog object (same instance that was passed in).
    """
    author_name = "SeisMonitor"
    # Collect the creation-info fields first, then build the object once.
    info_fields = {
        "agency_id": agency,
        "agency_uri": ResourceIdentifier(id=agency),
        "author": author_name,
        "author_uri": ResourceIdentifier(id=author_name),
        "creation_time": UTCDateTime.now(),
    }
    catalog.creation_info = CreationInfo(**info_fields)
    return catalog
[docs]
def preproc_stream(st,
                   order=('normalize', 'merge', 'detrend',
                          'taper', 'filter'),
                   decimate=None, detrend=None, filter=None,
                   merge=None, normalize=None,
                   resample=None, taper=None,
                   select_networks=None,
                   select_stations=None,
                   filter_networks=None,
                   filter_stations=None):
    """
    Preprocess a seismic stream by applying the requested steps in order.

    The network and station codes are read from the first trace
    (``st[0].stats``) and used to decide whether the stream is processed
    at all. Each step in ``order`` is then dispatched to the stream
    method of the same name, with the matching keyword dictionary
    unpacked as its arguments. A step that raises is recorded as
    ``Failed`` and the remaining steps still run.

    Parameters
    ----------
    st : obspy.Stream
        The seismic stream object to preprocess. It is modified in place
        by the stream methods that are called.
    order : sequence of str, optional
        Preprocessing steps to apply, in order. Recognised names are
        'decimate', 'detrend', 'filter', 'merge', 'normalize',
        'resample' and 'taper'. Any other name is reported as ``ok``
        without doing anything. An empty sequence disables processing.
        Default is ('normalize', 'merge', 'detrend', 'taper', 'filter').
    decimate : dict, optional
        Keyword arguments passed to ``st.decimate()``.
    detrend : dict, optional
        Keyword arguments passed to ``st.detrend()``.
    filter : dict, optional
        Keyword arguments passed to ``st.filter()``.
    merge : dict, optional
        Keyword arguments passed to ``st.merge()``.
    normalize : dict, optional
        Keyword arguments passed to ``st.normalize()``.
    resample : dict, optional
        Keyword arguments passed to ``st.resample()``.
    taper : dict, optional
        Keyword arguments passed to ``st.taper()``.
    select_networks : list of str, optional
        Networks to include. If empty or None, the stream's own network
        is accepted. Never modified by this function.
    select_stations : list of str, optional
        Stations to include. If empty or None, the stream's own station
        is accepted. Never modified by this function.
    filter_networks : list of str, optional
        Networks to exclude from preprocessing. Default is none.
    filter_stations : list of str, optional
        Stations to exclude from preprocessing. Default is none.

    Returns
    -------
    st : obspy.Stream
        The (possibly preprocessed) stream object that was passed in.
    processed : bool
        True if the steps were attempted, False if the stream was
        skipped (empty ``order``, excluded, or not selected). It is True
        even when individual steps failed.
    comment : str
        Per-step outcome, e.g. ``"[(detrend:ok)->(taper:Failed)]"``;
        empty when the stream was skipped.

    Notes
    -----
    A step whose keyword dictionary is left as None is reported as
    ``Failed``, because None cannot be unpacked as keyword arguments.
    A stream is selected when its network is in ``select_networks`` OR
    its station is in ``select_stations``; since an empty list falls
    back to the stream's own code, leaving either list empty accepts
    every stream.
    """
    first_trace = st[0]
    network = first_trace.stats.network
    station = first_trace.stats.station
    comment = ""

    if len(order) == 0:
        return st, False, comment

    # Exclusion lists win over selection lists.
    excluded_networks = filter_networks or ()
    excluded_stations = filter_stations or ()
    if network in excluded_networks or station in excluded_stations:
        return st, False, comment

    # An empty selection means "accept this stream's own code". Use local
    # fallbacks instead of appending to the arguments: appending would
    # mutate the caller's list (or a shared default) and leak the first
    # stream's codes into every later call.
    allowed_networks = select_networks if select_networks else [network]
    allowed_stations = select_stations if select_stations else [station]
    if network not in allowed_networks and station not in allowed_stations:
        return st, False, comment

    # Step name -> keyword arguments for the stream method of that name.
    step_kwargs = {
        'decimate': decimate,
        'detrend': detrend,
        'filter': filter,
        'merge': merge,
        'normalize': normalize,
        'resample': resample,
        'taper': taper,
    }

    outcomes = []
    for process in order:
        try:
            if process in step_kwargs:
                getattr(st, process)(**step_kwargs[process])
            outcomes.append(f"({process}:ok)")
        except Exception:
            # Best effort: note the failure and keep going with the
            # remaining steps (KeyboardInterrupt/SystemExit propagate).
            outcomes.append(f"({process}:Failed)")

    comment = "[" + "->".join(outcomes) + "]"
    return st, True, comment
[docs]
def get_csv_events(seiscomp_file, version="0.9", with_magnitude=True,
                   export=None,
                   from_format="SC3ML", inside_polygon=False):
    """
    Read an event file and return its events and picks as DataFrames.

    For every event the preferred origin (and optionally the preferred
    magnitude) is flattened into one row of ``events_df``; every arrival
    of the preferred origin is resolved to its pick and flattened into
    one row of ``picks_df``.

    Parameters
    ----------
    seiscomp_file : str
        Path to the event file to read.
    version : str, optional
        Schema version of the SC3ML file. When ``from_format`` is
        "SC3ML" and this value is not one of "0.5" ... "0.10", the second
        line of ``seiscomp_file`` is rewritten IN PLACE to declare
        version "0.10" before reading. Default is "0.9".
    with_magnitude : bool, optional
        If True, the preferred magnitude value (rounded to 2 decimals)
        and type are included; otherwise both columns are None.
        Default is True.
    export : str, optional
        Path of the CSV file to write ``events_df`` to. Missing parent
        directories are created. Only the events table is exported.
        If None, nothing is written. Default is None.
    from_format : str, optional
        Format name passed to ``read_events``. Default is "SC3ML".
    inside_polygon : bool, optional
        Accepted for interface compatibility; it is not used by this
        function. Default is False.

    Returns
    -------
    events_df : pandas.DataFrame
        One row per event, sorted by ``event_time``.
    picks_df : pandas.DataFrame
        One row per arrival of each preferred origin, sorted by
        ``arrival_time``.

    Notes
    -----
    Pick comments are expected to hold a Python-literal dict with a
    "probability" key (plus "snr" and "detection_probability" when the
    pick author is "EQTransformer"); other comment text makes the
    parsing raise. NOTE(review): an event whose ``preferred_origin()``
    is None will fail on attribute access - confirm inputs always carry
    a preferred origin.
    """
    def change_xml_version(ev_file, new_version="0.9"):
        """
        Rewrite the second line of ``ev_file`` so that the ``<seiscomp>``
        root element declares ``new_version``.

        Parameters
        ----------
        ev_file : str
            Path to the SeisComp event file to modify in place.
        new_version : str, optional
            The schema version to declare. Default is "0.9".
        """
        with open(ev_file) as src:
            lines = src.readlines()
        lines[1] = f'<seiscomp xmlns="http://geofon.gfz-potsdam.de/ns/seiscomp3-schema/{new_version}" version="{new_version}">\n'
        with open(ev_file, 'w') as dst:
            dst.write(''.join(lines))

    def pick_metadata(pick):
        """Return (author, probability, snr, detection_probability)."""
        info = pick.creation_info
        author = info.author if info is not None else None
        if not pick.comments:
            return author, None, None, None
        # The first comment carries the picker metadata as a dict literal.
        details = ast.literal_eval(pick.comments[0].text)
        probability = details["probability"]
        if author == "EQTransformer":
            return (author, probability,
                    details["snr"], details["detection_probability"])
        return author, probability, None, None

    if from_format == "SC3ML":
        if version not in ["0.5", "0.6", "0.7", "0.8", "0.9", "0.10"]:
            change_xml_version(seiscomp_file, new_version="0.10")

    catalog = read_events(seiscomp_file, format=from_format)

    event_colname = ["n_event", "event_id", "event_time", "latitude", "latitude_uncertainty",
                     "longitude", "longitude_uncertainty", "depth", "depth_uncertainty",
                     "rms", "region", "method", "earth_model", "event_type", "magnitude",
                     "magnitude_type", "n_P_phases", "n_S_phases"]
    pick_colname = ["n_event", "event_id", "pick_id", "phasehint", "arrival_time",
                    "probability", "snr", "detection_probability",
                    "network", "station", "location", "channel", "picker"]

    event_rows = []
    pick_rows = []
    for n_ev, event in enumerate(catalog.events):
        loc_id = os.path.basename(str(event.resource_id))
        ev_type = event.event_type
        if event.event_descriptions:
            region = event.event_descriptions[0].text
        else:
            region = None

        ## Preferred Origin
        pref_origin = event.preferred_origin()
        origin_time = pref_origin.time
        latitude = pref_origin.latitude
        latitude_error = pref_origin.latitude_errors.uncertainty
        longitude = pref_origin.longitude
        longitude_error = pref_origin.longitude_errors.uncertainty
        method = os.path.basename(str(pref_origin.method_id))
        earth_model = os.path.basename(str(pref_origin.earth_model_id))

        # The quality block is optional on an origin.
        quality = pref_origin.quality
        rms = quality.standard_error if quality is not None else None

        depth = pref_origin.depth
        if depth is not None:
            depth = float(depth)
        depth_errors = pref_origin.depth_errors
        if depth_errors is not None:
            depth_error = depth_errors.uncertainty
        else:
            depth_error = None

        ## Preferred Magnitude
        magnitude = None
        magnitude_type = None
        if with_magnitude:
            pref_magnitude = event.preferred_magnitude()
            if pref_magnitude is not None:
                magnitude = pref_magnitude.mag
                magnitude_type = pref_magnitude.magnitude_type
                if magnitude is not None:
                    magnitude = round(magnitude, 2)

        ## Picks indexed by resource id; the first occurrence wins
        picks = {}
        for pick in event.picks:
            pick_key = pick.resource_id.id
            if pick_key in picks:
                continue
            author, prob, snr, ev_prob = pick_metadata(pick)
            waveform = pick.waveform_id
            picks[pick_key] = {
                "id": os.path.basename(str(pick.resource_id)),
                "network_code": waveform.network_code,
                "station_code": waveform.station_code,
                "location_code": waveform.location_code,
                "channel_code": waveform.channel_code,
                "phase_hint": pick.phase_hint,
                "time": pick.time,
                "author": author,
                "probability": prob,
                "snr": snr,
                "detection_probability": ev_prob}

        ## One pick row per arrival of the preferred origin
        p_count = 0
        s_count = 0
        for arrival in pref_origin.arrivals:
            pick = picks[arrival.pick_id.id]
            pick_rows.append([n_ev, loc_id, pick["id"], pick["phase_hint"],
                              pick["time"].datetime, pick["probability"],
                              pick["snr"], pick["detection_probability"],
                              pick["network_code"], pick["station_code"],
                              pick["location_code"], pick["channel_code"],
                              pick["author"]])
            # A pick may lack a phase hint; it then counts as neither P nor S.
            phase = (pick["phase_hint"] or "").upper()
            if phase == "P":
                p_count += 1
            elif phase == "S":
                s_count += 1

        event_rows.append([n_ev, loc_id, origin_time.datetime,
                           latitude, latitude_error,
                           longitude, longitude_error,
                           depth, depth_error, rms, region,
                           method, earth_model, ev_type,
                           magnitude, magnitude_type,
                           p_count, s_count])

    events_df = pd.DataFrame(event_rows, columns=event_colname)
    picks_df = pd.DataFrame(pick_rows, columns=pick_colname)
    events_df = events_df.sort_values(by="event_time",
                                      ascending=True,
                                      ignore_index=True)
    picks_df = picks_df.sort_values(by="arrival_time",
                                    ascending=True,
                                    ignore_index=True)

    if export is not None:
        # A bare filename has an empty dirname, which makedirs rejects.
        export_dir = os.path.dirname(export)
        if export_dir:
            os.makedirs(export_dir, exist_ok=True)
        events_df.to_csv(export, index=False)
        print(f"Events_csv_file: {export}")
    return events_df, picks_df