Source code for SeisMonitor.monitor.locator.nlloc.utils

import os
import shutil
import sys
from time import time
import pandas as pd
from subprocess import run
from obspy import read_inventory
from tqdm import tqdm
import glob
from git import Repo
import SeisMonitor.utils as sut

[docs] def get_nlloc_folders(core_path): """Generate dictionary of NonLinLoc folder paths. Args: core_path (str): Base path to NonLinLoc core directory Returns: dict: Dictionary containing all relevant NonLinLoc paths """ pre_core_path = os.path.dirname(core_path) src_path = os.path.join(core_path, "src") bin_path = os.path.join(src_path, "bin") vel2grid_exe_path = os.path.join(bin_path, "Vel2Grid") grid2time_exe_path = os.path.join(bin_path, "Grid2Time") nll_exe_path = os.path.join(bin_path, "NLLoc") paths = { "pre_core_path": pre_core_path, "core_path": core_path, "src_path": src_path, "bin_path": bin_path, "vel2grid_exe_path": vel2grid_exe_path, "grid2time_exe_path": grid2time_exe_path, "nll_exe_path": nll_exe_path } return paths
[docs] def testing_nlloc_core_path(nlloc_core_path): """Validate existence of essential NonLinLoc directories. Args: nlloc_core_path (str): Path to NonLinLoc core directory Returns: dict: Validated paths dictionary Raises: Exception: If mandatory paths are missing """ paths = get_nlloc_folders(nlloc_core_path) for key, path in paths.items(): if key in ["pre_core_path", "core_path", "src_path", "bin_path"]: if not os.path.isdir(path): raise Exception( f"Mandatory path was not found -> {path}.\n" "There is no NLLoc core folder, or it could be corrupted. " "If you are using Ubuntu, feel free to use NLLoc.download()" ) return paths
[docs] def run_nlloc(nlloc_paths, p_control_file_path, s_control_file_path): """Execute NonLinLoc processing steps using system commands. Args: nlloc_paths (dict): Dictionary of NonLinLoc executable paths p_control_file_path (str): Path to P-wave control file s_control_file_path (str): Path to S-wave control file """ sut.printlog("info", "NLLoc:Vel2Grid", "Running") os.system(f"{nlloc_paths['vel2grid_exe_path']} {p_control_file_path} > /dev/null") sut.printlog("info", "NLLoc:Grid2Time:P", "Running") os.system(f"{nlloc_paths['grid2time_exe_path']} {p_control_file_path} > /dev/null") sut.printlog("info", "NLLoc:Grid2Time:S", "Running") os.system(f"{nlloc_paths['grid2time_exe_path']} {s_control_file_path} > /dev/null") sut.printlog("info", "NLLoc:NLLoc", "Running") os.system(f"{nlloc_paths['nll_exe_path']} {s_control_file_path} > /dev/null")
[docs] def apt_install(pkgs): """Install packages using apt-get with sudo privileges. Args: pkgs (list): List of package names to install """ cmd = ['pkexec', 'apt-get', 'install', '-y'] + pkgs print(f"Running command: {' '.join(cmd)}") result = run( cmd, stdout=sys.stdout, stderr=sys.stderr, encoding='utf8', env={**os.environ, 'DEBIAN_FRONTEND': 'noninteractive'} ) result.check_returncode()
[docs] def write_pref_origin_removing_phaselocinfo(catalog): """Process seismic catalog to keep only preferred origins. Args: catalog (obspy.Catalog): Input seismic catalog Returns: obspy.Catalog: Modified catalog with cleaned origins """ events = [] for ev in catalog: pref_origin = ev.preferred_origin() # Standardize pick information for pick in ev.picks: pick.waveform_id.network_code = "NN" pick.waveform_id.location_code = "NN" pick.waveform_id.channel_code = "NNN" pick.evaluation_mode = "automatic" # Clear arrival parameters new_arrivals = [] for arrival in pref_origin.arrivals: arrival.azimuth = None arrival.distance = None arrival.takeoff_angle = None arrival.time_residual = None arrival.time_weight = None new_arrivals.append(arrival) # Update origins with cleaned arrivals for i, origin in enumerate(ev.origins): if origin.resource_id.id == ev.preferred_origin_id: ev.origins[i].arrivals = new_arrivals ev.origins = [pref_origin] events.append(ev) catalog.events = events return catalog
[docs] def download_nlloc(nlloc_path, forced=False): """Download and compile NonLinLoc from Git repository. Args: nlloc_path (str): Destination path for NonLinLoc forced (bool): Force re-download even if exists (default: False) Returns: bool: True if already installed and not forced """ nlloc_paths = get_nlloc_folders(nlloc_path) zip_path = os.path.join(nlloc_paths['pre_core_path'], "nll.zip") cache_path = os.path.join(nlloc_paths['src_path'], "CMakeCache.txt") if not forced and os.path.isfile(nlloc_paths['nll_exe_path']): return True git_url = "https://github.com/alomax/NonLinLoc.git" if not os.path.isdir(nlloc_paths['core_path']): Repo.clone_from(git_url, nlloc_paths['core_path']) if os.path.isdir(nlloc_paths['bin_path']): shutil.rmtree(nlloc_paths['bin_path']) os.makedirs(nlloc_paths['bin_path']) if os.path.isdir(cache_path): shutil.rmtree(cache_path) try: apt_install(["cmake"]) os.system(f"cd {nlloc_paths['src_path']} && cmake . && make") except Exception as e: raise Exception(f"Could not compile NLLoc: {str(e)}")
[docs] def write_1d_vel_model(vel_path, out, compute_vs=True, vp_vs_ratio=1.78): """Write 1D velocity model file from CSV data. Args: vel_path (str): Path to input velocity CSV file out (str): Output file path compute_vs (bool): Calculate Vs from Vp if True (default: True) vp_vs_ratio (float): Vp/Vs ratio for Vs calculation (default: 1.78) """ df = pd.read_csv(vel_path) with open(out, 'w') as vm: vm.write("# model layers (LAYER depth, Vp_top, Vp_grad, Vs_top, Vs_grad, p_top, p_grad)\n") for i, row in df.iterrows(): vs = row.vp / vp_vs_ratio if compute_vs else row.vs enter = "" if i == len(df) - 1 else "\n" msg = f"LAYER {row.depth:<6.2f} {row.vp:<.2f} 0.00 {vs:<.2f} 0.00 {row.rho:<.2f} 0.00{enter}" vm.write(msg)
[docs] def resp2df(resp): """Convert RESP file to DataFrame with station information. Args: resp (str): Path to RESP file Returns: pd.DataFrame: DataFrame with network, station, lat, lon, elevation """ networks, stations, longitudes, latitudes, elevations = [], [], [], [], [] inv = read_inventory(resp) for net in inv: for sta in net: latitudes.append(sta.latitude) longitudes.append(sta.longitude) elevations.append(sta.elevation) stations.append(sta.code) networks.append(net.code) return pd.DataFrame({ "network": networks, "station": stations, "latitude": latitudes, "longitude": longitudes, "elevation": elevations })
[docs] def write_station_file(sta_path, out): """Write station file from RESP data. Args: sta_path (str): Path to RESP file out (str): Output file path """ df = resp2df(sta_path) with open(out, 'w') as vs: vs.write("# GTSRCE label LATLON latSrce longSrce zSrce elev\n") for i, row in df.iterrows(): elv = row.elevation / 1e3 enter = "" if i == len(df) - 1 else "\n" msg = f"GTSRCE {row.station:<5} LATLON {row.latitude:<7.3f} {row.longitude:<8.3f} 0.000 {elv:<7.3f}{enter}" vs.write(msg)
[docs] def join_args(args): """Convert list of arguments to space-separated string. Args: args (list): List of arguments Returns: str: Space-separated string of arguments """ return " ".join(map(str, args))
[docs] class GenericControlStatement: """Class for generating generic control statements for NonLinLoc.""" def __init__(self, trans, control=[1, 54321]): """ Args: trans (list): Transformation parameters [type, lat, lon, depth] control (list): Control parameters [level, random_seed] (default: [1, 54321]) """ self.control = join_args(control) self.trans = join_args(trans) sut.validate(self.__init__, locals())
[docs] def get_msg(self): """Generate formatted control statement string. Returns: str: Formatted control statement """ msg = ( "#__________________START GENERIC CONTROL STATEMENTS\n\n" "CONTROL {}\n" "TRANS {}\n" "#__________________END\n" ) sut.printlog("debug", "NLLoc:GenericControlStatement", "Message received") return msg.format(self.control, self.trans)
[docs] class Vel2Grid: """Class for generating velocity grid statements for NonLinLoc.""" def __init__(self, vel_path, grid_folder_out, grid, p_phase="P", s_phase="S"): """ Args: vel_path (str): Path to velocity model file grid_folder_out (str): Output folder for grid files grid (list): Grid parameters [nx, ny, nz, x0, y0, z0, dx, dy, dz, type] p_phase (str): P-wave phase identifier (default: "P") s_phase (str): S-wave phase identifier (default: "S") """ self.vel_path = vel_path self.grid = join_args(grid) self.grid_folder_out = grid_folder_out self.p_phase = p_phase self.s_phase = s_phase sut.validate(self.__init__, locals())
[docs] def get_msg(self): """Generate formatted velocity grid statement string. Returns: str: Formatted velocity grid statement """ msg = ( "#__________________START VEL2GRID STATEMENTS\n\n" "VGOUT {}\n" "VGTYPE {}\n" "VGTYPE {}\n" "VGGRID {}\n" "INCLUDE {}\n" "#__________________END\n" ) path = os.path.dirname(self.grid_folder_out) sut.isfile(path) sut.printlog("debug", "NLLoc:Vel2grid", "Message received") return msg.format( self.grid_folder_out, self.p_phase, self.s_phase, self.grid, self.vel_path )
[docs] class Grid2Time: """Class for generating travel time grid statements for NonLinLoc.""" def __init__(self, station_path, grid_folder_out, time_folder_out, phase="P", mode=["GRID3D", "ANGLES_YES"], plfd=[1e-3, 0]): """ Args: station_path (str): Path to station file grid_folder_out (str): Input folder with grid files time_folder_out (str): Output folder for time files phase (str): Phase type (default: "P") mode (list): Mode parameters [grid_type, angles] (default: ["GRID3D", "ANGLES_YES"]) plfd (list): Path length and first difference parameters (default: [1e-3, 0]) """ self.station_path = station_path self.grid_folder_out = grid_folder_out self.time_folder_out = time_folder_out self.phase = phase self.mode = join_args(mode) self.plfd = join_args(plfd) sut.validate(self.__init__, locals())
[docs] def get_msg(self): """Generate formatted travel time grid statement string. Returns: str: Formatted travel time grid statement """ msg = ( "#__________________START GRID2TIME STATEMENTS\n\n" "GTFILES {} {} {}\n" "GTMODE {}\n" "INCLUDE {}\n" "GT_PLFD {}\n" "#__________________END\n" ) path = os.path.dirname(self.time_folder_out) sut.isfile(path) sut.printlog("debug", "NLLoc:Grid2Time", "Message received") return msg.format( self.grid_folder_out, self.time_folder_out, self.phase, self.mode, self.station_path, self.plfd )
[docs] class Time2Loc: """Class for generating location statements for NonLinLoc.""" def __init__(self, catalog, grid, time_folder_out, loc_folder_out, meth=["GAU_ANALYTIC", 9999, 4, -1, -1, 1.78, 6], search=["OCT", 37, 58, 7, 1e-2, int(1e5), int(1e4)], sig="SeisMonitor", com="Comment", gau=[0.2, 0.0], gau2=[0.05, 0.05, 2.0], p_phaseid=["P", "P", "p", "PN", "PG", "Pn", "Pg"], s_phaseid=["S", "S", "s", "SN", "SG", "Sn", "Sg"], qual2err=[0.1, 0.5, 1.0, 2.0, 99999.9], phstat=[9999.0, -1, 9999.0, 1.0, 1.0, 9999.0, -9999.0, 9999.0], angles=["ANGLES_YES", 5], hypout=["SAVE_NLLOC_ALL", "SAVE_NLLOC_SUM", "SAVE_HYPO71_SUM"], mag=["ML_HB", 1.0, 1.110, 0.00189]): """ Args: catalog (list): [catalog_path, format] grid (list): Grid parameters for location time_folder_out (str): Input folder with time files loc_folder_out (str): Output folder for location results meth (list): Method parameters (default: GAU_ANALYTIC settings) search (list): Search parameters (default: OCT settings) sig (str): Signature (default: "SeisMonitor") com (str): Comment (default: "Comment") gau (list): Gaussian parameters (default: [0.2, 0.0]) gau2 (list): Secondary Gaussian parameters (default: [0.05, 0.05, 2.0]) p_phaseid (list): P-phase identifiers s_phaseid (list): S-phase identifiers qual2err (list): Quality to error mapping phstat (list): Phase statistics parameters angles (list): Angle parameters hypout (list): Hypocenter output options mag (list): Magnitude calculation parameters """ self.catalog = join_args(catalog) self.time_folder_out = time_folder_out self.loc_folder_out = loc_folder_out self.grid = join_args(grid) self.meth = join_args(meth) self.search = join_args(search) self.sig = sig self.com = com self.gau = join_args(gau) self.gau2 = join_args(gau2) self.p_phaseid = join_args(p_phaseid) self.s_phaseid = join_args(s_phaseid) self.qual2err = join_args(qual2err) self.phstat = join_args(phstat) self.angles = join_args(angles) self.hypout = join_args(hypout) self.mag = join_args(mag) sut.validate(self.__init__, locals())
[docs] def get_msg(self): """Generate formatted location statement string. Returns: str: Formatted location statement """ msg = ( "#__________________START NLDIFFLOC STATEMENTS\n\n" "LOCSIG {}\n" "LOCCOM {}\n" "LOCFILES {} {} {}\n" "LOCHYPOUT {}\n" "LOCSEARCH {}\n" "LOCGRID {}\n" "LOCMETH {}\n" "LOCGAU {}\n" "LOCGAU2 {}\n" "LOCPHASEID {}\n" "LOCPHASEID {}\n" "LOCQUAL2ERR {}\n" "LOCPHSTAT {}\n" "LOCANGLES {}\n" "LOCMAG {}\n" "#__________________END\n" ) path = os.path.dirname(self.loc_folder_out) sut.isfile(path) sut.printlog("debug", "NLLoc:Time2Loc", "Message received") return msg.format( self.sig, self.com, self.catalog, self.time_folder_out, self.loc_folder_out, self.hypout, self.search, self.grid, self.meth, self.gau, self.gau2, self.p_phaseid, self.s_phaseid, self.qual2err, self.phstat, self.angles, self.mag )
[docs] class NLLocControlFile: """Class for generating complete NonLinLoc control file.""" def __init__(self, generic_control, vel2grid, grid2time, time2loc): """ Args: generic_control (GenericControlStatement): Generic control instance vel2grid (Vel2Grid): Velocity grid instance grid2time (Grid2Time): Travel time grid instance time2loc (Time2Loc): Location instance """ self.generic_control = generic_control self.vel2grid = vel2grid self.grid2time = grid2time self.time2loc = time2loc sut.validate(self.__init__, locals()) def _validate_args(self, key, input_path, output_path): """Validate input and output paths for control file components. Args: key (str): Component identifier input_path (str): Input file path output_path (str): Output directory path Raises: Exception: If input path doesn't exist """ sut.printlog("debug", "NLLoc:validate_control_file_args", "validating control file args") if os.path.isfile(input_path): sut.printlog("debug", "NLLoc:validate_control_file_args", f"{key} is ok.") else: raise Exception(f"NLLoc: {key} path doesn't exist. Check: {input_path}") sut.isfile(output_path) sut.printlog("debug", "NLLoc:validate_control_file_args", "control file args are ok")
[docs] def get_msg(self, vel2grid=True, grid2time=True, time2loc=True): """Generate complete control file message. Args: vel2grid (bool): Include velocity grid section (default: True) grid2time (bool): Include travel time grid section (default: True) time2loc (bool): Include location section (default: True) Returns: str: Complete control file content """ msg = self.generic_control.get_msg() if vel2grid: msg += self.vel2grid.get_msg() self._validate_args("vel_path", self.vel2grid.vel_path, self.vel2grid.grid_folder_out) if grid2time: msg += self.grid2time.get_msg() self._validate_args("station_path", self.grid2time.station_path, self.grid2time.time_folder_out) if time2loc: msg += self.time2loc.get_msg() self._validate_args("catalog_path", self.time2loc.catalog.split(" ")[0], self.time2loc.loc_folder_out) return msg
[docs] def write(self, out, vel2grid=True, grid2time=True, time2loc=True): """Write control file to disk. Args: out (str): Output file path vel2grid (bool): Include velocity grid section (default: True) grid2time (bool): Include travel time grid section (default: True) time2loc (bool): Include location section (default: True) """ sut.printlog("info", "NLLoc:write_control_file", "Running") sut.isfile(out) msg = self.get_msg(vel2grid, grid2time, time2loc) with open(out, "w") as control_file_msg: control_file_msg.write(msg) sut.printlog("info", "NLLoc:write_control_file", f"Finished. Control file: {out}")
if __name__=="__main__": catalog = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/select.out" station_path = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/station.dat" vel_path = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/model.dat" grid_folder_out = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/model/layer" time_folder_out = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/time/layer" loc_folder_out = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/loc/SeisMonitor" control_file_out = "/home/emmanuel/EDCT/SeisMonitor/SeisMonitor/monitor/locator/nlloc/test3/test.in" gen_control = GenericControlStatement(trans=["SIMPLE", 5.0,-73.0,0.0]) vel2grid = Vel2Grid(vel_path, grid_folder_out, grid = [2,583,70, -891.0,-891.0,-5.0, 3.0, 3.0, 3.0, "SLOW_LEN"], phase = "P") grid2time = Grid2Time(station_path, grid_folder_out, time_folder_out) time2loc = Time2Loc(catalog=[catalog,"SEISAN"], grid = [374,583,70, -891.0,-891.0,-5.0, 3.0,3.0,3.0, "PROB_DENSITY","SAVE"], time_folder_out=time_folder_out, loc_folder_out=loc_folder_out) nlloc = NLLocObj(gen_control,vel2grid, grid2time,time2loc) nlloc.write_control_file(control_file_out)