Source code for SeisMonitor.monitor.locator.nlloc.nlloc

import os
import glob
from posixpath import basename
import shutil
import numpy as np
from typing import Union
import concurrent.futures as cf
from obspy.core.event.base import CreationInfo
from obspy.core.event.catalog import Catalog, read_events
from obspy.geodetics.base import gps2dist_azimuth
from obspy.io.nlloc.core import read_nlloc_hyp
from obspy.core.event.resourceid import ResourceIdentifier
from obspy import UTCDateTime
import subprocess
from tqdm import tqdm
from . import utils as ut
from SeisMonitor import utils as sut
from SeisMonitor.core import utils as scut
from SeisMonitor.monitor.locator import utils as slut


[docs] class NLLoc: """ NonLinLoc (NLLoc) seismic event locator. This class provides an interface to the NonLinLoc algorithm for earthquake location using a 1D/3D velocity model and station geometry. It wraps the full workflow including travel-time computation, grid building, and event location. Args: core_path: str Path to the NonLinLoc installation directory. agency: str Agency name used in output event metadata. region: list of float Spatial search region defined as: ``[lon_min, lon_max, lat_min, lat_max, z_min, z_max]`` Notes: - Longitude and latitude are in degrees - Depth (z) is in kilometers - Positive z corresponds to depth below sea level vel_model: VelModel Velocity model used for travel-time computation. stations: Stations Station metadata container. delta_in_km: float, optional Grid spacing in kilometers (default: 2). kwargs_for_trans: dict, optional Parameters for coordinate transformation. kwargs_for_vel2grid: dict, optional Parameters for Vel2Grid execution. kwargs_for_grid2time: dict, optional Parameters for Grid2Time execution. kwargs_for_time2loc: dict, optional Parameters for Time2Loc execution. tmp_folder: str, optional Directory used for temporary files and travel-time grids. Defaults to current working directory. exhaustively: bool, optional If True, performs multi-resolution grid search. search_in_degrees: list, optional Grid refinement levels in degrees (used when ``exhaustively=True``). rm_attempts: bool, optional If True, removes intermediate attempt files after processing. Attributes: core_path: str Path to the NonLinLoc core installation directory. nlloc_paths: dict Dictionary containing resolved executable paths for NLLoc components. basic_inputs: LocatorBasicInputs Container holding velocity model and station information used for location. tmp_folder: str Working directory used for temporary files and intermediate outputs. Warnings -------- - Ensure that the ``region`` fully covers all stations, expected earthquake locations, and the velocity model extent. Pay special attention to the elevation range: 0 corresponds to sea level, negative values indicate below sea level, and positive values indicate above sea level. - The ``compute_travel_times`` method can be computationally expensive depending on grid resolution, spatial extent of the model, and number of stations. Large grids may also require significant memory and disk usage. Ensure adequate computational resources are available before execution. """ def __init__( self, core_path: str, agency: str, region: list, vel_model: slut.VelModel, stations: slut.Stations, delta_in_km: float = 2, kwargs_for_trans: dict = {}, kwargs_for_vel2grid: dict = {}, kwargs_for_grid2time: dict = {}, kwargs_for_time2loc: dict = {}, tmp_folder: str = os.getcwd(), exhaustively: bool = False, search_in_degrees: list = [], rm_attempts: bool = False, ): """Initialize NLLoc locator instance.""" paths = ut.testing_nlloc_core_path(core_path) self.core_path = core_path self.nlloc_paths = paths self.agency = agency self.region = region self.vel_model = vel_model self.stations = stations self.basic_inputs = slut.LocatorBasicInputs( vel_model=vel_model, stations=stations ) self.delta_in_km = delta_in_km self.kwargs_for_trans = kwargs_for_trans self.kwargs_for_vel2grid = kwargs_for_vel2grid self.kwargs_for_grid2time = kwargs_for_grid2time self.kwargs_for_time2loc = kwargs_for_time2loc self.tmp_folder = tmp_folder self.rm_attempts = rm_attempts self.exhaustively = exhaustively if exhaustively: if not search_in_degrees: self.search_in_degrees = list(reversed(np.arange(1, 4, 0.5))) else: self.search_in_degrees = search_in_degrees def __initialize(self, write_nlloc_files: bool = True): """Prepare input files and parameters for location process. Args: write_nlloc_files: Whether to write NLLoc control files """ # Set up file paths self.vel_model_path = os.path.join(self.tmp_folder, "vel_model.dat") self.station_path = os.path.join(self.tmp_folder, "station.dat") # Create/overwrite input files sut.isfile(self.vel_model_path, overwrite=True) self.basic_inputs.vel_model.to_nlloc(self.vel_model_path) sut.isfile(self.station_path, overwrite=True) self.basic_inputs.stations.to_nlloc(self.station_path) # Define output directories self.grid_folder_out = os.path.join(self.tmp_folder, "model", "layer") self.time_folder_out = os.path.join(self.tmp_folder, "time", "layer") self.loc_folder_out = os.path.join(self.tmp_folder, "loc", "SeisMonitor") self.p_control_file_out = os.path.join(self.tmp_folder, "p_nlloc.in") self.s_control_file_out = os.path.join(self.tmp_folder, "s_nlloc.in") # Prepare grid parameters grid_args = self._prepare_grid_args() # Update grid arguments with kwargs if provided grid_args.update({ "trans": self.kwargs_for_trans.get("trans", grid_args["trans"]), "velgrid": self.kwargs_for_vel2grid.get("grid", grid_args["velgrid"]), "locgrid": self.kwargs_for_grid2time.get("grid", grid_args["locgrid"]) }) # Create control objects gen_control = ut.GenericControlStatement(trans=grid_args["trans"]) gen_vel2grid = ut.Vel2Grid( vel_path=self.vel_model_path, grid_folder_out=self.grid_folder_out, grid=grid_args["velgrid"] ) p_grid2time = ut.Grid2Time( station_path=self.station_path, grid_folder_out=self.grid_folder_out, time_folder_out=self.time_folder_out, phase="P" ) s_grid2time = ut.Grid2Time( station_path=self.station_path, grid_folder_out=self.grid_folder_out, time_folder_out=self.time_folder_out, phase="S" ) # Prepare catalog output self.catalog_path = os.path.join(self.tmp_folder, "catalog.out") sut.isfile(self.catalog_path, overwrite=True) open(self.catalog_path, "w").close() gen_time2loc = ut.Time2Loc( catalog=[self.catalog_path, "SEISAN"], grid=grid_args["locgrid"], time_folder_out=self.time_folder_out, loc_folder_out=self.loc_folder_out ) def write_each_nlloc_file(nlloc_and_out): """Helper function to write NLLoc control files.""" nlloc, out = nlloc_and_out nlloc.write( out, vel2grid=True, grid2time=True, time2loc=True ) return True # Create control files for P and S phases p_nlloc = ut.NLLocControlFile(gen_control, gen_vel2grid, p_grid2time, gen_time2loc) s_nlloc = ut.NLLocControlFile(gen_control, gen_vel2grid, s_grid2time, gen_time2loc) nlloc_control_files = [ (p_nlloc, self.p_control_file_out), (s_nlloc, self.s_control_file_out) ] if write_nlloc_files: # Write control files using single-threaded executor with cf.ThreadPoolExecutor(max_workers=1) as executor: executor.map(write_each_nlloc_file, nlloc_control_files) self.nll_control_file = p_nlloc def _prepare_grid_args(self) -> dict: """Calculate grid parameters based on region and delta. Returns: Dictionary containing transformation, velocity grid, and location grid parameters """ lon_w, lon_e, lat_s, lat_n, z_min, z_max = self.region # Calculate center point c_lat = (lat_s + lat_n) / 2 c_lon = (lon_w + lon_e) / 2 # Calculate grid dimensions in km x, _, _ = gps2dist_azimuth(lat_s, lon_w, lat_s, c_lon) x = x / 1e3 # Convert to km x_num = int(x * 2 / self.delta_in_km) y, _, _ = gps2dist_azimuth(lat_s, lon_w, c_lat, lon_w) y = y / 1e3 # Convert to km y_num = int(y * 2 / self.delta_in_km) z_num = int((z_max - z_min) / self.delta_in_km) return { "trans": ["SIMPLE", c_lat, c_lon, 0], "velgrid": [ x_num, y_num, z_num, -round(x, 2), -round(y, 2), round(z_min, 2), self.delta_in_km, self.delta_in_km, self.delta_in_km, "SLOW_LEN" ], "locgrid": [ x_num, y_num, z_num, -round(x, 2), -round(y, 2), round(z_min, 2), self.delta_in_km, self.delta_in_km, self.delta_in_km, "PROB_DENSITY", "SAVE" ] }
[docs] def compute_travel_times(self): """ Compute travel-time tables using the velocity model and station geometry. This method runs the NLLoc workflow steps: - ``Vel2Grid``: builds the velocity grid from the velocity model - ``Grid2Time``: computes travel-time tables for all stations The resulting travel-time grids are stored in ``self.tmp_folder``. Notes ----- This operation can be computationally expensive depending on: - Grid resolution - Spatial extent of the model - Number of stations Large grids may also require significant memory and disk space. Ensure adequate computational resources are available before execution. Warnings -------- This method may take a long time to complete for large domains or dense station networks. """ self.__initialize() sut.printlog("info", "NLLoc:Vel2Grid", "Running") subprocess.call( f"{self.nlloc_paths['vel2grid_exe_path']} {self.p_control_file_out}", shell=True ) sut.printlog("info", "NLLoc:Grid2Time:P", "Running") subprocess.call( f"{self.nlloc_paths['grid2time_exe_path']} {self.p_control_file_out}", shell=True ) sut.printlog("info", "NLLoc:Grid2Time:S", "Running") subprocess.call( f"{self.nlloc_paths['grid2time_exe_path']} {self.s_control_file_out}", shell=True )
def _locate( self, catalog: Union[Catalog, str], nlloc_out_folder: str, out_filename: str = "locations.xml", out_format: str = "SC3ML" ) -> Catalog: """Perform single-pass event location. Args: catalog: Input catalog or path to catalog file nlloc_out_folder: Output directory out_filename: Output filename out_format: Output format Returns: Located event catalog """ catalog = catalog if isinstance(catalog, Catalog) else read_events(catalog) # Set up file paths nlloc_inp = os.path.join(nlloc_out_folder, "catalog_input.inp") nlloc_folder = os.path.join(nlloc_out_folder, "nlloc", "SeisMonitor") nlloc_out = os.path.join(nlloc_out_folder, out_filename) nlloc_control = os.path.join(nlloc_out_folder, "loc.in") # Write input catalog sut.isfile(nlloc_inp, overwrite=True) catalog.write(nlloc_inp, format="NORDIC") picks_from_unproc_catalog = slut.get_picks(catalog) # Initialize if needed and update control file try: nlloc_control_file = self.nll_control_file except AttributeError: self.__initialize(False) nlloc_control_file = self.nll_control_file nlloc_control_file.time2loc.catalog = " ".join((nlloc_inp, "SEISAN")) nlloc_control_file.time2loc.loc_folder_out = nlloc_folder nlloc_control_file.write(nlloc_control) # Run location sut.printlog("info", "NLLoc:NLLoc", "Running") subprocess.call( f"{self.nlloc_paths['nll_exe_path']} {nlloc_control}", shell=True ) # Process results all_events = [] for path in tqdm(glob.glob(nlloc_folder + "*.hyp")): file_base = os.path.basename(path) date = file_base.split(".")[1] if date in ["sum", "last.hyp"]: continue try: catalog = read_nlloc_hyp(path, format="NORDIC") except Exception: print(f"Unread: {path}") continue for ev in catalog.events: ori_pref = ev.preferred_origin() ori_pref = scut.add_aditional_origin_info( ori_pref, agency=self.agency, method_id="NLLOC", earth_model_id=self.vel_model.model_name ) ev.preferred_origin_id = ori_pref.resource_id.id ev = slut.changing_picks_info(ev, picks_from_unproc_catalog) ev = scut.add_aditional_event_info(ev, agency=self.agency) all_events.append(ev) catalog = Catalog(events=all_events) catalog = scut.add_aditional_catalog_info(catalog, agency=self.agency) sut.isfile(nlloc_out) catalog.write(nlloc_out, format=out_format) sut.printlog("info", "NLLoc:NLLoc", f"Finished. See your results in {nlloc_out}") return catalog def _iterlocate( self, catalog: Union[Catalog, str], nlloc_out_folder: str, out_filename: str = "locations.xml", out_format: str = "SC3ML", degrees: list = [4, 3, 2.5, 2, 1.5, 1], rm_attempts: bool = False ) -> Catalog: """Perform iterative event location with decreasing distance thresholds. Args: catalog: Input catalog or path nlloc_out_folder: Output directory out_filename: Output filename out_format: Output format degrees: List of distance thresholds rm_attempts: Remove temporary files Returns: Located event catalog """ nlloc_out = os.path.join(nlloc_out_folder, out_filename) reloc_catalog = self._locate( catalog, nlloc_out_folder, out_filename="base.xml", out_format="SC3ML" ) good_evs = [] bad_catalog = reloc_catalog iter_count = 0 while True: good_events, bad_events = slut.get_bad_and_good_events(bad_catalog) good_evs.append(good_events) degree = degrees[iter_count] if not bad_events: print( f"convergence in {iter_count} iterations, " f"only stations with distance < {degrees[iter_count-1]} degrees" if iter_count > 0 else "convergence without iterate" ) break bad_events = slut.filter_arrivals_by_distance(bad_events, degree) if not bad_events: break bad_catalog = Catalog(bad_events) tmp_path = os.path.join(nlloc_out_folder, "tmp", f"deg_{degree}") bad_catalog = self._locate( bad_catalog, tmp_path, out_filename=f"{iter_count}.xml", out_format="SC3ML" ) iter_count += 1 good_evs = [y for x in good_evs for y in x] if good_evs: reloc_catalog.events = good_evs sut.isfile(nlloc_out) reloc_catalog.write(nlloc_out, format=out_format) sut.printlog("info", "NLLoc:NLLoc", f"Finished. See your results in {nlloc_out}") if rm_attempts: shutil.rmtree(os.path.join(nlloc_out_folder, "tmp")) return reloc_catalog
[docs] def locate( self, catalog: Union[Catalog, str], nlloc_out_folder: str, out_filename: str = "locations.xml", out_format: str = "SC3ML" ) -> Catalog: """Main location method with optional exhaustive search. Args: catalog: Input catalog or path nlloc_out_folder: Output directory out_filename: Output filename out_format: Output format Returns: Located event catalog """ if self.exhaustively: return self._iterlocate( catalog=catalog, nlloc_out_folder=nlloc_out_folder, out_filename=out_filename, out_format=out_format, degrees=self.search_in_degrees, rm_attempts=self.rm_attempts ) return self._locate( catalog=catalog, nlloc_out_folder=nlloc_out_folder, out_filename=out_filename, out_format=out_format )