# [Hosting-page notice, not part of the program] Code pull completed; the page will refresh automatically.
import sys
sys.path.append('protosGen')
from config.constants import Constants
from protosGen.marketGateway_pb2_grpc import *
from protosGen.login_pb2 import *
from protosGen.login_pb2_grpc import *
from protosGen.orderreporting_pb2 import *
from protosGen.orderreporting_pb2_grpc import *
import time
from common import header_manipulator_client_interceptor
import signal
import ma_strategy
import position_strategy
class RouweiDemo:
    """Demo launcher: log in, build the gRPC stubs, and run the position strategy.

    All of the work happens in the constructor, which does not return while
    the strategy is running; the process is ended by SIGINT or SIGTERM.
    """

    def __init__(self):
        """Log in, start the position strategy, and block until a signal arrives.

        If the login response is not successful, its message, success flag and
        token are printed and the constructor returns without starting
        anything.
        """
        # Authenticated endpoint: used for the login call and, further down,
        # for order reporting.
        auth_endpoint = ":".join((Constants.HOST, Constants.PORT_AUTH))
        auth_channel = grpc.insecure_channel(auth_endpoint)
        login_request = ReqLogin(
            username=Constants.USER_NAME,
            password=Constants.USER_PASSWORD,
        )
        login_reply = LoginStub(auth_channel).login(login_request)

        if not login_reply.success:
            print(f"rspLogin.message:{login_reply.message!s}")
            print(f"rspLogin.success:{login_reply.success!s}")
            print(f"rspLogin.token:{login_reply.token!s}")
            return

        # Wrap the authenticated channel with an interceptor built from the
        # login token (presumably it adds a 'token' header to every call --
        # confirm in common.header_manipulator_client_interceptor).
        token_interceptor = header_manipulator_client_interceptor.header_adder_interceptor(
            'token', login_reply.token)
        order_stub = OrderReportingStub(
            grpc.intercept_channel(auth_channel, token_interceptor))

        # Market data is served from a separate, public port.
        public_endpoint = ':'.join((Constants.HOST, Constants.PORT_PUBLIC))
        market_stub = MarketDataManageStub(grpc.insecure_channel(public_endpoint))

        # The moving-average strategy is currently disabled. To run it instead,
        # build ma_strategy.MaStrategy(market_stub, order_stub), call start() on
        # it, and call its stop() from the exit handler below.
        strategy = position_strategy.PositionStrategy(market_stub, order_stub)
        strategy.start()
        print(position_strategy.strategy_name + " started, you could follow its logs in the output folder.")

        def _stop_and_exit(signum, frame):
            """Signal handler: stop the running strategy, then exit the process."""
            strategy.stop()
            sys.exit()

        # Exit safely from the keyboard (Ctrl + C) or on a termination request.
        for exit_signal in (signal.SIGINT, signal.SIGTERM):
            signal.signal(exit_signal, _stop_and_exit)

        # Keep the main thread alive; only the signal handler ends this loop.
        while True:
            time.sleep(360)
if __name__ == '__main__':
    # Script entry point. Constructing RouweiDemo does all the work; the
    # trailing message is printed only if the constructor returns.
    RouweiDemo()
    print('--end--')
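# [Hosting-page notice, not part of the program] This section may contain content that is unsuitable for display, so the page does not show it. You can review and revise it yourself using the relevant editing functions.
# If you confirm that the content does not involve inappropriate language / pure advertising or traffic diversion / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, or content that violates applicable national laws and regulations, you can click submit to file an appeal, and we will handle it as soon as possible.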