3 Star 2 Fork 1

ela/eqspy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
EQSignal.py 10.71 KB
一键复制 编辑 原始数据 按行查看 历史
ela 提交于 2015-07-09 18:57 . add undo adjust button
# -*- coding: utf-8 -*-
import os
from copy import deepcopy
import scipy as spy
import xlwt
from utils import eqs
from scipy.signal import iirfilter, lfilter, welch
from scipy.fftpack import fftfreq, fft, ifft
def readNGAFile_(fn):
'''
读取PEER强震记录数据库的NGA格式地震波
>>> a, npts, dt = readNGAFile_(fn)
Parameters
----------
fn : string
NGA地震波文件名
Returns
-------
a : array_like
地震波加速度时程序列
npts : array_like
时程数据点数
dt : array_like
时程时间间隔
'''
f = open(fn,'r')
title = f.readline()
infor = f.readline().split(',')
unit = f.readline()
npts, dt = f.readline().split()[:2]
npts, dt = int(npts),float(dt)
a = ''.join(f.readlines()).replace('\n',' ')
a = spy.fromstring(a,sep=' ')
f.close()
return a, npts, dt
def nextpow2(n):
k = 1
while k<n:
k = k*2
return k
class EQSignal(object):
"""docstring for EQSignal"""
def __init__(self,acc=spy.zeros(2048),dt=0.02,v0=0.0,d0=0.0):
super(EQSignal, self).__init__()
self.init(acc,dt,v0,d0)
def init(self,acc=spy.zeros(2048),dt=0.02,v0=0.0,d0=0.0,name='eqs'):
self.acc = acc
self.dt = dt
self.n = len(acc)
self.t = spy.linspace(0,(self.n-1)*self.dt,self.n)
self.v0 = v0
self.d0 = d0
self.name = name
self.a2vd()
self.fft()
self.sp = [Spectra()]
self.acc_bak = deepcopy(self.acc)
self.acc_raw = deepcopy(self.acc)
self.vel_raw = deepcopy(self.vel)
self.disp_raw = deepcopy(self.disp)
self.Ia = spy.zeros(self.n)
def fft(self):
Nfft = nextpow2(self.n)
self.freq = fftfreq(Nfft,self.dt)
self.accf = fft(self.acc, Nfft)/Nfft
# self.psd = welch(self.acc, 1.0/self.dt, nperseg=Nfft/2)
def psd(self):
Nfft = nextpow2(self.n)
PSD = welch(self.acc, 1.0/self.dt, nperseg=Nfft/2)
self.freq = PSD[0]
self.accf = PSD[1]
def update(self,acc):
self.acc = acc
self.n = len(self.acc)
self.t = spy.linspace(0,(self.n-1)*self.dt,self.n)
self.a2vd()
# self.sp = []
self.acc_raw = deepcopy(self.acc)
self.vel_raw = deepcopy(self.vel)
self.disp_raw = deepcopy(self.disp)
self.Ia = spy.zeros(self.n)
def recover(self):
self.update(self.acc_bak)
def reset(self):
self.acc = deepcopy(self.acc_raw)
self.vel = deepcopy(self.vel_raw)
self.disp = deepcopy(self.disp_raw)
self.Ia = spy.zeros(self.n)
def trim(self,iStart,iStop,current=False):
if current:
acc = self.acc
else:
acc = self.acc_raw
self.acc_raw = acc[iStart:iStop]
self.acc = acc[iStart:iStop]
self.n = len(self.acc)
self.t = spy.linspace(0,(self.n-1)*self.dt,self.n)
self.a2vd()
self.vel_raw = self.vel
self.disp_raw = self.disp
def a2vd(self, current=True):
if current:
acc = self.acc
else:
acc = self.acc_raw
self.vel, self.disp = eqs.a2vd(acc,self.dt,self.v0,self.d0)
def a2vdr(self):
vel, disp = eqs.a2vd(self.acc,self.dt,self.v0,self.d0)
cv0 = spy.mean(vel)
cd0 = spy.mean(disp - cv0*self.t)
v0 = self.v0 - cv0
d0 = self.d0 - cd0
self.vel, self.disp = eqs.a2vd(self.acc,self.dt,v0,d0)
def detrendAcc(self,oh=2,ol=0,current=False):
if current:
acc = self.acc
else:
acc = self.acc_raw
self.acc = eqs.detrend(acc,oh,ol,self.n)
self.a2vd()
def baseLineCorr(self,oh=2,ol=0,current=False):
if current:
acc = self.acc
else:
acc = self.acc_raw
self.acc = eqs.basecorr(acc,oh,ol,self.dt,self.v0,self.d0,self.n)
self.a2vd()
def targetDispCorr(self,ntp,current=False):
if ntp == 1:
tp = [int(self.n*0.618)]
else:
tp = [int(i) for i in spy.linspace(self.n*1.0/ntp,self.n,ntp)]
tp = spy.array(tp)
pl = 2
ph = pl + ntp - 1
if current:
acc = self.acc
else:
acc = self.acc_raw
vel, disp = eqs.a2vd(acc,self.dt,self.v0,self.d0)
cv0 = spy.mean(vel)
cd0 = spy.mean(disp - cv0*self.t)
v0 = self.v0 - cv0
d0 = self.d0 - cd0
vel, disp = eqs.a2vd(acc,self.dt,v0,d0)
self.acc = eqs.targetdispcorr(acc,disp,tp,ph,pl,self.dt,self.v0,self.d0,self.n,ntp)
self.a2vd()
def filt(self,order,f1,f2,btype='highpass',current=False):
fn = 0.5/self.dt
if btype=='highpass':
Wn = f1/fn
elif btype=='lowpass':
Wn = f2/fn
else:
Wn = [f1/fn,f2/fn]
bf,af = iirfilter(order, Wn, btype=btype)
if current:
self.acc = lfilter(bf, af, self.acc)
else:
self.acc = lfilter(bf, af, self.acc_raw)
self.a2vd()
def setupSpectra(self,NP=90,Tstart=0.04,Tstop=10.0,DMode="log",Zeta=[0.05,0.02],SMethod="mixed",pseudo=False):
self.sp = []
for zeta in Zeta:
self.sp.append(Spectra(NP,Tstart,Tstop,DMode,zeta,SMethod,pseudo))
def backupSpectra(self):
for sp in self.sp:
sp.SPA_bak = sp.SPA
def getSpectra(self):
for sp in self.sp:
SMN = sp.SMDict[sp.SMethod]+3*int(sp.pseudo)
SPA,INDSPA = eqs.spectrum(self.acc,self.dt,sp.Zeta,sp.SPT,SMN)
SPA = abs(SPA)
# print (INDSPA)
sp.getSpectra(SPA, INDSPA)
def getAllSpectra(self):
for sp in self.sp:
sp.SPA_bak = sp.SPA
SPA, INDSPA, SPV, SPD, SPE = eqs.spectrumavd(self.acc,self.dt,sp.Zeta,sp.SPT,sp.SMDict[sp.SMethod])
SPA = abs(SPA)
SPV = abs(SPV)
SPD = abs(SPD)
SPE = abs(SPE)
if sp.pseudo:
tp = 2.0*spy.pi
SPV = tp*SPD/sp.SPT
SPA = tp*SPV/sp.SPT
sp.getAllSpectra(SPA, INDSPA, SPV, SPD, SPE)
def readNGAFile(self,fn):
acc, n, dt = readNGAFile_(fn)
name = os.path.splitext(os.path.basename(fn))[0]
self.init(acc,dt,name=name)
def readTXTFile(self,fn,dt=0.02):
acc = spy.loadtxt(fn)
name = os.path.splitext(os.path.basename(fn))[0]
self.init(acc,dt,name=name)
def getAriasIntensity(self):
Ia = spy.integrate.cumtrapz(self.acc*self.acc,self.t,initial=0.0)
self.Ia = Ia/Ia[-1]
def getTrimIndice(self,thd1,thd2):
self.getAriasIntensity()
iStart = max(0,self.Ia.searchsorted(thd1)-1)
iStop = self.Ia.searchsorted(thd2)
return iStart, iStop
def norm(self):
self.acc = self.acc/max(abs(self.acc))
self.a2vd()
self.acc_raw = deepcopy(self.acc)
self.vel_raw = deepcopy(self.vel)
self.disp_raw = deepcopy(self.disp)
def peak(self):
return max(abs(self.acc))
def fitSpectra(self,i,tol=0.05,mit=5,method=0):
sp = self.sp[i]
if method == 0:
acc = eqs.fdfitspectrum(self.acc,self.dt,sp.Zeta,sp.SPT,sp.SPAT,tol,mit,self.n,sp.NP)
elif method == 1:
acc = eqs.tdfitspectrum(self.acc,self.dt,sp.Zeta,sp.SPT,sp.SPAT,tol,mit,self.n,sp.NP)
self.update(acc)
def save_txt(self):
spy.savetxt('%s.txt'%self.name)
def save_xlsx(self):
wb = xlwt.Workbook()
ws = wb.add_sheet('Time History Data (Raw)')
for i in range(self.n):
ws.write(i, 0, self.t[i])
ws.write(i, 1, self.acc_raw[i])
ws.write(i, 2, self.vel_raw[i])
ws.write(i, 3, self.disp_raw[i])
ws = wb.add_sheet('Time History Data (Corrected)')
for i in range(self.n):
ws.write(i, 0, self.t[i])
ws.write(i, 1, self.acc[i])
ws.write(i, 2, self.vel[i])
ws.write(i, 3, self.disp[i])
for sp in self.sp:
ws = wb.add_sheet('Response Spectra (zeta = %.2f)'%sp.Zeta)
for i in range(sp.NP):
ws.write(i, 0, sp.SPT[i])
ws.write(i, 1, sp.SPA[i])
ws.write(i, 2, sp.SPAT[i])
ws.write(i, 3, sp.SPV[i])
ws.write(i, 4, sp.SPD[i])
ws.write(i, 5, sp.SPE[i])
wb.save('%s.xls'%self.name)
class Spectra(object):
"""docstring for Spectra"""
def __init__(self,NP=90,Tstart=0.04,Tstop=10.0,DMode="log",Zeta=0.05,SMethod="mixed",pseudo=False):
super(Spectra, self).__init__()
self.init(NP,Tstart,Tstop,DMode,Zeta,SMethod,pseudo)
def init(self,NP=90,Tstart=0.04,Tstop=10.0,DMode="log",Zeta=0.05,SMethod="mixed",pseudo=False):
self.NP = NP
self.Tstart = Tstart
self.Tstop = Tstop
self.DMode = DMode
self.Zeta = Zeta
self.SMethod = SMethod
self.pseudo = pseudo
self.SMDict = {"freq":1, "newmark":2, "mixed":3}
self.getPeriods()
def getPeriods(self):
if self.DMode == "linear":
self.SPT = spy.linspace(self.Tstart,self.Tstop,self.NP)
elif self.DMode == "log":
if self.Tstart == 0.0: self.Tstart==0.04
self.SPT = spy.logspace(spy.log10(self.Tstart),spy.log10(self.Tstop),self.NP)
self.SPA = spy.zeros(self.NP)
self.SPV = spy.zeros(self.NP)
self.SPD = spy.zeros(self.NP)
self.INDSPA = spy.zeros(self.NP,dtype="i")
self.SPAT = spy.zeros(self.NP)
self.SPE = spy.zeros(self.NP)
self.SPA_bak = spy.zeros(self.NP)
def getSpectra(self, SPA, INDSPA):
self.SPA, self.INDSPA = SPA, INDSPA
def getAllSpectra(self, SPA, INDSPA, SPV, SPD, SPE):
self.SPA, self.INDSPA, self.SPV, self.SPD, self.SPE = SPA, INDSPA, SPV, SPD, SPE
def getCodeSpectra(self,Tg,amax=2.25):
zeta = self.Zeta
SPT = self.SPT
gamma = 0.9+(0.05-zeta)/(0.3+6.0*zeta)
eta1 = 0.02+(0.05-zeta)/(4.0+32.0*zeta)
eta2 = 1.0+(0.05-zeta)/(0.08+1.6*zeta)
SP0 = 1.0/amax
SPA = spy.ones(len(SPT))*eta2
idx1 = SPT.searchsorted(0.1)
idx2 = SPT.searchsorted(Tg)
idx3 = SPT.searchsorted(5.0*Tg)
if idx1>0:
SPA[:idx1] = SP0+10.0*(eta2-SP0)*SPT[:idx1]
if idx2<self.NP:
SPA[idx2:idx3] = eta2*(Tg/SPT[idx2:idx3])**gamma
if idx3<self.NP:
SPA[idx3:] = eta2*0.2**gamma-eta1*(SPT[idx3:]-5.0*Tg)
self.SPAT = amax*SPA
def getError(self):
er = (self.SPA-self.SPAT)/self.SPAT
aer = spy.sqrt(sum(er*er)/self.NP)
mer = max(abs(er))
return aer,mer
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/ela/eqspy.git
git@gitee.com:ela/eqspy.git
ela
eqspy
eqspy
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385