代码拉取完成,页面将自动刷新
from collections import deque
from pcaspy import SimpleServer, Driver
import os
import threading
from epics import PV
import time
maxsize = 2
EPICS_CA_ADDR_LIST = "127.0.0.1 192.168.1.26 192.168.1.27" # 采集数据PV所在服务器的IP地址,默认为localhost
EPICS_CAS_SERVER_PORT = "5066" # 本IOC服务器的端口号,默认为5064
# LACCT = "iLINAC:mod1_LACCT1_Rd_Avg"
# MACCT = "iLINAC:mod1_MACCT1_Rd_Avg"
LACCT = "zyh:ai1"
MACCT = "zyh:ai2"
_Transmission_efficiency = "iLINAC:Transmission_efficiency"
class FixedSizeQueue:
def __init__(self):
self.queue = deque(maxlen=maxsize)
self.total_sum = 0
def enqueue(self, value):
if len(self.queue) == self.queue.maxlen:
# 如果队列已满,移除最前面的元素并更新总和
self.total_sum -= self.queue[0]
self.queue.append(value)
self.total_sum += value
def dequeue(self):
if not self.queue:
raise IndexError("dequeue from empty queue")
value = self.queue.popleft()
self.total_sum -= value
return value
def average(self):
if not self.queue:
return 0 # 或者可以抛出异常,取决于你的需求
return self.total_sum / len(self.queue)
def __len__(self):
return len(self.queue)
def __repr__(self):
return f"FixedSizeQueue({list(self.queue)})"
class Transmission_efficiency(threading.Thread):
def __init__(self,setParam):
self.setParam = setParam
self.Flag = False
self.LACCT = PV(LACCT, callback=self.onValueChange)
self.MACCT = PV(MACCT, callback=self.onValueChange)
self.LACCT_queue = FixedSizeQueue()
self.MACCT_queue = FixedSizeQueue()
threading.Thread.__init__(self)
def onValueChange(self,pvname=None, value=None, host=None, **kws):
if pvname == LACCT:
self.LACCT_queue.enqueue(self.LACCT.get())
elif pvname == MACCT:
self.MACCT_queue.enqueue(self.MACCT.get())
self.Flag = True
def run(self):
while True:
time.sleep(0.01)
if self.Flag:
self.Flag = False
self.setParam(LACCT+"_C",self.LACCT_queue.average())
self.setParam(MACCT+"_C",self.MACCT_queue.average())
if not self.LACCT_queue.average() == 0:
self.setParam(_Transmission_efficiency,self.MACCT_queue.average()/self.LACCT_queue.average()*100)
class myDriver(Driver):
def __init__(self):
super(myDriver, self).__init__()
Transmission_efficiency(self.setParam)
if __name__ == '__main__':
os.environ["EPICS_CA_ADDR_LIST"] = EPICS_CA_ADDR_LIST
os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "no"
os.environ["EPICS_CAS_SERVER_PORT"] = EPICS_CAS_SERVER_PORT
prefix = ''
pvdb = {LACCT+"_C":{
'prec': '3',
'scan': 1,
},
MACCT+"_C":{
'prec': '3',
'scan': 1,
},
_Transmission_efficiency:{'prec': '3',
'scan': 1,}}
server = SimpleServer()
server.createPV(prefix, pvdb)
driver = myDriver()
while True:
server.process(0.1)
# 使用示例
queue = FixedSizeQueue()
queue.enqueue(1)
queue.enqueue(2)
queue.enqueue(3)
print(f"Queue: {queue}, Average: {queue.average()}")
queue.enqueue(4) # 此时队列已满,1将被移除
print(f"Queue: {queue}, Average: {queue.average()}")
queue.enqueue(5) # 此时队列已满,2将被移除
print(f"Queue: {queue}, Average: {queue.average()}")
# 尝试出队
queue.dequeue()
print(f"Queue: {queue}, Average: {queue.average()}")
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。