6 Star 19 Fork 8

E.K/量化交易

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
backtrader_tools.py 21.66 KB
一键复制 编辑 原始数据 按行查看 历史
E.K 提交于 2024-07-17 10:48 . 开源前测试

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File: data_download.py
# Date: 2024/7/4
# Author: 抖音、快手、视频号:东哥策略交易,微信:dongtrader
# Description: 东哥量化,带你走上量化之路。
import datetime # 用于datetime对象操作
import os
import re
import feather
import backtrader as bt # 引入backtrader框架
import random
import time
from datetime import datetime, timedelta
from data_settings import *
# IndexError: array assignment index out of range,
# 主要是因为股票k线数量在回测周期内存在停牌等原因造成数量少于计算周期,如用均线30天、5天与20天金叉等这种平滑计算
data_columns = ['open', 'high', 'low', 'close', 'volume', 'trade','openinterest']
class Addmoredata(bt.feeds.PandasData):
lines = ('trade',)
params = (('trade', 5),)
class MyStrategy(bt.Strategy):
# 可配置策略参数
params = dict(
poneplot=False, # 是否打印到同一张图
pstake=1000, # 单笔交易股票数目
percents=100, # 单笔交易百分比,全仓100%
retint=True,
)
def __init__(self):
self.inds = dict()
self.buyid = {}
self.buyprice = {}
self.buyhigh = {}
# 买入价格和手续费
self.buyprice = None
self.buycomm = None
# self.trade = self.datas[0].trade
for i, d in enumerate(self.datas):
self.inds[d] = dict()
if i > 0:
if self.p.poneplot:
d.plotinfo.plotmaster = self.datas[0]
def getsize(self, data):
try:
dc = data.close[1]
except:
dc = data.close[0]
size = int(self.broker.get_cash() / max(data.close[0], dc) / 100) * 100
return size
def next(self):
for i, d in enumerate(self.datas):
pos = self.getposition(d)
if not pos: # 不在场内,则可以买入
if d.trade[0] == 1:
self.buy(data=d, size=self.getsize(d))
else: # 在场内,则可以卖出
if d.trade[0] == -1:
self.close(data=d) # 平仓,适用于股票(卖出)及期货
def get_filenames_in_directory(directory_path):
"""
获取指定目录下的所有文件名。
:param directory_path: 文件夹的路径 (字符串)
:return: 包含文件名的列表 (列表)
"""
# 确保传入的路径是绝对路径,以避免相对路径可能引起的错误
directory_path = os.path.abspath(directory_path)
# 检查路径是否存在且是一个目录
if os.path.exists(directory_path) and os.path.isdir(directory_path):
# 使用os.listdir获取目录下的所有条目
entries = os.listdir(directory_path)
# 过滤出文件名,排除可能的子目录
filenames = [entry for entry in entries if os.path.isfile(os.path.join(directory_path, entry))]
table_list = [name.replace('.feather', '') for name in filenames if
re.match(r'^\d+$', name.replace('.feather', ''))]
return table_list
else:
print(f"路径不存在或不是一个有效的目录: {directory_path}")
return []
# 随机生成回测时间或计算固定回测时间
def random_times(start_date=None, addday=365):
"""
生成一个随机的日期范围。
参数:
start_date -- 起始日期,可以是None、整数(表示年份)或元组(年,月,日)。
addday -- 结束日期相对于起始日期增加的天数。
返回:
一个元组,包含随机起始日期和结束日期的元组。
"""
"""
生成一个随机日期范围,并根据指定的天数增加结束日期。
:param start_date: 开始日期,可以是None、整数或包含年月日的元组。
:param addday: 结束日期相对于开始日期需要增加的天数。
:return: 开始日期和结束日期的元组。
"""
# 根据start_date的类型设置起始日期
if start_date is None:
start_year = 2000
start_month = 1
start_day = 1
elif isinstance(start_date, int):
start_year = start_date
start_month = 1
start_day = 1
else:
start_year, start_month, start_day = start_date
# 定义结束日期
end_year = 2020
end_month = 12
end_day = 31
# 将日期转换为时间戳
start_timestamp = time.mktime((start_year, start_month, start_day, 0, 0, 0, 0, 0, 0))
end_timestamp = time.mktime((end_year, end_month, end_day, 23, 59, 59, 0, 0, 0))
start_timestamp = int(start_timestamp)
end_timestamp = int(end_timestamp)
# 生成随机时间戳
random_timestamp = random.randint(start_timestamp, end_timestamp)
random_date = time.localtime(random_timestamp)
odate = time.strftime("%Y-%m-%d", random_date)
# # 将随机日期转换为元组格式
#
# end_date = (int(odate[:4]), int(odate[5:7]), int(odate[-2:]))
# 根据随机日期和增加的天数计算结束日期
at = datetime.strptime(odate, '%Y-%m-%d')
addday = int(addday)
offset = timedelta(days=addday)
ndate = (at + offset).strftime("%Y-%m-%d")
start_date = f'{int(odate[:4])}-{int(odate[5:7])}-{int(odate[-2:])}'
end_date = f'{int(ndate[:4])}-{int(ndate[5:7])}-{int(ndate[-2:])}'
# 返回起始日期和结束日期的元组
return start_date, end_date
# 统计回测周期内K线数量
def bar_size(code, start_date, end_date):
"""
计算给定股票代码和起止日期之间的数据条数。
参数:
code: 字符串,股票代码。
start_date: 字符串,起始日期,格式为'YYYY-MM-DD'。
end_date: 字符串,结束日期,格式为'YYYY-MM-DD'。
返回:
整数,起止日期之间的数据条数。
"""
# 构建股票数据文件的路径
file_path = f'{stockpath}/{code}.feather'
# 从文件中读取股票数据
df = feather.read_dataframe(file_path)
# 将DataFrame中的日期列转换为datetime类型的列表
# 将所有日期转化为datetime的list
date_list = list(map(lambda x: datetime.strptime(str(x), '%Y-%m-%d'),
df['date'].to_list()))
# 使用二分查找确定起始日期的索引
start_index = binary_search(date_list, start_date)
# 使用二分查找确定结束日期的索引
end_index = binary_search(date_list, end_date)
# 计算并返回日期范围内的数据条数
return end_index - start_index + 1
# 二分查找,确定所查找日期索引值
def binary_search(lst, target):
"""
在有序列表中使用二分查找算法寻找目标值的索引。
如果目标值存在于列表中,则返回其索引;
如果目标值不存在,返回离目标值最近的索引。
:param lst: 有序列表
:param target: 目标值
:return: 目标值的索引或离目标值最近的索引
"""
# 初始化搜索范围的起始和结束索引
start = 0
end = len(lst) - 1
while start <= end:
# 计算中间索引
mid = (start + end) // 2
if target > lst[mid]:
# 如果目标值大于中间值,更新搜索的起始索引
start = mid + 1
elif target < lst[mid]:
# 如果目标值小于中间值,更新搜索的结束索引
end = mid - 1
else:
# 如果目标值等于中间值,找到目标,返回其索引
return mid
# 如果未找到目标值,返回起始和结束索引中较小的绝对值
# 这表示目标值可能在该索引处插入而保持列表有序
return min(abs(start), abs(end))
# 符合回测的股票号码,回测有效时间、回测K线数
def conform_code(codelst, stock_num, start_date, end_date):
"""
根据给定的股票代码列表、股票数量、开始日期和结束日期,筛选出符合特定条件的股票代码。
参数:
codelst: 股票代码列表。
stock_num: 需要筛选出的股票数量。
start_date: 开始日期,格式为(年, 月, 日)。
end_date: 结束日期,格式为(年, 月, 日)。
返回:
符合条件的股票代码列表。
"""
# 初始化一个空列表,用于存储筛选后的股票代码
testlist = []
# 初始化计数器i,用于记录已筛选出的股票数量
i = 0
# 遍历股票代码列表
for code in codelst:
# 如果已筛选出的股票数量小于指定的数量,继续筛选
if i < stock_num:
try:
# 构建股票数据文件的路径
file_path = f'{stockpath}/{code}.feather'
# 从文件中读取股票数据
db_df = feather.read_dataframe(file_path)
# 筛选数据中的最新交易日期
# 假设'date'是数据框中的列之一,我们按日期排序并选择最新的日期
latest_df = db_df.sort_values('date', ascending=False).head(1)
latest_date = str(db_df.loc[0, 'date'])
except:
# 如果读取数据失败,跳过当前股票
continue
# 判断最新交易日期是否晚于开始日期
start_date = str(datetime.strptime(start_date, '%Y-%m-%d').date())
if int(start_date[:4]) > int(latest_date[:4]) or \
(int(start_date[:4]) == int(latest_date[:4]) and int(start_date[5:7]) > int(latest_date[5:7])) or \
(int(start_date[:4]) == int(latest_date[:4]) and int(start_date[5:7]) == int(latest_date[5:7]) and
int(start_date[8:]) > int(latest_date[8:])):
# 如果最新交易日期早于开始日期,或者交易周期小于最小周期,则跳过当前股票
start=re.split('-', start_date)
end = re.split('-', end_date)
if MIN_PERIOD > bar_size(code, datetime(int(start[0]), int(start[1]), int(start[2])),
datetime(int(end[0]), int(end[1]), int(end[2]))):
continue
# 如果最新交易日期晚于或等于开始日期,并且交易周期大于等于最小周期,则将股票代码添加到筛选后的列表中
testlist.append(code)
# 增加计数器i
i += 1
else:
break
# 返回筛选后的股票代码列表
return testlist
# 随机生成回测股票
def random_code(stock_num, start_date, end_date):
"""
根据指定的起止日期,从数据库中随机选择一定数量的股票代码。
参数:
stock_num: int, 需要随机选择的股票数量。
start_date: tuple, 起始日期,格式为(年, 月, 日)。
end_date: tuple, 结束日期,格式为(年, 月, 日)。
返回:
list, 包含随机选择的股票代码的列表。
"""
stocklist = get_filenames_in_directory(stockpath)
# 随机打乱股票代码列表
random.shuffle(stocklist)
# 对股票代码进行筛选,确保它们在指定日期范围内有数据
testlist = conform_code(stocklist, stock_num, start_date, end_date)
# 返回最终筛选后的股票代码列表
return testlist
def is_number_in_range(s):
"""
判断输入的字符串s是否符合股市股票特定范围内的数字。
字符串s必须是纯数字,并且对应的数字必须在以下范围内之一:
- 0到100000 深市股票
- 300000到400000 创业板股票
- 600000到700000 沪市股票(含科创板)
- 800000到900000 北交所股票
参数:
s (str): 待检查的字符串。
返回:
bool: 如果字符串s表示的数字在指定范围内,则返回True,否则返回False。
"""
# 检查字符串s是否为纯数字
# 首先检查字符串是否全部由数字组成
if not s.isdigit():
return False
# 将字符串s转换为整数
# 将字符串转换为整数
number = int(s)
# 检查数字是否在指定范围内
# 检查数字是否在指定的区间范围内
if 0 <= number <= 100000 or \
300000 <= number <= 400000 or \
600000 <= number <= 700000 or \
800000 <= number <= 900000:
return True
else:
return False
# 检查输入的股票号码是否正确,
def check_codename(code):
"""
检查提供的股票代码是否符合特定的条件。
参数:
code -- 股票代码,可以是单个代码(字符串或整数)或代码列表。
返回:
如果代码符合条件,则返回一个包含合格代码的列表;
如果代码不符合条件,则停止程序执行。
"""
# 初始化一个空列表,用于存储符合条件的股票代码
testlist = []
# 检查code的类型
if type(code) is list:
# 遍历代码列表
for co in code:
# 检查单个代码是否在指定范围内
result = is_number_in_range(co)
if result == True:
# 如果代码符合条件,添加到结果列表中
testlist.append(co)
# 立即返回结果列表
return testlist
else:
# 如果代码不符合条件,提示用户并退出程序
print('输入的股票代码有误,请检查后重新运行程序!')
input("程序执行完毕,请按回车键退出...")
os._exit(0)
elif type(code) is str or type(code) is int:
# 检查单个代码是否在指定范围内
result = is_number_in_range(code)
if result == True:
# 如果代码符合条件,添加到结果列表中
testlist.append(code)
# 立即返回结果列表
return testlist
else:
# 如果代码不符合条件,提示用户并退出程序
print('输入的股票代码有误,请检查后重新运行程序!')
input("程序执行完毕,请按回车键退出...")
os._exit(0)
else:
# 如果code的类型既不是列表也不是字符串或整数,提示用户并退出程序
print('输入的股票代码有误,请检查后重新运行程序!')
input("程序执行完毕,请按回车键退出...")
os._exit(0)
# 股票独立回测程序
def separate_test(strategy, stock_num=10, start_date=None, days=365):
print('*' * 80)
print(f'=*=*= 股 票 独 立 回 测 =*=*=')
print(f'回测股票数:{stock_num}个,回测天数:{days}天')
if start_date is None:
start_date, end_date = random_times(start_date=None, addday=days)
else:
start_date, end_date = random_times(start_date=start_date, addday=days)
# 从股票代码列表中,随机选择股票,如股票数据时间长度符合回测时间要求,加入回测列表
testlist = random_code(stock_num, start_date, end_date)
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
print(f'回测策略:{strategy},回测时间:{start_date}{end_date}')
print(f'回测开始...')
# 对回测列表中的股票,逐个回测,并统计整体回测结果。
if type(strategy) is list:
strategy_list = strategy
else:
strategy_list = [strategy]
for straname in strategy_list:
sumvalue = 0
maxvalue = capital
minvalue = capital
win = 0
winvalue = 0
detail = []
nottrader = 0
for code in testlist:
# 创建cerebro
cerebro = bt.Cerebro()
# 取得股票历史数据
dataframe = get_data(code, strategy, start_date, end_date)
data = Addmoredata(dataname=dataframe, fromdate=start_date,
todate=end_date)
# 在Cerebro中添加股票数据
cerebro.adddata(data)
cerebro.broker.setcash(capital)
# 设置交易单位大小(FixedSize, stake = 5000)为固定股,AllInSizerInt为所有资金的百分比,PercentSizerInt可用资金的百分比,用(percents = 100)
cerebro.addsizer(bt.sizers.PercentSizer, percents=90)
# 设置佣金为万分之五
cerebro.broker.setcommission(commission=0.0005)
# 添加策略
cerebro.addstrategy(MyStrategy, poneplot=False)
# 发出信号当日收盘价成交
cerebro.broker.set_coc(True)
cerebro.addanalyzer(bt.analyzers.SQN)
try:
cerebro.run()
# 打印最后结果
# print(
# f'回测股票:{code}, 资产总值: {cerebro.broker.getvalue():.2f}, 盈利率:{cerebro.broker.getvalue() / capital * 100:.2f}%')
except Exception as e:
# 打印最后结果
print(f'回测股票:{code}, 出现错误;{e}')
nottrader += 1
# 绘图
# cerebro.plot(iplot=True)
value = cerebro.broker.getvalue()
detail.append(f'股票:{code}, 资产总值: {value:.2f}, 盈利率:{value / capital * 100:.2f}%')
sumvalue += value
if value > capital:
win += 1
winvalue += value
if value > maxvalue:
maxvalue = value
elif value == capital:
nottrader += 1
elif value < minvalue:
minvalue = value
print('-' * 80)
print(
f'回测股票:{code},资产总值: {cerebro.broker.getvalue():.2f},盈利:{cerebro.broker.getvalue() - capital:.2f},盈利率:{(cerebro.broker.getvalue() / capital - 1) * 100:.2f}%')
print('*' * 80)
print('=' * 80)
print('*' * 80)
print(
f'回测策略:{straname},回测时间:{start_date}{end_date}')
print(
f'预计回测股票数:{stock_num}个,实际回测股票数:{len(testlist)}个,回测无交易股票数:{nottrader}个,盈利股票数:{win}个,亏损股票数:{len(testlist) - win - nottrader}个')
print(
f'平均资产总值: {sumvalue / len(testlist):.2f},盈利率:{(sumvalue / (capital * len(testlist)) - 1) * 100:.2f}%')
if win != 0:
print(f'投资成功率:{win / (len(testlist) - nottrader) * 100:.2f}%,'
f'平均盈利:{(winvalue / win - capital) / capital * 100:.2f}%,'
f'最高盈利:{(maxvalue / capital - 1) * 100:.2f}%')
if len(testlist) - win - nottrader != 0:
print(f'投资失败率:{(len(testlist) - win - nottrader) / (len(testlist) - nottrader) * 100:.2f}%,'
f'平均亏损:{(capital - (sumvalue - winvalue - capital * nottrader) / (len(testlist) - nottrader - win)) / capital * 100:.2f}%,'
f'最大亏损:{(1 - minvalue / capital) * 100:.2f}%')
print('*' * 80)
# 股票混合回测程序
def same_test(strategy, code=None, start_date=None, days=365, stock_num=1):
if stock_num > 5:
stock_num = 5
if start_date is None:
start_date, end_date = random_times(start_date=None, addday=days)
else:
start_date, end_date = random_times(start_date=start_date, addday=days)
if code is None:
testlist = random_code(stock_num, start_date, end_date)
else:
codelist = check_codename(code)
stock_num = len(codelist)
testlist = conform_code(codelist, stock_num, start_date, end_date)
print('*' * 80)
print(f'=*=*= 股 票 混 合 回 测 =*=*=')
print(
f'回测策略:{strategy},回测股票数:{len(testlist)}个,回测天数:{days}天,回测时间:{start_date}{end_date[0]}-{end_date}')
cerebro = bt.Cerebro() # 创建cerebro
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
for code in testlist:
# 取得股票历史数据
dataframe = get_data(code,strategy, start_date,end_date)
data = Addmoredata(dataname=dataframe, fromdate=start_date,
todate=end_date)
# 在Cerebro中添加股票数据
cerebro.adddata(data)
cerebro.broker.setcash(capital)
# 设置交易单位大小(FixedSize, stake = 5000)为固定股,AllInSizerInt为所有资金的百分比,PercentSizerInt可用资金的百分比,用(percents = 100)
cerebro.addsizer(bt.sizers.PercentSizer, percents=90)
# 设置佣金为万分之五
cerebro.broker.setcommission(commission=0.0005)
# 添加策略
cerebro.addstrategy(MyStrategy, poneplot=False)
# 发出信号当日收盘价成交
cerebro.broker.set_coc(True)
cerebro.addanalyzer(bt.analyzers.SQN)
# 遍历所有数据
try:
cerebro.run()
# 打印最后结果
print('-' * 80)
print(f'最终符合回测周期股票数:{len(testlist)}个,回测股票:{testlist}')
print(
f'资产总值: {cerebro.broker.getvalue():.2f},盈利:{cerebro.broker.getvalue() - capital:.2f},盈利率:{(cerebro.broker.getvalue() / capital - 1) * 100:.2f}%')
print('*' * 80)
except Exception as e:
print(f'出现错误;{e}')
# 绘图
# cerebro.plot(iplot=True)
# b=Bokeh(style='bar',tabs='single')
# cerebro.plot(b)
def get_data(code,strategy, start_date, end_date):
df_tdx = pd.read_feather(f'{stockpath}/{code}.feather')
df_tdx.index = pd.to_datetime(df_tdx.date, format='%Y%m%d')
df_tdx_b = df_tdx.truncate(before=start_date, after=end_date)
df_tdx_b['openinterest'] = 0
df_tdx_b.rename(columns={f'{strategy}_trade': 'trade'}, inplace=True)
df_tdx_b = df_tdx_b[data_columns]
return df_tdx_b
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/east-king/quantitative-trading.git
git@gitee.com:east-king/quantitative-trading.git
east-king
quantitative-trading
量化交易
master

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385