1 Star 0 Fork 0

周一恒/BMT_PM

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Transmission_efficiency_HZ_Z.py 3.71 KB
一键复制 编辑 原始数据 按行查看 历史
Your Name 提交于 2024-11-08 11:43 . 添加发射度灰度图
import math
import os
import threading
import time
from collections import deque

from epics import PV
from pcaspy import SimpleServer, Driver
# Number of samples kept in each moving-average window (used as the deque
# maxlen inside FixedSizeQueue).
maxsize = 2
EPICS_CA_ADDR_LIST = "127.0.0.1 192.168.1.26 192.168.1.27" # IP addresses of the servers hosting the source PVs; the default is localhost
EPICS_CAS_SERVER_PORT = "5066" # Port number of this IOC server; the default is 5064
# Alternative source PV names, currently disabled:
# LACCT = "iLINAC:mod1_LACCT1_Rd_Avg"
# MACCT = "iLINAC:mod1_MACCT1_Rd_Avg"
# Names of the two source PVs that are monitored and averaged.
LACCT = "zyh:ai1"
MACCT = "zyh:ai2"
# Name of the PV that publishes MACCT average / LACCT average * 100.
_Transmission_efficiency = "iLINAC:Transmission_efficiency"
class FixedSizeQueue:
    """Bounded FIFO of numbers with a running sum for O(1) averaging.

    When the queue is full, adding a value evicts the oldest one. The
    running sum is exposed as ``total_sum`` and the backing deque as
    ``queue``. The class does no locking of its own.
    """

    def __init__(self, maxlen=None):
        """Create an empty queue.

        Args:
            maxlen: Maximum number of values kept. When omitted, the
                module-level ``maxsize`` is used.

        Raises:
            ValueError: If ``maxlen`` is smaller than 1.
        """
        if maxlen is None:
            maxlen = maxsize
        if maxlen < 1:
            raise ValueError("maxlen must be a positive integer")
        self.queue = deque(maxlen=maxlen)
        self.total_sum = 0

    def enqueue(self, value):
        """Append ``value``, evicting the oldest entry if the queue is full.

        Raises:
            TypeError: If ``value`` cannot be added to the running sum
                (for example ``None``). The queue is left unchanged.
        """
        is_full = len(self.queue) == self.queue.maxlen
        evicted = self.queue[0] if is_full else 0
        # Do the arithmetic before touching any state, so that a value which
        # cannot be summed leaves the queue and the sum consistent.
        candidate = self.total_sum - evicted + value
        needs_resync = not math.isfinite(candidate)
        self.queue.append(value)  # the deque drops the oldest entry itself
        # inf - inf (or anything involving NaN) would poison the running sum
        # forever; in that case rebuild it from what is actually stored.
        self.total_sum = sum(self.queue) if needs_resync else candidate

    def dequeue(self):
        """Remove and return the oldest value.

        Raises:
            IndexError: If the queue is empty.
        """
        if not self.queue:
            raise IndexError("dequeue from empty queue")
        value = self.queue.popleft()
        remaining = self.total_sum - value
        # Same recovery as in enqueue when a non-finite value leaves the queue.
        self.total_sum = remaining if math.isfinite(remaining) else sum(self.queue)
        return value

    def average(self):
        """Return the mean of the stored values, or 0 when the queue is empty."""
        if not self.queue:
            return 0
        return self.total_sum / len(self.queue)

    def __len__(self):
        """Return the number of values currently stored."""
        return len(self.queue)

    def __repr__(self):
        return f"FixedSizeQueue({list(self.queue)})"
