#------------------------------------------------------------------------
#
# PestoSeis, a numerical laboratory to learn about seismology, written
# in the Python language.
# Copyright (C) 2021 Andrea Zunino
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#------------------------------------------------------------------------
"""
Functions to generate and process seismic reflection data to mimic an
exploration seismology setup.
"""
#######################################################################
#######################################################################
import numpy as __np
import matplotlib.pyplot as __plt
from scipy.interpolate import CubicSpline as __CubicSpline
from scipy.ndimage import gaussian_filter
#######################################################################
[docs]
def fwdconvolve(refle,wavelet,dt):
"""
Convolve a reflectivity series with a wavelet to compute a seismogram.
Args:
refle (ndarray): reflectivity series
wavelet (ndarray): the wavelet
dt (float): time interval
Returns:
tarr (ndarry): time array
convo (ndarry): seismic trace resulting from the convolution
"""
convo = __np.convolve(refle,wavelet,mode="full")
tarr = __np.array([i*dt for i in range(convo.size)])
return tarr,convo
#######################################################################
[docs]
def calcreflectivity(density,vel,z,dt):
"""
Compute the reflectivity series given density and velocity as a function
of depth and a time interval.
Args:
density (ndarray): a vector of density values
vel (ndarray): a vector of velocity values
z (ndarray): a vector of depths
dt (float): the time interval
Returns:
twt (ndarray): the two-way travel time
refltwt (ndarray): the reflectivity series as a function of two-way travel time
"""
assert(vel.ndim==1)
assert(density.ndim==1)
impdz = density*vel
reflz = (impdz[1:]-impdz[:-1])/(impdz[1:]+impdz[:-1])
twtfromv = _depth2time(z,vel)
twt = __np.arange(0.0,twtfromv.max(),dt)
npts = twt.size
refltwt = __np.zeros(npts)
for i in range(reflz.size):
if __np.abs(reflz[i])>0.0 :
idx = __np.argmin(__np.abs(twtfromv[i]-twt))
refltwt[idx] = reflz[i]
return twt,refltwt
#######################################################################
def _depth2time(z,vel):
"""
Convert depth to time.
Args:
z (ndarray): the depth array
vel (ndarray): the velocity array
Returns:
twt (ndarray): the two-way travel time
"""
assert(vel.ndim==1)
assert(vel.size==z.size)
npts=vel.size
twt = __np.zeros(npts)
for i in range(npts-1):
deltaz=z[i+1]-z[i]
assert(deltaz>=0.0)
twt[i+1] = twt[i] + 2.0*deltaz/vel[i]
return twt
#######################################################################
[docs]
def imgshotgath(seisdata,dt,offset,amplitudeclip=1.0):
"""
Create an image of a shotgather.
Args:
seisdata (ndarray): seismic data, i.e., shotgather, *rows* contain traces
dt (float): time interval
offset (ndarray): array of offsets
amplitudeclip (float,optional): clip the amplitude to a desired value
"""
twt = __np.array([i*dt for i in range(seisdata.shape[1])])
vmax = amplitudeclip*__np.abs(seisdata).max()
vmin = -vmax
extent = [offset.min(),offset.max(),twt.max(),twt.min()]
__plt.imshow(seisdata.T,vmin=vmin,vmax=vmax,cmap=__plt.cm.gray,
extent=extent,aspect='auto',interpolation='bilinear')
__plt.colorbar()
__plt.xlabel('Offset [m]')
__plt.ylabel('TWT [s]')
return
#######################################################################
[docs]
def wiggle(data,dt,offset=None,skiptr=1,scal=None,title=None,filltrace=True):
"""
Create a 'wiggle' plot of a shotgather.
Args:
data (ndarray): seismic data, i.e., shotgather, *rows* contain traces
dt (float): time interval
offset (ndarray,optional): array of offsets
skiptr (integer,optiona): plot only every 'skiptr' traces
title (string,optional): add a title to the plot
"""
t = __np.array([i*dt for i in range(data.shape[1])])
lwidth = 1.0 # line width
ntraces = data.shape[0]
if not isinstance(offset,__np.ndarray) :
ofs = __np.array([float(i) for i in range(ntraces)])
maxval=1.0/((abs(data).max()))
else :
ofs = offset
maxseis = abs(data).max()
maxoff = (abs(__np.diff(offset).max()))
maxval = maxoff/maxseis
if scal!=None :
maxval *= scal
if data.ndim==1:
data=data.reshape(1,-1)
for i in range(0,ntraces,skiptr):
trace=ofs[i]+data[i,:]*maxval
__plt.plot(trace,t,color='black',linewidth=lwidth)
if filltrace :
__plt.fill_betweenx(t,ofs[i],trace,where=(trace>=ofs[i]),color='black')
#__plt.fill_betweenx(t,ofs[i],trace,where=(trace<ofs[i]),color='red')
ax = __plt.gca()
__plt.xlim(ofs[0]-data[0,:].max()*maxval,ofs[-1]+data[-1,:].max())
__plt.ylim(t.min(),t.max())
ax.invert_yaxis()
if title!=None :
__plt.title(title)
__plt.xlabel('Offset [m]')
__plt.ylabel('TWT [s]')
return
#######################################################################
[docs]
def geometrical_spreading(seis,twt):
"""
Apply geometrical spreading correction to a shotgather.
Args:
seis (ndarray): seismic shotgather, *rows* contain traces
twt (ndarray): two-way traveltime
Returns
seis_gs (ndarray): seismograms corrected for geometrical spreading
"""
ns = seis.shape[1]
ntraces = seis.shape[0]
seis_gs = seis.copy()
for i in range(ntraces):
## twt**2 is a rough APPROXIMATION to geom. spreading...
seis_gs[i,:] = seis_gs[i,:] * twt**2
## scale max amplitude to 1
seis_gs[i,:] /= __np.abs(seis_gs[i,:]).max()
return seis_gs
#######################################################################
[docs]
def agc(seis, w=100, rho=0, type='uniform'):
"""
Apply Automatic Gain Control to a shotgather.
Args:
seis (ndarray): seismic shotgather, *rows* contain traces
w (float): window width
rho (ndarray,optional): density
agctype (string): 'linear' or 'gaussian', weight kernel type
Returns
seis_gs (ndarray): seismograms corrected with AGC
"""
ns = seis.shape[1]
ntraces = seis.shape[0];
if rho>0:
nw = int(__np.ceil(w/rho))
else:
nw = w
# select weight kernel type
if type.lower() == 'gaussian':
from scipy.stats import norm
iw=__np.linspace(-2,2,2*nw);
weight_kernel = norm.pdf(iw)
weight_kernel = weight_kernel/__np.sum(weight_kernel)
else:
weight_kernel = __np.ones(nw)
weight_kernel = weight_kernel/__np.sum(weight_kernel)
seis_agc=__np.zeros_like(seis)
for i in range(ntraces):
seis_abs = __np.abs(seis[i,:])
seis_weight = __np.convolve(__np.abs(seis[i,:]),weight_kernel, mode='same')
if (seis_weight==0.0).any() :
# print("agc(): Dividing by zero in AGC correction... ")
seis_weight = __np.where(seis_weight==0.0,1e-6,seis_weight)
##-----
seis_agc[i,:] = seis[i,:]/seis_weight
return seis_agc
#######################################################################
[docs]
def nmocorrection(velnmo,dt,offset,seisdat):
"""
Common Mid Point (CMP) normal moveout correction.
Args:
velnmo (ndarray): 1D array of "NMO" velocity, representing the
average velocity of the layers above
dt (float): the time interval
offset (ndarray): offsets for each trace
seisdat (ndarray): input seismic data, *rows* contain traces
Returns
seisnmo (ndarray): seismograms NMO corrected
"""
ntraces,nsmpl = seisdat.shape
assert ntraces==offset.size
seisnmo = __np.zeros_like(seisdat)
timearr = __np.array([(i-1)*dt for i in range(nsmpl)])
## build the nmo array trace by trace
for itr in range(ntraces):
# compute the time for nmo for all time points of a single trace
# velocity array is nx x nz so slice in depth is vel[?,:]
tnmo = __np.sqrt(timearr**2 + offset[itr]**2/velnmo**2)
# new trace
seisnmo[itr,:] = _resampletrace(timearr,tnmo,seisdat[itr,:])
return seisnmo
#######################################################################
def _resampletrace(torig,tnmo,seistr):
"""
Resample a trace
Args:
torig
tnmo
seistr
Returns
seisnew (ndarray): resampled trace
"""
seisnew = __np.zeros(torig.size)
itp = __CubicSpline(torig,seistr)
# time outside bounds?
outbou = (tnmo>torig.max()).nonzero()[0]
if outbou.size>0:
idx = __np.min(outbou)
else :
idx = seisnew.size
seisnew[:idx] = itp(tnmo[:idx])
return seisnew
#######################################################################
def get_pm_mask(seisdat):
"""
Gets a mask of the sign (+/-) for a given set of seismic traces. Useful
for recoving the correct sign of the seismic data after FFTing the data.
Args:
seisdat (ndarray): input seismic data in time-space domain, *rows*
contain traces
Returns
pm_mask (ndarray): mask containing the sign (+1 or -1) of the trace
samples
"""
pm_mask = __np.zeros_like(seisdat)
pm_mask[__np.where(seisdat > 0)] = 1
pm_mask[__np.where(seisdat < 0)] = -1
return pm_mask
#######################################################################
def transform_tx_to_fk(seisdat):
"""
Transform seismograms from time-space domain to frequency-wavenumber
domain.
Args:
seisdat (ndarray): input seismic data, *rows* contain traces
Returns
fk_seis (ndarray): transformed seismic data
"""
# Transform to frequency-space domain
fx_seis = __np.fft.fftshift(__np.fft.fft(seisdat, axis=1), axes=1)
# Transform to frequency-wavenumber domain
fk_seis = __np.fft.fftshift(__np.fft.ifft(fx_seis, axis=0), axes=0)
return fk_seis
#######################################################################
def transform_fk_to_tx(seisdat):
"""
Transform seismograms from frequency-wavenumber domain to time-space
domain.
Args:
seisdat (ndarray): seismic traces in frequency-wavenumber domain,
*rows* contain traces
Returns
tx_seis (ndarray): transformed seismic data
"""
# Transform to frequency-space domain
fx_seis = __np.fft.fft(__np.fft.ifftshift(seisdat, axes=0), axis=0)
# Transform to time-space domain
tx_seis = __np.fft.ifft(__np.fft.ifftshift(fx_seis, axes=1), axis=1)
return __np.abs(tx_seis)
#######################################################################
import numpy as np
from matplotlib.path import Path as PolyPath
class _Canvas:
def __init__(self, ax):
"""
Constructs a canvas object for drawing on a plot
Args:
ax (matplotlib.axes._subplots.AxesSubplot): plot axis
"""
self.ax = ax
# Create handle for a path of connected points
self.path, = ax.plot([], [], 'o-', lw=2.5, color='red')
self.vert = np.empty(shape=[0,2])
self.ax.set_title('Interactive Plot\nLEFT: new point, MIDDLE: close polygon, RIGHT: delete last point')
self.x = []
self.y = []
self.mouse_button = {1: self._add_point, 2: self._close_polygon, 3: self._delete_point}
def set_location(self,event):
if event.inaxes:
self.x = event.xdata
self.y = event.ydata
def _add_point(self):
self.vert = np.vstack((self.vert, np.array([self.x, self.y])))
def _delete_point(self):
if (self.vert).shape[0] > 0:
self.vert = np.delete(self.vert, -1, 0)
def _close_polygon(self):
self.vert = np.vstack((self.vert, self.vert[0,:]))
def update_path(self,event):
# If the mouse pointer is not on the canvas, ignore buttons
if not event.inaxes: return
# Do whichever action correspond to the mouse button clicked
# mouse_button: dictionary {1: self._add_point, 2: self._delete_point, 3: self._close_polygon}
self.mouse_button[event.button]()
print("vert:",self.vert)
x = [self.vert[k,0] for k in range((self.vert).shape[0])]
y = [self.vert[k,1] for k in range((self.vert).shape[0])]
self.path.set_data(x,y)
__plt.draw()
#######################################################################
def _tracepoly(fig) :
ax = fig.gca()
cnv = _Canvas(ax)
# Get the information on the mouse events
__plt.connect('button_press_event', cnv.update_path)
__plt.connect('motion_notify_event', cnv.set_location)
# Display the plot
__plt.tight_layout()
__plt.show()
return cnv
#######################################################################
def plot_fk_spectrum(seisdat_fk, dt, dx, ylim=None, interactive=True):
"""
Plots the frequency-wavenumber spectrum of the seismic traces. Data
must already be in frequency-wavenumber domain.
Args:
seisdat_fk (ndarray): seismic traces in frequency-wavenumber domain,
*rows* contain traces
ylim (float, optional): upper y-limit (frequency) to plot
interactive (bool, optional): whether to plot an interactive polygon
drawing tool
Returns
canvas (_Canvas): canvas object containing the user-drawn polygon, only
returned when `interactive=True`.
"""
# Verify that the data is complex; rudamentary check to see if the data
# is still in time-space domain (but obviously not fool-proof)
if not __np.any(__np.iscomplex(seisdat_fk)):
raise Exception("Input data is not complex; verify that the input\
data is in frequency-wavenumber domain")
# Construct plot
fig = __plt.figure(figsize=[10, 8])
ax = __plt.gca()
ax.imshow(
__np.abs(seisdat_fk).T,
aspect="auto",
cmap="gray_r",
origin="lower",
)
freqs = np.fft.fftshift(np.fft.fftfreq(seisdat_fk.shape[1], d=dt))
__plt.yticks(np.arange(0, seisdat_fk.shape[1], 25), np.round(freqs[::25], 0))
ks = np.fft.fftshift(np.fft.fftfreq(seisdat_fk.shape[0], d=dx))
__plt.xticks(np.arange(0, seisdat_fk.shape[0], 25), np.round(ks[::25], 3))
if ylim is None:
ylim = 0.1 * seisdat_fk.shape[1]
__plt.ylim([seisdat_fk.shape[1]/2, seisdat_fk.shape[1]/2 + ylim])
ax.set_title("Frequency-Wavenumber Spectrum")
ax.set_xlabel("Wavenumber [1/m]")
ax.set_ylabel("Frequency [Hz]")
if interactive:
# Set canvas for drawing polygons onto
canvas = _tracepoly(fig)
return canvas
#######################################################################
def mask_from_polygon(img, poly, smooth=True, smoothing_strength=2.5, invert=False):
"""
Constructs a boolean mask from a polygon for a given image. Regions within
the polygon are by default assigned `0` and values outside of the polygon
are assigned `1`.
Args:
img (ndarray): image to construct the mask for
poly (ndarray, _Canvas): polygon to construct the boolean
mask of
smooth (bool, optional): flag for whether to apply gaussian smoothing
to the edges of the mask
smoothing_strength (int, optional): strength of gaussian smoothing
invert (bool, optional): flag for inverting the boolean selection
Returns
mask (ndarray): boolean mask with same dimensions of `img`
"""
if type(poly) is _Canvas:
poly = __np.array(poly.vert)
if poly.shape[1] != 2 or len(poly.shape) != 2:
raise ValueError(
f"Polygon must have shape `[n, 2]` -> Got polygon with shape {poly.shape}"
)
# Create the mask from the polygon
mask = np.invert(_pts_inside_poly(img, poly))
# Invert selection if specified
if invert:
mask = __np.invert(mask)
mask = np.array(mask, dtype=float)
# Smooth the edges of the mask
if smooth:
mask = gaussian_filter(mask, smoothing_strength)
# Flip the maps along the axes of symmetry
inds = [int(mask.shape[0]/2), int(mask.shape[1]/2)]
mask[-inds[0]:, :inds[1]] = __np.fliplr(mask[-inds[0]:, -inds[1]:])
mask[:inds[0], :] = __np.flipud(mask[-inds[0]:, :])
return mask.T
#######################################################################
def _pts_inside_poly(img, poly):
n = img.shape
x, y = np.meshgrid(np.arange(n[0]), np.arange(n[1]))
x, y = x.flatten(), y.flatten()
pts = np.vstack((x, y)).T
path = PolyPath(poly)
grid = path.contains_points(pts)
grid = grid.reshape((n[1], n[0]))
return grid
#######################################################################