代码拉取完成,页面将自动刷新
import json
import os.path
import time
import requests
from colorama import *
from function_util import *
class StatisticsAi:
"""
基于统计学的AI,根据当前感知的历史情况,进行行为选择
"""
def __init__(self):
self.inputAndBeHistoryTable = {}
self.inputAndBeHistory = None
self.z = {}
self.allBeGuide = {}
"""行为指导;结构={be: {xIndex1: {xValue1: y1,...},...}}"""
self.migrate = []
def StudyFromY(self):
"""
假定学习是逆向的,及通过y,分析x的成分,并提取特征z={{value: [indexDict..]}: y}
那为啥不能用x去分析y成分了,假定xi==xj,yi!=yj,岂不是说明含有无法感知的通道存在……那确实
"""
yIndexHistory = self.GetYIndexHistory()
z = {}
for value, keys in yIndexHistory.items():
# print('value=', value)
z1 = GetSameMath(keys)
z[value] = z1
"""
还需要继续提纯
"""
z = DeepZ(z) # 求到了局部的感知通道与行为的关系
# print('z=', z)
"""
v= -0.001
data= {frozenset({(1, 3)}): {3: 0}, frozenset({(1, 7)}): {7: 0}, frozenset({(1, 4)}): {4: 0}, ...}
v= -0.002
data= {frozenset({(1, 8)}): {}}
v= 1.0
data= {frozenset({(1, 5)}): {1: 0, 5: 1}, frozenset({(1, 4)}): {3: 0, 4: 1}, frozenset({(1, 1)}): {1: 1, 6: 0}, ...}
v= -1.0
data= {frozenset({(1, 6)}): {0: 0, 1: 0, 5: 0, 6: 2, 7: 0}, frozenset({(1, 7)}): {1: 0, 2: 0, 7: 2}, ...}
"""
# for v, data in z.items():
# print('v=', v)
# print('data=', data)
# 接下来需要对同行为的不同结果进行感知上的分析
# 例如v=-0.001时,(1, 4): {4: 0}与v=1.0时,(1, 4): {3: 0, 4: 1}
# 双方都有通道4,且4的值不同,那么是否可以说通道4=1, be=4时,y=1.0
self.z = z
beIndexValue = self.GetBeIndexValueFromZ(z)
# print('beIndexValue=', beIndexValue)
allBeGuide = {}
for be, data in beIndexValue.items():
sameKeySet = self.GetAllHaveKey([d[1] for d in data])
# print('be=', be, 'sameKeySet=', sameKeySet, data)
if sameKeySet:
beGuide = self.GetBeGuide(sameKeySet, data)
else:
beGuide = data[0][0]
# print('beGuide=', beGuide)
allBeGuide[be] = beGuide
print('allBeGuide=', allBeGuide)
self.allBeGuide = allBeGuide
# for be, beIndexValueData in self.allBeGuide.items():
# print('be=', be)
# print('beIndexValueData=', beIndexValueData)
migrate = self.GetMigrateFromAllBeGuide(allBeGuide)
print('migrate=', migrate)
self.migrate = migrate
def GetMigrateFromAllBeGuide(self, allBeGuide: dict):
# allBeGuide={be: {xIndex1: {xValue1: y1,...},...}}
# 不同be的不同xIndex下,若xValue总是映射相应的y,那么可以将缺失部分的 xValue对 补齐
result = [] # 根据定义=> [[~be1, {xValue: y1,...}], [~be2, {xValue: y,...}]],~be:[be 1, be2, be3]
def MatchTemplate(template, xValueIndexYDict: dict):
for xValue, y in template[1].items():
if y != xValueIndexYDict.get(xValue, y):
return False
return True
def GetTemplate(xValueIndexYDict: dict):
for template in result:
if MatchTemplate(template, xValueIndexYDict):
return template
return None
def ComboTemplate1(template1, xValueIndexYDict: dict):
for k, v in xValueIndexYDict.items():
template1[k] = v
return template1
for be, xIdxVleYDict in allBeGuide.items():
if type(xIdxVleYDict) != dict:
continue
for xIndex in xIdxVleYDict:
_template = GetTemplate(xIdxVleYDict[xIndex])
if _template is None:
result.append([{be}, xIdxVleYDict[xIndex]])
else:
_template[0].add(be)
_template[1] = ComboTemplate1(_template[1], xIdxVleYDict[xIndex])
return result
def GetBeGuide(self, sameKeySet, valueIndexData: list):
"""获得行为指导;已知某个行为的关键感知,获得此感知的对象索引价值"""
result = {}
for key in sameKeySet:
result[key] = {}
for data in valueIndexData:
result[key][data[1][key]] = data[0]
return result
def GetAllHaveKey(self, dictList):
kSet = set(dictList[0].keys())
for d in dictList[1:]:
kSet &= set(d.keys())
return kSet
def GetBeIndexValueFromZ(self, z):
beIndexValue = {}
for value, data in z.items():
for be, indexDict in data.items():
if be not in beIndexValue:
beIndexValue[be] = []
beIndexValue[be].append([value, indexDict])
return beIndexValue
def GetYIndexHistory(self):
result = {}
for key, data in self.inputAndBeHistoryTable.items():
value = round(data[0] / float(data[1]), 4)
# if value <= 0:
# continue
if value not in result:
result[value] = {key}
else:
result[value].add(key)
return result
def TestStudy(self):
result = self.GetYIndexHistory()
# 特征提取;需要一种工具,提取统计特征
# +++-+
# +++++
# ===*= 模式匹配?
"""
已知x[i]和x[j]的y相同,是否可以推断出x[i]~x[j]在某种程度上相似?
那似乎可以对这种相似进行提取或识别?提取识别的过程就是新的认知过程?
"""
for value, keys in result.items():
print('value=', value)
for key1 in keys:
for key2 in keys:
if key1 == key2:
continue
print('key=', key1, key2)
sameMatch = SameMatchByArray(key1, key2)
print('SameMatch=', sameMatch)
if sameMatch:
for aid in range(len(key1)):
if aid not in sameMatch:
sameMatch2 = SameMatchByArray(key1[aid], key2[aid])
print('SameMatch2=', aid, sameMatch2)
def GetHistoryY(self, x, be):
history = self.inputAndBeHistoryTable.get((x, be), [])
if history:
return history[0] / float(history[1]), history
return 0, None
def Feedback(self, y):
"""对上一次行为的环境反馈"""
if self.inputAndBeHistory is not None:
if self.inputAndBeHistory not in self.inputAndBeHistoryTable:
self.inputAndBeHistoryTable[self.inputAndBeHistory] = [y, 1]
else:
self.inputAndBeHistoryTable[self.inputAndBeHistory][0] += y
self.inputAndBeHistoryTable[self.inputAndBeHistory][1] += 1
def Input(self, x, y, a):
"""感知帧,当前能用的行为"""
print('x=', x)
self.Feedback(y)
self.StudyFromY()
be = self.Decision(x, a)
self.inputAndBeHistory = (x, be)
print()
return be
def GetYFromZ(self, x, be):
for value, data in self.z.items():
if (1, be) not in data:
return None
for xIndex, xValue in data[(1, be)].items():
if x[xIndex] != xValue:
return None
return value
# for fSet, indexDict in data.items():
# if (1, a) in fSet:
# for index, indexValue in indexDict.items():
# if x[index] != indexValue:
# return None
# return value
return None
def GetMigrateY(self, be, xValue):
for beSet, xValueIndexYDict in self.migrate:
if be in beSet:
return xValueIndexYDict.get(xValue)
return None
def GetYFromBeGuide(self, x, be):
if (1, be) not in self.allBeGuide:
return None, False
# self.allBeGuide[(1, be)] => {xIndex: {xValue1: y1, xValue2: y2, xValue3: y3}}
if type(self.allBeGuide[(1, be)]) == dict:
if len(self.allBeGuide[(1, be)]) > 1:
return None, False
for xIndex, beIndexValueDict in self.allBeGuide[(1, be)].items():
# xValue in beIndexValueDict 为什么就能return呢? 因为收敛后实际上只有一个元素?那未收敛情况(多个元素全部满足吧)呢
# 所以这里应该时求平均吗?因为不知道行为会最终选向哪个;错,应该是不是唯一的就return None,求平均不明智
# 这里如果只有一个通道的时候执行,又太先验了吧!万一有两(多)个通道呢!因此,这里是要全部满足条件后?
# 有点蒙圈,只能先这样了
if x[xIndex] in beIndexValueDict:
return beIndexValueDict[x[xIndex]], False
# 使用迁移数据尝试回答
migrateY = self.GetMigrateY((1, be), x[xIndex])
if migrateY is not None:
return migrateY, True
return None, False
else:
return self.allBeGuide[(1, be)], False
def Decision(self, x, a):
"""决策"""
maxPack = MaxPack(-1000)
for be in a:
y, isMigrate = self.GetYFromBeGuide(x, be)
if y is None:
y = self.GetYFromZ(x, be)
if y is None:
y, history = self.GetHistoryY(x, be)
print(Fore.YELLOW + f'be={be}, 基于统计指导的y=', y, history, Style.RESET_ALL)
# print('self.inputAndBeHistoryTable=', self.inputAndBeHistoryTable)
else:
print(Fore.LIGHTYELLOW_EX + f'be={be}, 基于特征指导的y=', y, Style.RESET_ALL)
else:
print(Fore.GREEN + f'be={be}, 是否迁移={isMigrate}, 基于行为指导的y=', y, self.allBeGuide[(1, be)], Style.RESET_ALL)
maxPack.Update(y, be)
print('当前最大价值=', maxPack.maxValue, maxPack.maxVarList)
# print(f'行为={be}, 预期y={y}')
print(f'最大预期行为={maxPack.maxVarList}')
return random.choice(maxPack.maxVarList)
def Load(self):
if os.path.exists('inputAndBeHistoryTable.pys'):
with open('inputAndBeHistoryTable.pys', 'r', encoding='utf8') as f:
self.inputAndBeHistoryTable = eval(f.read())
with open('inputAndBeHistory.pys', 'r', encoding='utf8') as f:
self.inputAndBeHistory = eval(f.read())
def Save(self):
"""
self.inputAndBeHistoryTable = {}
self.inputAndBeHistory = []
"""
with open('inputAndBeHistoryTable.pys', 'w', encoding='utf8') as f:
f.write(str(self.inputAndBeHistoryTable))
with open('inputAndBeHistory.pys', 'w', encoding='utf8') as f:
f.write(str(self.inputAndBeHistory))
def Test2():
rq = requests.post(
'http://127.0.0.1:5011/add_ai', json={}
)
my = StatisticsAi()
my.Load()
t = 0
while 1:
print(Fore.RED + 't=', t, Style.RESET_ALL)
result = rq.json()
rq = requests.post(
'http://127.0.0.1:5011/move', json={'d': my.Input(tuple(result['frame']), result['y'], [_i for _i in range(9)])}
)
# input()
time.sleep(0.05)
t += 1
if t > 1000:
my.Save()
return
def Test1():
my = StatisticsAi()
my.Load()
my.StudyFromY()
if __name__ == '__main__':
Test2()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。