class Transmission_efficiency(threading.Thread):
    """Thread that publishes averaged source readings and their ratio.

    Monitors the two PVs named by the module-level ``LACCT`` and ``MACCT``
    constants, keeps the recent samples of each in a ``FixedSizeQueue`` and,
    whenever a new sample has arrived, pushes both averages and the
    MACCT/LACCT percentage through ``setParam``.
    """

    def __init__(self, setParam):
        """Prepare the queues and subscribe to both source PVs.

        Args:
            setParam: Callable invoked as ``setParam(name, value)`` to
                publish each result.
        """
        threading.Thread.__init__(self)
        # run() never returns, so do not let this thread keep the process alive.
        self.daemon = True
        self.setParam = setParam
        self.Flag = False  # set by the monitor callback, cleared by run()
        # Create the queues before the PVs: a monitor callback may fire as
        # soon as a PV connects and it needs both queues to exist.
        self.LACCT_queue = FixedSizeQueue()
        self.MACCT_queue = FixedSizeQueue()
        self.LACCT = PV(LACCT, callback=self.onValueChange)
        self.MACCT = PV(MACCT, callback=self.onValueChange)

    def onValueChange(self, pvname=None, value=None, host=None, **kws):
        """Monitor callback: store the new sample and flag it for run().

        The sample is taken from the ``value`` argument delivered with the
        event rather than by calling ``PV.get()`` again from inside the
        callback. Events without a value, or for any other PV name, are
        ignored.
        """
        if value is None:
            return
        if pvname == LACCT:
            self.LACCT_queue.enqueue(value)
        elif pvname == MACCT:
            self.MACCT_queue.enqueue(value)
        else:
            return
        self.Flag = True

    def run(self):
        """Poll the update flag every 10 ms and publish fresh results."""
        while True:
            time.sleep(0.01)
            if not self.Flag:
                continue
            self.Flag = False
            # Read each average once so the published ratio is computed from
            # exactly the numbers that were published alongside it.
            lacct_avg = self.LACCT_queue.average()
            macct_avg = self.MACCT_queue.average()
            self.setParam(LACCT + "_C", lacct_avg)
            self.setParam(MACCT + "_C", macct_avg)
            # Skip the ratio while the denominator is zero.
            if lacct_avg != 0:
                self.setParam(_Transmission_efficiency, macct_avg / lacct_avg * 100)
class myDriver(Driver):
    """pcaspy driver that runs the Transmission_efficiency worker thread."""

    def __init__(self):
        """Initialise the driver, then create and start the worker thread."""
        super(myDriver, self).__init__()
        # Keep a reference and actually start the thread: constructing a
        # Thread does not run it, so without start() its run() loop (the only
        # place results are published through setParam) never executes.
        self._efficiency_thread = Transmission_efficiency(self.setParam)
        # The worker loops forever; mark it daemon so it cannot block exit.
        self._efficiency_thread.daemon = True
        self._efficiency_thread.start()
if __name__ == '__main__':
    # Channel Access settings must be in the environment before the server
    # and the client PVs are created below.
    os.environ["EPICS_CA_ADDR_LIST"] = EPICS_CA_ADDR_LIST
    os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "no"
    os.environ["EPICS_CAS_SERVER_PORT"] = EPICS_CAS_SERVER_PORT
    prefix = ''
    # All three published PVs share the same settings. 'prec' is given as a
    # number (it was the string '3'), matching pcaspy's documented field type.
    pvdb = {
        name: {'prec': 3, 'scan': 1}
        for name in (LACCT + "_C", MACCT + "_C", _Transmission_efficiency)
    }
    server = SimpleServer()
    server.createPV(prefix, pvdb)
    driver = myDriver()
    # Serve Channel Access requests forever.
    while True:
        server.process(0.1)
# Usage example for FixedSizeQueue.
# NOTE(review): indentation was lost in this view. If these statements sit at
# module level they run on import, and are never reached when the file is run
# as a script because the serving loop above does not return - confirm intent.
# The comments below assume the module's current maxsize of 2.
queue = FixedSizeQueue()
queue.enqueue(1)
queue.enqueue(2)
queue.enqueue(3)
print(f"Queue: {queue}, Average: {queue.average()}")
queue.enqueue(4) # the queue is full, so the oldest value (2) is evicted
print(f"Queue: {queue}, Average: {queue.average()}")
queue.enqueue(5) # the queue is full, so the oldest value (3) is evicted
print(f"Queue: {queue}, Average: {queue.average()}")
# Try removing the oldest value
queue.dequeue()
print(f"Queue: {queue}, Average: {queue.average()}")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zyh-j/bmt_pm.git
git@gitee.com:zyh-j/bmt_pm.git
zyh-j
bmt_pm
BMT_PM
master

搜索帮助