Source code for SeisMonitor.monitor.picker.utils

# /**
#  * @author [Emmanuel Castillo]
#  * @email [excastillot@unal.edu.co]
#  * @create date 2021-12-22 09:49:23
#  * @modify date 2021-12-22 09:49:23
#  * @desc [description]
#  */
import os
import csv
import sys
import json
import math
import time
import obspy
import shutil
import logging
import datetime
import numpy as np
import pandas as pd
import datetime as dt
from obspy import UTCDateTime
from datetime import timedelta
from SeisMonitor.utils import printlog, isfile
from SeisMonitor.monitor.downloader.utils import get_chunktimes
import concurrent.futures as cf
from git import Repo
from obspy.core.event.origin import Pick
from obspy.core.event.base import QuantityError, WaveformStreamID, CreationInfo, Comment
from obspy.core.event import ResourceIdentifier

# Define standard columns for SeisMonitor
SeisMonitor_columns = [
    "pick_id", "arrival_time", "probability", "phasehint",
    "network", "station", "location", "instrument_type", "author",
    "creation_time", "event_start_time", "event_end_time",
    "detection_probability", "snr", "station_lat", "station_lon",
    "station_elv", "file_name"
]


[docs] def clone_aipicker(name, output_folder): """Clone AI picker repository from GitHub. Args: name (str): Name of the AI picker ('EQTransformer' or 'PhaseNet') output_folder (str): Directory path to store the cloned repository Returns: bool: True if successful Raises: Exception: If invalid picker name is provided """ git_urls = { "PhaseNet": "https://github.com/ecastillot/PhaseNet.git", "EQTransformer": "https://github.com/ecastillot/EQTransformer.git" } if name not in git_urls: raise Exception( f"Invalid picker name: {name}. Choose 'EQTransformer' or 'PhaseNet'" ) if os.path.isdir(output_folder): shutil.rmtree(output_folder) Repo.clone_from(git_urls[name], output_folder) return True
[docs] def eqt_picks_2_seismonitor_fmt(eqt_folder, mseed_folder, out_path): """Convert EQTransformer picks to SeisMonitor format. Args: eqt_folder (str): Directory containing EQTransformer pick files mseed_folder (str): Directory containing MSEED files out_path (str): Output path for the converted CSV file Returns: pandas.DataFrame: Converted picks DataFrame """ date_cols = ["p_arrival_time", "s_arrival_time", "event_start_time", "event_end_time"] def parse_datetime(datetime_str): """Parse datetime string with multiple possible formats.""" if isinstance(datetime_str, float): return None for fmt in ["%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"]: try: return dt.datetime.strptime(datetime_str, fmt) except ValueError: pass return None dfs = [] for dp, dn, filenames in os.walk(eqt_folder): for f in filenames: if f == "X_prediction_results.csv" : search_path = os.path.join(dp, f) df = pd.read_csv(search_path,dtype={'station': str,'location':str}) if not df.empty: # to_date = lambda x: pd.to_datetime(x,format="mixed") # df[date_cols] = df[date_cols].apply(to_date) to_date = lambda x: parse_datetime(x) # df[date_cols] = df[date_cols].apply(to_date) for date_col in date_cols: df[date_col] = df[date_col].apply(parse_datetime) dfs.append(df) if not dfs: return pd.DataFrame() df = pd.concat(dfs,ignore_index=True) df["station"] = df["station"].apply(lambda x: x.strip()) df = df.sort_values(by="p_arrival_time",ignore_index=True) x_results_path = os.path.join(os.path.dirname(out_path), "X_prediction_results_merge.csv") df.to_csv(x_results_path,index=False, date_format="%Y-%m-%d %H:%M:%S.%f") #seismonitor date_cols = ["arrival_time","creation_time", "event_start_time","event_end_time"] get_loc = lambda x: x.file_name.split(".")[2] df["location"] = df.apply(get_loc,axis=1) df["author"] = "EQTransformer" filename_func = lambda x: os.path.join(mseed_folder,x) df["file_name"] = df["file_name"].apply(filename_func) p_cols = df.columns.to_list() s_cols = df.columns.to_list() dfs = [] for phase in ["p","s"]: if phase.lower() == "p": rm = "s" else: rm = "p" phase_cols = df.columns.to_list() not_rm_cols = [f'{phase}_arrival_time', f'{phase}_probability', f'{phase}_uncertainty', f'{phase}_snr'] rm_cols = [f'{rm}_arrival_time', f'{rm}_probability', f'{rm}_uncertainty', f'{rm}_snr'] for x in rm_cols: phase_cols.remove(x) phase_df = df[phase_cols] new_cols_gen = lambda x: x[2:] new_cols = list(map(new_cols_gen,not_rm_cols)) columns = dict(zip(not_rm_cols, new_cols)) phase_df = phase_df.rename(columns=columns) phase_df["phasehint"] = phase.upper() phase_df = phase_df.dropna(subset=["arrival_time"]) phase_df["creation_time"] = dt.datetime.now() pick_id = lambda x: id_maker(x.arrival_time, x.network, x.station, x.location, x.instrument_type, x.phasehint) phase_df["pick_id"] = phase_df.apply(pick_id,axis=1) phase_df[date_cols] = phase_df[date_cols].apply(pd.to_datetime) dfs.append(phase_df) df = pd.concat(dfs,ignore_index=True) df = df[SeisMonitor_columns] df = df.sort_values(by="arrival_time",ignore_index=True) df.to_csv(out_path,index=False, date_format="%Y-%m-%d %H:%M:%S.%f") return df
[docs] def get_filenames(mseed, filter_net=None, filter_sta=None, filter_cha=None): """Retrieve MSEED filenames with optional filtering. Args: mseed (str): Path to MSEED files directory filter_net (list, optional): Networks to exclude filter_sta (list, optional): Stations to exclude filter_cha (list, optional): Channels to include (if specified) Returns: list: Filtered filenames Raises: Exception: If filter parameters are not lists """ filter_net = filter_net or [] filter_sta = filter_sta or [] filter_cha = filter_cha or [] for param in (filter_net, filter_sta, filter_cha): if not isinstance(param, list): raise Exception(f"{param} must be a list") filenames = [] file_list = [f for f in os.listdir(mseed) if f.lower().endswith('.mseed')] for filename in file_list: net, sta, loc, cha = filename.split('__')[0].split('.') if net in filter_net or sta in filter_sta or (filter_cha and cha not in filter_cha): continue filenames.append(filename) return filenames
[docs] def mv_mseed2onefolder(mseed_folder, one_folder): """Move MSEED files from subfolders to a single folder. Args: mseed_folder (str): Source directory with MSEED files one_folder (str): Destination directory """ for dp, _, filenames in os.walk(mseed_folder): for f in filenames: if f.endswith(".mseed"): src = os.path.join(dp, f) dst = os.path.join(one_folder, f) printlog("debug", "mv_mseed2onefolder", f"{src} -> {dst}") shutil.move(src, dst)
[docs] def mv_mseed2stationfolder(one_folder, mseed_folder): """Move MSEED files to station-specific subfolders. Args: one_folder (str): Source directory with MSEED files mseed_folder (str): Destination base directory """ for dp, _, filenames in os.walk(one_folder): for f in filenames: if f.endswith(".mseed"): src = os.path.join(dp, f) net, sta, _, _ = f.split("__")[0].split(".") dst = os.path.join(mseed_folder, sta, f) if not os.path.isdir(os.path.dirname(dst)): os.makedirs(os.path.dirname(dst)) printlog("debug", "mv_mseed2stationfolder", f"{src} -> {dst}") shutil.move(src, dst)
[docs] def make_dataframe(mseed, json_path, filter_net=None, filter_sta=None, filter_cha=None): """Create DataFrame from MSEED files and JSON metadata. Args: mseed (str): Directory containing MSEED files json_path (str): Path to JSON metadata file filter_net (list, optional): Networks to exclude filter_sta (list, optional): Stations to exclude filter_cha (list, optional): Channels to include Returns: pandas.DataFrame: DataFrame with waveform metadata """ with open(json_path, "r") as jfile: datajson = json.load(jfile) filenames = get_filenames(mseed, filter_net, filter_sta, filter_cha) mseed_paths = [] for onefile in filenames: df = {} path = os.path.join(mseed, onefile) st = obspy.read(path) stats = st[0].stats for key, val in st._get_common_channels_info().items(): net, sta, loc, instrument_type = key channels = list(val["channels"].keys()) df["fname"] = path for channel in channels: direction = channel[-1] if len(channels) == 1: df["E"] = df["N"] = df["Z"] = channel else: df[direction] = channel instrument = instrument_type[:-1] instruments = [x[:-1] for x in datajson[sta]['channels']] idx = next((i for i, x in enumerate(instruments) if x == instrument), None) if idx is None: continue sampling_rate = datajson[sta]['sampling_rate'][idx] lat, lon, elv = datajson[sta]['coords'] df.update({ 'mseed_start_time': stats.starttime.strftime("%Y-%m-%d %H:%M:%S.%f"), 'mseed_end_time': stats.endtime.strftime("%Y-%m-%d %H:%M:%S.%f"), "network": net, "station": sta, "location": loc, "instrument_type": instrument, "sampling_rate": sampling_rate, "sta_lat": lat, "sta_lon": lon, "sta_elv": elv }) mseed_paths.append(df) return pd.DataFrame(mseed_paths)
[docs] def make_phasenet_datalist( datadir, json_path, datalist_path, channel_list=None, filter_network=None, filter_station=None, **kwargs ): """Create PhaseNet datalist CSV file. Args: datadir (str): Directory with MSEED files json_path (str): Path to JSON metadata datalist_path (str): Output path for datalist CSV channel_list (list, optional): Channels to include filter_network (list, optional): Networks to exclude filter_station (list, optional): Stations to exclude kwargs: Additional keyword arguments Returns: str: Directory containing the datalist """ df = make_dataframe(datadir, json_path, filter_network, filter_station, channel_list) datalist_dir = os.path.dirname(datalist_path) isfile(datalist_path) df.to_csv(datalist_path, index=False) return datalist_dir
[docs] def create_datalist(self, all_in_folder=False): """Create PhaseNet datalist(s) for processing. Args: self: Object with datalist_dir and mseed_storage attributes all_in_folder (bool): If True, create single datalist; otherwise, per station """ tic = time.time() logger = logging.getLogger('PhaseNet: datalist') logger.info("Running to create datalist") if not os.path.exists(self.datalist_dir): os.makedirs(self.datalist_dir) if all_in_folder: datalist = os.path.join(self.datalist_dir, 'fname.csv') make_phasenet_datalist( datadir=self.all_mseed, datalist_path=datalist, json_path=self.json_path, # Assuming json_path is an attribute groupby='{network}.{station}' ) else: for sta in os.listdir(self.mseed_storage): datadir = os.path.join(self.mseed_storage, sta) datalist_path = os.path.join(self.datalist_dir, sta, 'fname.csv') make_phasenet_datalist( datadir=datadir, datalist_path=datalist_path, json_path=self.json_path, # Assuming json_path is an attribute groupby='{network}.{station}.{location}' ) toc = time.time() exetime = timedelta(seconds=toc - tic) logger.info(f'Total time of execution: {exetime.total_seconds()} seconds')
[docs] def phasenet_from_console(pnet_obj, msg_author): """Run PhaseNet from console with specified parameters. Args: pnet_obj: Object with PhaseNet configuration attributes msg_author (str): Author identifier for logging """ run = os.path.join(pnet_obj.phasenet_path, "run.py") base_command = ( f"python \"{run}\" --mode=pred " f"--model_dir=\"{pnet_obj.model_dir}\" --data_dir=\"{pnet_obj.data_dir}\" " f"--data_list=\"{pnet_obj.data_list}\" --output_dir=\"{pnet_obj.output_dir}\" " f"--batch_size={pnet_obj.batch_size} --tp_prob={pnet_obj.tp_prob} " f"--ts_prob={pnet_obj.ts_prob}" ) command = ( f"{base_command} --input_mseed" if os.name == 'nt' else f"{base_command} --resampling={pnet_obj.one_single_sampling_rate} --input_mseed" ) if pnet_obj.plot_figure: command += " --plot_figure" if pnet_obj.save_result: command += " --save_result" printlog("info", msg_author, "Console-> " + command) printlog("info", msg_author, "Running...") os.system(command)
[docs] def get_pickframe(path, csv_filename): """Collect picks from CSV files in a directory structure. Args: path (str): Root directory containing station folders with picks csv_filename (str): Name of CSV file containing picks Returns: pandas.DataFrame: Combined picks DataFrame """ pick_paths = [ os.path.join(dp, f) for dp, _, filenames in os.walk(path) for f in filenames if f == csv_filename ] dfs = [pd.read_csv(pick_path, index_col=None, header=0) for pick_path in pick_paths] return dfs[0] if len(dfs) == 1 else pd.concat(dfs, axis=0, ignore_index=True)
[docs] def merge_picks(path, csv_filename, output_path, sort=None): """Merge picks from multiple CSV files into one. Args: path (str): Root directory containing station folders with picks csv_filename (str): Name of CSV file containing picks output_path (str): Path for output merged CSV sort (str, optional): Column name to sort by """ pickframe = get_pickframe(path, csv_filename) if sort: pickframe = pickframe.sort_values(sort) output_dir = os.path.dirname(output_path) if not os.path.isdir(output_dir): os.makedirs(output_dir) pickframe.to_csv(output_path, index=False)
[docs] def rm_phasenet_duplicate_picks(path, output_path): """Remove duplicate picks from PhaseNet output. Args: path (str): Path to input picks CSV output_path (str): Path for output cleaned CSV Returns: pandas.DataFrame: Cleaned picks DataFrame """ def select_pick(df): """Select picks based on segment type and probability.""" logger = logging.getLogger('PhaseNet: Select_picks') df_overlap = df[df['segment_type'] == 'overlap'] df_single = df[df['segment_type'] == 'single'] if df_overlap.empty: logger.debug('Single selected') return df_single if df_single.empty: logger.debug('Overlap selected') return df_overlap single_prob_avg = np.average(df_single['probability']) overlap_prob_avg = np.average(df_overlap['probability']) choice = df_single if single_prob_avg >= overlap_prob_avg else df_overlap logger.debug( f"{'Single' if choice is df_single else 'Overlap'} selected: " f"Single:{single_prob_avg} vs Overlap:{overlap_prob_avg}" ) return choice df = pd.read_csv(path, dtype={"location": str}) if df.empty: return df timecols = ['arrival_time', 'creation_time', 'mseed_start_time', 'mseed_end_time'] df[timecols] = df[timecols].apply(pd.to_datetime) group_cols = ['mseed_start_time', 'mseed_end_time', 'network', 'station', 'instrument_type'] mseed_groups = df.groupby(group_cols, as_index=False) picks_ok = [] for ind in mseed_groups.indices.keys(): mseed_df = mseed_groups.get_group(ind) mseed_starttime = mseed_df['mseed_start_time'].iloc[0] mseed_endtime = mseed_df['mseed_end_time'].iloc[0] mseed_sr = mseed_df['sampling_rate'].iloc[0] overlap_in_sec = 1500 / mseed_sr overlap_times = get_chunktimes(mseed_starttime, mseed_endtime, overlap_in_sec, 0) for ovl_starttime, ovl_endtime in overlap_times: ovl_df = mseed_df[ (mseed_df['arrival_time'] <= ovl_endtime) & (mseed_df['arrival_time'] >= ovl_starttime) ] if not ovl_df.empty: picks_ok.append(select_pick(ovl_df)) df_picks_ok = pd.concat(picks_ok, ignore_index=True, sort=False) df_picks_ok = df_picks_ok.sort_values('arrival_time').reset_index(drop=True) df_picks_ok.to_csv(output_path, index=False) return df_picks_ok
[docs] def picks2df(picks): """Convert list of Pick objects to DataFrame. Args: picks (list): List of Pick objects Returns: pandas.DataFrame: Converted DataFrame """ df_data = [] for pick in picks: comment = json.loads(pick.comments[0].text) seismonitor = { "pick_id": pick.resource_id.id, "arrival_time": pick.time.strftime("%Y-%m-%d %H:%M:%S.%f"), "probability": pick.time_errors.uncertainty, "phasehint": pick.phase_hint, "network": pick.waveform_id.network_code, "station": pick.waveform_id.station_code, "instrument_type": pick.waveform_id.channel_code[0:2], "author": pick.method_id.id, "creation_time": pick.creation_info.creation_time.strftime("%Y-%m-%d %H:%M:%S.%f"), # "event_start_time": comment.get("event_start_time"), # "event_end_time": comment.get("event_end_time"), # "mseed_start_time": comment.get("mseed_start_time"), # "mseed_end_time": comment.get("mseed_end_time"), # "detection_probability": comment.get("detection_probability"), # "snr": comment.get("snr"), # "sampling_rate": comment.get("sampling_rate"), # "sample": comment.get("sample"), # "segment": comment.get("segment"), # "segment_type": comment.get("segment_type"), # "station_lat": comment.get("station_lat"), # "station_lon": comment.get("station_lon"), # "station_elv": comment.get("station_elv"), # "file_name": comment.get("file_name") } for key, value in comment.items(): seismonitor[key] = value loc = pick.waveform_id.location_code seismonitor["location"] = ( "" if loc in [None, ""] or (isinstance(loc, (int, float)) and math.isnan(loc)) else f"{int(loc):02d}" if isinstance(loc, (int, float)) else str(loc) ) seismonitor.update(pick.comments[0]) df_data.append(seismonitor) df = pd.DataFrame(df_data) df["location"] = df["location"].astype(str) date_cols = ["arrival_time", "creation_time", "event_start_time", "event_end_time", "mseed_start_time", "mseed_end_time"] df[date_cols] = df[date_cols].apply(pd.to_datetime) return df
[docs] def get_picks( datapicks, datalist, min_p_prob=0.3, min_s_prob=0.3, one_single_sampling_rate=-1, mode='df_obj', export=None ): """Read PhaseNet picks and convert to desired format. Args: datapicks (str): Path to PhaseNet picks CSV datalist (str): Path to datalist CSV min_p_prob (float): Minimum P-phase probability min_s_prob (float): Minimum S-phase probability one_single_sampling_rate (float): Override sampling rate (-1 for auto) mode (str): Output mode ('pick_obj' or 'df_obj') export (str, optional): Path to export DataFrame as CSV Returns: list or pandas.DataFrame: Picks in specified format """ df = pd.read_csv(datapicks).astype({'itp': 'string', 'tp_prob': 'string', 'its': 'string', 'ts_prob': 'string'}) df = df.replace("[]", np.nan).dropna(subset=['itp', 'tp_prob', 'its', 'ts_prob'], how="all") picks = [] def _get_picks(irow): i, row = irow wf_name = row["fname"] p_picks = ( pick_constructor(datalist, row["itp"].strip('[]').strip().split(), row["tp_prob"].strip('[]').strip().split(), wf_name, 'P', min_p_prob, one_single_sampling_rate) if not pd.isna(row["itp"]) else [] ) s_picks = ( pick_constructor(datalist, row["its"].strip('[]').strip().split(), row["ts_prob"].strip('[]').strip().split(), wf_name, 'S', min_s_prob, one_single_sampling_rate) if not pd.isna(row["its"]) else [] ) picks.append(p_picks + s_picks) with cf.ThreadPoolExecutor() as executor: executor.map(_get_picks, df.iterrows()) picks = [x for sublist in picks for x in sublist] if mode == 'pick_obj': return picks if mode == 'df_obj' or export: if not picks: return pd.DataFrame() picks = picks2df(picks) logger = logging.getLogger('PhaseNet: picks2df') if not isinstance(picks, pd.DataFrame): logger.warning('There are no picks to convert.') return pd.DataFrame() # print(picks) pnet_cols = ["mseed_start_time", "mseed_end_time", "sampling_rate", "sample", "segment", "segment_type"] picks = picks[SeisMonitor_columns + pnet_cols].sort_values('arrival_time').reset_index(drop=True) logger.info('Conversion pick_sample in pick_time ok. See your results in pick_df.csv.') if export: picks.to_csv(export, index=False, date_format="%Y-%m-%d %H:%M:%S.%f") return picks
[docs] def pick_constructor(datalist, picks, prob, wf_name, ph_type, min_prob, one_single_sampling_rate=-1): """Construct Pick objects from PhaseNet data. Args: datalist (str): Path to datalist CSV picks (list): Pick sample points prob (list): Pick probabilities wf_name (str): Waveform name ph_type (str): Phase type ('P' or 'S') min_prob (float): Minimum probability threshold one_single_sampling_rate (float): Override sampling rate Returns: list: List of Pick objects Raises: Exception: If datalist file is missing """ if not os.path.isfile(datalist): logging.getLogger("Datalist").error("Error in phasenet_pick2date()") raise Exception("No datalist") datalist_df = pd.read_csv(datalist, dtype={"location": str}) datalist_df[['mseed_start_time', 'mseed_end_time']] = datalist_df[ ['mseed_start_time', 'mseed_end_time'] ].apply(pd.to_datetime) datalist_df = datalist_df.set_index("fname") picks_list = [] if picks != ['']: wfpath, segment = wf_name.split(".mseed_") wfpath += ".mseed" segment = int(segment) row = datalist_df.loc[wfpath] net, sta, loc, ch = row["network"], row["station"], row["location"], row["instrument_type"] if isinstance(loc, (int, float)) and math.isnan(loc): loc = "" sampling_rate = ( one_single_sampling_rate if one_single_sampling_rate != -1 else row["sampling_rate"] ) mseed_starttime, mseed_endtime = row['mseed_start_time'], row['mseed_end_time'] sta_lat, sta_lon, sta_elv = row["sta_lat"], row["sta_lon"], row["sta_elv"] dtt = (mseed_endtime - mseed_starttime).seconds for pick, p_prob in zip(picks, prob): p_prob = float(p_prob) if p_prob < min_prob: continue pick_time, creation_time, obs = sample2time(pick, mseed_starttime, sampling_rate, segment, dtt) pick_id = id_maker(pick_time, net, sta, loc, ch, ph_type) evaluation_mode = 'manual' if p_prob >= 0.95 else 'automatic' pick_obj = Pick( resource_id=ResourceIdentifier(id=pick_id), time=pick_time, time_errors=QuantityError(uncertainty=p_prob, confidence_level=p_prob * 100), waveform_id=WaveformStreamID( network_code=net, station_code=sta, location_code=loc, channel_code=ch, resource_uri=ResourceIdentifier(id=pick_id) ), phase_hint=ph_type, evaluation_mode=evaluation_mode, creation_info=CreationInfo(author="PhaseNet_picker", creation_time=UTCDateTime.now()), method_id="PhaseNet", comments=[Comment(text=json.dumps({ "event_start_time": None, "event_end_time": None, "detection_probability": None, "snr": None, "station_lat": sta_lat, "station_lon": sta_lon, "station_elv": sta_elv, "sampling_rate": sampling_rate, "file_name": wf_name, "mseed_start_time": mseed_starttime.strftime("%Y-%m-%d %H:%M:%S.%f"), "mseed_end_time": mseed_endtime.strftime("%Y-%m-%d %H:%M:%S.%f"), "sample": pick, "segment": segment, "segment_type": obs }))] ) picks_list.append(pick_obj) return picks_list
[docs] def id_maker(pick_time, net, station, loc, ch, phasehint): """Create SeisComP-style pick PublicID. Args: pick_time (UTCDateTime): Pick time net (str): Network code station (str): Station code loc (str): Location code ch (str): Channel code phasehint (str): Phase hint ('P' or 'S') Returns: str: SeisComP pick PublicID """ date_id = pick_time.strftime('%Y%m%d.%H%M%S.%f')[:-4] return f"{date_id}-{'P' if phasehint == 'P' else 'S'}-{net}.{station}.{loc}.{ch}"
[docs] def sample2time(sample, to, df, segment, dtt): """Convert PhaseNet sample count to time. Args: sample (str): Sample point of pick to (datetime): Waveform start time df (float): Sampling rate segment (int): Segment number dtt (float): Waveform duration in seconds Returns: tuple: (pick_time, creation_time, observation_type) """ init_time = to obs = 'single' if segment != 0: if segment >= dtt * df: segment -= dtt * df init_time -= datetime.timedelta(seconds=1500 / df) obs = 'overlap' add_seconds = (segment + float(sample)) / df pick_time = init_time + dt.timedelta(seconds=add_seconds) creation_time = UTCDateTime() return pick_time, creation_time, obs