1 Star 0 Fork 2

nagisa-surface/radar-hand-gesture

forked from 韩磊/radar-hand-gesture 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
6843_dsp.py 17.43 KB
一键复制 编辑 原始数据 按行查看 历史
韩磊 提交于 2021-05-01 14:45 . 6843复现复旦大学的方法
import scipy.io as sio
import numpy as np
import json
import matplotlib.pyplot as plt
import os
from scipy import interpolate
import pylab as pl
from PIL import Image
import math
from scipy.ndimage import convolve1d
range_index=32
n_frames=50
n_chrips=32
# load_fn='E:/getdata/new_dataset/mat_data/Train/6_6_ (19).mat'
# load_data=sio.loadmat(load_fn)
# raw_data=load_data['AOACube_tx1']
# data=raw_data.transpose(3,0,1,2)
def save_CA_DTI(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1'] ##加载mat文件
data=raw_mat_data.transpose(3,0,1,2) ##data大小为 50*256*12*32(帧数*采样点数*通道数*chrips)
DTI=np.zeros((n_chrips,n_frames))
RT = np.zeros((range_index, 1), dtype=float) ##距离-时间图
for i in range(n_frames):
cube = data[i]
cube = cube[:, 8, 1]
fft_data = np.abs(np.fft.fft(cube, axis=0))
RT[:,:] = fft_data[0:range_index].reshape(range_index,1)
r,c=np.where(RT==np.max(RT)) ##在距离峰值上,找到固定距离索引r
cube_2=data[i]
cube_2=cube_2[r[0:1],:,:].reshape(12,32)
cube_avg=np.mean(cube_2,axis=0).reshape(1,32) ###对多通道做平均得到CA-DTI
cube_avg=np.transpose(cube_avg,(1,0))
DT_data=np.abs(np.fft.fft(cube_avg,axis=0))
DT_data=np.fft.fftshift(DT_data,axes=0)
DTI[:,i:i+1]=DT_data
# DTI = normalize(DTI)
plt.imsave(cout_dir + '\{}.png'.format(file_name),DTI)
print("=======ok=========")
def save_HATI(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1']
data=raw_mat_data.transpose(3,0,1,2)
VATI=np.zeros((32,n_frames),dtype=float)
RT = np.zeros((range_index, 1), dtype=float)
for j in range(n_frames):
cube = data[j]
cube = cube[:, 8, :]
fft_data = np.fft.fft(cube, axis=0)
img_tmp = fft_data
img_tmp = img_tmp[0:range_index, :]
RDI = np.abs(np.fft.fft(img_tmp, axis=1))
RDI = np.fft.fftshift(RDI, axes=1)
RDI[:,16:17]=0 ##去直流
RT[:,:] = fft_data[0:range_index].reshape(range_index,1)
r,c=np.where(RT==np.max(RT)) ##在距离峰值上,找到固定距离索引r
r, c = np.where(RDI == np.max(RDI[r[0:1,:]]))##在距离峰值索引上,再次利用峰值搜索在该位置的多普勒序列上寻找符合条件的,进行多普勒目标点筛选
VATI_cube=data[j]
VATI_cube[:,(4,8),:]=VATI_cube[:,(4,8),:]*(-1) ##相位校准
VATI_cube=np.fft.fft(VATI_cube,axis=0)
VATI_cube=VATI_cube[r[0:1],(4,6,8,10),c[0:1]] ##根据r,c索引取出水平维度的数组
vati_fft_data = np.abs(np.fft.fft(VATI_cube, axis=0))
vati_fft_data =np.fft.fftshift(vati_fft_data,axes=0)
VATI[14:18,j:j+1]=vati_fft_data.reshape(4,1)
plt.imsave(cout_dir + '\{}.png'.format(file_name),VATI)
print("=====ok=====")
##处理流程同水平维度
def save_VATI(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1']
data=raw_mat_data.transpose(3,0,1,2)
VATI=np.zeros((32,n_frames),dtype=float)
RT = np.zeros((range_index, 1), dtype=float)
for j in range(n_frames):
cube = data[j]
cube = cube[:, 8, :]
fft_data = np.fft.fft(cube, axis=0)
img_tmp = fft_data
img_tmp = img_tmp[0:range_index, :]
RDI = np.abs(np.fft.fft(img_tmp, axis=1))
RDI = np.fft.fftshift(RDI, axes=1)
RDI[:,16:17]=0
RT[:,:] = fft_data[0:range_index].reshape(range_index,1)
r,c=np.where(RT==np.max(RT))
r, c = np.where(RDI == np.max(RDI[r[0:1,:]]))
VATI_cube=data[j]
VATI_cube[:,(3,11),:]=VATI_cube[:,(3,11),:]*(-1)
VATI_cube=np.fft.fft(VATI_cube,axis=0)
VATI_cube=VATI_cube[r[0:1],(3,2,11,10),c[0:1]]
vati_fft_data = np.abs(np.fft.fft(VATI_cube, axis=0))
vati_fft_data =np.fft.fftshift(vati_fft_data,axes=0)
VATI[14:18,j:j+1]=vati_fft_data.reshape(4,1)
plt.imsave(cout_dir + '\{}.png'.format(file_name),VATI)
print("=====ok=====")
def save_VATI_2(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1']
data=raw_mat_data.transpose(3,0,1,2)
all_frames_4_parms = get_4_parms(data)
best_point_rcae = get_best_point(all_frames_4_parms, data)
VATI=np.zeros((32,n_frames),dtype=float)
for i in range(n_frames):
best_point_rc = best_point_rcae[i]
best_point_r = best_point_rc[0]
best_point_c = best_point_rc[1]
VATI_cube=data[i]
VATI_cube[:,(3,11),:]=VATI_cube[:,(3,11),:]*(-1)
VATI_cube=np.fft.fft(VATI_cube,axis=0)
cube_2=VATI_cube[int(best_point_r),(3,2,11,10),int(best_point_c)]
vati_fftdata=np.abs(np.fft.fft(cube_2,axis=0))
vati_fftdata=np.fft.fftshift(vati_fftdata,axes=0)
VATI[14:18,i:i+1]=vati_fftdata.reshape(4,1)
plt.imsave(cout_dir + '\{}.png'.format(file_name), VATI)
print("vati========ok")
def save_HATI_2(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1']
data=raw_mat_data.transpose(3,0,1,2)
all_frames_4_parms = get_4_parms(data)
best_point_rcae = get_best_point(all_frames_4_parms, data)
HATI=np.zeros((32,n_frames),dtype=float)
for i in range(n_frames):
best_point_rc = best_point_rcae[i]
best_point_r = best_point_rc[0]
best_point_c = best_point_rc[1]
HATI_cube = data[i]
HATI_cube[:, (4, 8), :] = HATI_cube[:, (4, 8), :] * (-1)
HATI_cube = np.fft.fft(HATI_cube, axis=0)
cube_2 = HATI_cube[int(best_point_r), (4, 6, 8, 10), int(best_point_c)]
hati_fftdata = np.abs(np.fft.fft(cube_2, axis=0))
hati_fftdata = np.fft.fftshift(hati_fftdata, axes=0)
HATI[14:18, i:i + 1] = hati_fftdata.reshape(4, 1)
plt.imsave(cout_dir + '\{}.png'.format(file_name), HATI)
print("hati========ok")
def ca_(x, guard_len=4, noise_len=8, mode='wrap', l_bound=4000):
if isinstance(x, list):
x = np.array(x)
assert type(x) == np.ndarray
kernel = np.ones(1 + (2 * guard_len) + (2 * noise_len), dtype=x.dtype) / (2 * noise_len)
kernel[noise_len:noise_len + (2 * guard_len) + 1] = 0
noise_floor = convolve1d(x, kernel, mode=mode)
threshold = noise_floor + l_bound
return threshold, noise_floor
def show_cube(cube):
cube = cube[:, 8, :]
fft_data = np.fft.fft(cube, axis=0)
img_tmp = fft_data
img_tmp = img_tmp[0:range_index, :]
RDI = np.abs(np.fft.fft(img_tmp, axis=1))
RDI = np.fft.fftshift(RDI, axes=1)
RDI[:,16:17]=0
# return RDI
plt.clf()
plt.imshow(RDI, cmap=plt.get_cmap('seismic'), interpolation='hamming')
plt.pause(10)
def SNR_doppler(cube):
doppler_cube=cube[:,8,:]
fft_data=np.fft.fft(doppler_cube,axis=0)
img_tmp=fft_data
img_tmp=img_tmp[0:range_index,:]
RDI=np.abs(np.fft.fft(img_tmp,axis=1))
RDI=np.fft.fftshift(RDI,axes=1)
RDI[:,16:17]=0
# print(np.where(RDI==np.max(RDI)))
# print(np.max(RDI))
thresholdDoppler, noiseFloorDoppler = np.apply_along_axis(func1d=ca_,
axis=0,
arr=RDI.T,
l_bound=1000,
guard_len=4,
noise_len=16)
thresholdRange, noiseFloorRange = np.apply_along_axis(func1d=ca_,
axis=0,
arr=RDI,
l_bound=1000,
guard_len=4,
noise_len=16)
thresholdDoppler, noiseFloorDoppler = thresholdDoppler.T, noiseFloorDoppler.T
det_doppler_mask = (RDI > thresholdDoppler)
det_range_mask = (RDI > thresholdRange)
full_mask = (det_doppler_mask & det_range_mask)
det_peaks_indices = np.argwhere(full_mask == True)
return det_peaks_indices
##对二维角度图进行阈值估算
def SNR_2d_ai(ai_data):
thresholdDoppler, noiseFloorDoppler = np.apply_along_axis(func1d=ca_,
axis=0,
arr=ai_data.T,
l_bound=500,
guard_len=4,
noise_len=16)
thresholdRange, noiseFloorRange = np.apply_along_axis(func1d=ca_,
axis=0,
arr=ai_data,
l_bound=500,
guard_len=4,
noise_len=16)
thresholdDoppler, noiseFloorDoppler = thresholdDoppler.T, noiseFloorDoppler.T
det_doppler_mask = (ai_data > thresholdDoppler)
det_range_mask = (ai_data > thresholdRange)
full_mask = (det_doppler_mask & det_range_mask)
det_peaks_indices = np.argwhere(full_mask == True)
return det_peaks_indices
##寻找最大的连通区域
def find_doppler_max_area(after_SNR_data):
res=after_SNR_data
list1 = []
list2 = []
length = res.shape[0]
for i in range(1,length):
if i == (length - 1):
list1.append(list2)
if (res[i][0] - res[i-1][0] <= 1):
list2.append(res[i])
continue
else:
list1.append(list2)
list2 = []
list2.append(res[i])
# print(res)
# print(np.array(list1[0]))
# print(np.array(list1[1]))
index = -1
max_len = 0
if len(list1)==0:
return res
else:
for idx in range(len(list1)):
if len(list1[idx]) > max_len:
max_len = len(list1[idx])
index = idx
doppler_max_area=np.array(list1[index])
return doppler_max_area
##对doppler每一个检测点得到二维角度图,如何做?
def AI_2D(xy,cube):
res=np.zeros((4, 4))
y=xy[0]
x=xy[1]
ai_2d_cube=cube
ai_2d_cube=np.fft.fft(ai_2d_cube, axis=0)
res[0,0:2]=ai_2d_cube[y,(3,1),x]
res[1,0:2]=ai_2d_cube[y,(2,0),x]
res[2,0:4]=ai_2d_cube[y,(11,9,7,5),x]
res[3,0:4]=ai_2d_cube[y,(10,8,6,4),x]
res=np.abs(np.fft.fft(res,axis=0))
res=np.fft.fftshift(res,axes=0)
res=np.abs(np.fft.fft(res,axis=1))
res=np.fft.fftshift(res,axes=1)
# res=np.abs(np.fft.fft2(res,s=(64,64)))
# res=np.fft.fftshift(res,axes=(1,0))
# print(res)
# print(np.where(res == np.max(res)))
# plt.clf()
# plt.imshow(res, cmap=plt.get_cmap('seismic'), interpolation='hamming')
# plt.pause(1)
return res
##对snr后的二维角度图取一个手势点即可,如何取?
def get_best_2d_ai(after_snr_2d_ai):
return (after_snr_2d_ai[int(after_snr_2d_ai.shape[0]/2)])
##对snr前的二维角度图取最大值的那个点,返回是一个[evea,amziu]列表
def get_maxpower_2d_ai(ai_data):
ev,am=np.where(ai_data==np.max(ai_data))
return [ev[0],am[0]]
##此时计算得到了r,c,amziu,eleva,将第一帧以后的四个参数信息进行整合,与前一帧进行计算
def get_4_parms(data):
all_frames_4_parms=[]
for k in range(1,data.shape[0]):
cube=data[k]
after_snr_data=SNR_doppler(cube)
doppler_max_area=find_doppler_max_area(after_snr_data)
four_parms=np.zeros((doppler_max_area.shape[0],4))
i=0
for rc in doppler_max_area:
ai_data=AI_2D(rc,cube)
# after_snr_2d_ai=SNR_2d_ai(ai_data)
# best_2d_ai=get_best_2d_ai(after_snr_2d_ai)
max_power_2d_ai=get_maxpower_2d_ai(ai_data)
amziu=(-90+max_power_2d_ai[1]*60 )/180*math.pi ###计算两个方向的角度,角度分辨率?
elev=(-90+max_power_2d_ai[0]*60)/180*math.pi
four_parms[i,:]=(rc[0],rc[1],amziu,elev)
i+=1
all_frames_4_parms.append(four_parms)
# print("=====ok====")
return np.array(all_frames_4_parms)
def get_best_point(all_frames_4_parms,frame_0):
best_point_rcae=np.zeros((50,4))
best_point_rcae[0,:]= get_first_frame_4_parms(frame_0) ###填入第一帧的最优散射点的r,c,a,e坐标
index=0
for four_parms_arr in all_frames_4_parms:
length_list=[]
for four_parms in four_parms_arr:
r=four_parms[0]
a=four_parms[2]
e=four_parms[3]
row1=r*math.cos(a)*math.cos(e)-best_point_rcae[index][0]*math.cos(best_point_rcae[index][2])*math.cos(best_point_rcae[index][3])
row2=r*math.cos(e)*math.sin(a)-best_point_rcae[index][0]*math.cos(best_point_rcae[index][3])*math.sin(best_point_rcae[index][2])
row3=r*math.sin(e)-best_point_rcae[index][0]*math.sin(best_point_rcae[index][3])
tmp_length=math.sqrt(math.pow(row1,2)+math.pow(row2,2)+math.pow(row3,2))
length_list.append(tmp_length)
# print(length_list)
length_arr=np.array(length_list)
# print(length_arr)
# assert length_arr.size!=0
idx=np.where(length_arr==np.min(length_arr))
index+=1
best_point_rcae[index,:]=four_parms_arr[idx[0][0],:]
# print("====ok2====")
return best_point_rcae
def get_first_frame_4_parms(data):
first_frame_4_parms=[]
first_frame=data[0]
print(first_frame.shape)
first_frame = first_frame[:, 8, :]
first_frame_fft_data = np.fft.fft(first_frame, axis=0)
tmp_img = first_frame_fft_data[0:range_index, :]
first_RDI = np.abs(np.fft.fft(tmp_img, axis=1))
first_RDI = np.fft.fftshift(first_RDI, axes=1)
first_RDI[:, 16:17] = 0
first_r, first_c = np.where(first_RDI== np.max(first_RDI))
first_frame_4_parms.append(first_r[0])
first_frame_4_parms.append(first_c[0])
first_2d_ai=AI_2D([first_r[0],first_c[0]],data[0])
first_frame_ea=get_maxpower_2d_ai(first_2d_ai)
first_amz=(-90+first_frame_ea[1]*60)/180*math.pi
first_frame_4_parms.append(first_amz)
first_elev=(-90+first_frame_ea[0]*60)/180*math.pi
first_frame_4_parms.append(first_elev)
return first_frame_4_parms
def save_CA_DTI_2(file_dir,cout_dir):
for file_name in os.listdir(file_dir):
mat_data=sio.loadmat(file_dir+file_name)
raw_mat_data=mat_data['AOACube_tx1'] ##加载mat文件
data=raw_mat_data.transpose(3,0,1,2) ##data大小为 50*256*12*32(帧数*采样点数*通道数*chrips)
DTI=np.zeros((n_chrips,n_frames))
all_frames_4_parms=get_4_parms(data)
best_point_rcae=get_best_point(all_frames_4_parms,data)
for i in range(n_frames):
best_point_rc=best_point_rcae[i]
best_point_r=best_point_rc[0]
best_point_c=best_point_rc[1]
# r,c=np.where(RT==np.max(RT))
cube_2=data[i]
# print(best_point_r)
cube_2=cube_2[int(best_point_r),:,:].reshape(12,32)
cube_avg=np.mean(cube_2,axis=0).reshape(1,32) ###对多通道做平均得到CA-DTI
cube_avg=np.transpose(cube_avg,(1,0))
DT_data=np.abs(np.fft.fft(cube_avg,axis=0))
DT_data=np.fft.fftshift(DT_data,axes=0)
DTI[:,i:i+1]=DT_data
# DTI = normalize(DTI)
plt.imsave(cout_dir + '\{}.png'.format(file_name),DTI)
# after_snr_data=SNR_doppler(data[49]) ###对多普勒进行阈值筛选
# print(after_snr_data)
# doppler_max_area=find_doppler_max_area(after_snr_data) ###找阈值筛选后的最大连通区域
# print(doppler_max_area)
# ai_data=AI_2D(doppler_max_area[0],data[49]) ###对最大连通区域中的每个点计算二维角度图
# after_snr_2d_ai=SNR_2d_ai(ai_data) ###对二维角度图进行阈值筛选
# best_2d_ai=get_best_2d_ai(after_snr_2d_ai)
# maxpower_2d_ai=get_maxpower_2d_ai(ai_data)
# print(get_4_parms(data))
# print(get_best_point(get_4_parms(data),data))
# save_CA_DTI_2('E:/getdata/new_dataset/mat_data/session/','E:/getdata/new_dataset/tmp/ca_dti_2/session/')
save_HATI_2('E:/getdata/new_dataset/mat_data/Train/','E:/getdata/new_dataset/tmp/hati_2/Train/')
# show_cube(data[10])
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/nagisa-surface/radar-hand-gesture.git
git@gitee.com:nagisa-surface/radar-hand-gesture.git
nagisa-surface
radar-hand-gesture
radar-hand-gesture
master

搜索帮助