代码拉取完成,页面将自动刷新
import sys
sys.path.append('protosGen')
from config.constants import Constants
from common.roll_log import init_roll_log
import threading
from protosGen.common_pb2 import *
from protosGen.marketGateway_pb2 import *
from protosGen.orderreporting_pb2 import *
import time
from threading import Thread
import logging
from decimal import *
from datetime import datetime
from common import utils
strategy_name = "Moving bricks among exchanges Strategy"
logger = init_roll_log(strategy_name + ".log",logging.DEBUG)
# strategy params
exchanges = [Constants.Exchanges.HUOBI, Constants.Exchanges.BINANCE, Constants.Exchanges.OKEX, Constants.Exchanges.BITFINEX]
exchange_subaccount = {Constants.Exchanges.HUOBI : Constants.SUBACCOUNT_CODE_HUOBI,
Constants.Exchanges.OKEX : Constants.SUBACCOUNT_CODE_OKEX,
Constants.Exchanges.BITFINEX : Constants.SUBACCOUNT_CODE_BITFINEX,
Constants.Exchanges.BINANCE : Constants.SUBACCOUNT_CODE_BINANCE}
symbolCoordinator = utils.SymbolCoordinator(baseCurrency = "btc", quoteCurrency = "usdt")
spreadThreshold = 0
quantity = 0.01 # for demonstration
class PositionStrategy:
def __init__(self, market_stub, order_stub):
self.market_stub = market_stub
self.order_stub = order_stub
self.orders = {}
self.orderHistory = {}
self._stop_event = threading.Event()
self.depthHandlerThread = Thread(target=self.__subAndHandleDepth, name = '__subAndHandleDepth')
self.orderHandlerThread = Thread(target=self.__subAndHandleOrder, name = '__subAndHandleOrder')
def start(self):
self.depthHandlerThread.start()
self.orderHandlerThread.start()
def stop(self):
self._stop_event.set()
self.orderHandlerThread.join(5)
self.depthHandlerThread.join(5)
def __subAndHandleDepth(self):
depthResponses = self.market_stub.subDepth(self.__genSubDepthMsg())
self.__handleDepth(depthResponses)
def __genSubDepthMsg(self):
def threadSleep():
while not self._stop_event.is_set():
time.sleep(1)
subs = []
for exchange in exchanges:
s = Symbol(exchangeType=exchange.value,
baseCurrency=symbolCoordinator.get_base_currency(exchange),
quoteCurrency=symbolCoordinator.get_quote_currency(exchange))
subs.append(ReqSubDepth(symbol=s, isUnSub=False))
for sub in subs:
yield sub
yield threadSleep()
def __handleDepth(self, responses):
bestDepthCalculator = BestDepthCalculator(exchanges)
for depthsData in responses:
bestDepthCalculator.push_value(depthsData)
if bestDepthCalculator.is_data_ready():
self.__doTrade(bestDepthCalculator.get_best_bid(), bestDepthCalculator.get_best_ask(), bestDepthCalculator.get_max_volumn())
def __doTrade(self, best_bid, best_ask, quantity):
spread = (best_bid['price'] - best_ask['price'])/best_bid['price']
logger.info("current spread: {}, best bid: {} {}, best ask: {} {}".format(spread, best_bid['exchange'], best_bid['price'], best_ask['exchange'], best_ask['price']))
if spread > spreadThreshold:
buyExchange = Constants.Exchanges(best_ask['exchange'])
order_response = self.order_stub.insertOrder(InsertOrderRequest(
subaccountCode=exchange_subaccount[buyExchange],
symbol=Symbol(
exchangeType=best_ask['exchange'],
baseCurrency=symbolCoordinator.get_base_currency(buyExchange),
quoteCurrency=symbolCoordinator.get_quote_currency(buyExchange)),
quantity=str(quantity),
price=str(best_ask['price']),
type=Constants.OrderType.LIMIT.value,
side=Constants.OrderSide.BUY.value))
if order_response.CRsp.isSuccess:
self.orders[order_response.orderId] = Constants.OrderStatus.INSERT_SUCCESS
logger.info("Insert buy order success, orderId: " + order_response.orderId)
else:
logger.error("Insert buy order failed, errCode: " + order_response.CRsp.errCode + ". errMsg: " + order_response.CRsp.errMsg)
return
sellExchange = Constants.Exchanges(best_bid['exchange'])
order_response = self.order_stub.insertOrder(InsertOrderRequest(
subaccountCode = exchange_subaccount[sellExchange],
symbol = Symbol(
exchangeType = best_bid['exchange'],
baseCurrency = symbolCoordinator.get_base_currency(sellExchange),
quoteCurrency = symbolCoordinator.get_quote_currency(sellExchange)),
quantity = str(quantity),
price = str(best_bid['price']),
type = Constants.OrderType.LIMIT.value,
side = Constants.OrderSide.SELL.value))
if order_response.CRsp.isSuccess:
self.orders[order_response.orderId] = Constants.OrderStatus.INSERT_SUCCESS
logger.info("Insert sell order success, orderId: " + order_response.orderId)
else:
logger.error("Insert sell order failed, errCode: " + order_response.CRsp.errCode + ". errMsg: " + order_response.CRsp.errMsg)
return
def __subAndHandleOrder(self):
orderResponses = self.order_stub.subOrder(self.__genSubOrderMsg())
self.__handleOrder(orderResponses)
def __genSubOrderMsg(self):
def threadSleep():
while not self._stop_event.is_set():
time.sleep(1)
for subaccount in exchange_subaccount.values():
yield ReqSubOrder(subaccountCode = subaccount)
yield threadSleep()
def __handleOrder(self, responses):
for rtnSubOrder in responses:
order = rtnSubOrder.order
self.orderHistory[order.orderId] = order
logger.info("Order Status updated, orderStatus:" + str(order.orderStatus) + ", exchangeOrderId:" + str(order.exchangeOrderId) + ", orderId:" + str(order.orderId))
if order.orderStatus not in ['canceled', 'filled', 'exch_rejected', 'tg_connection_error','partially_canceled']:
logger.info("Cancel order:" + order.orderId)
cancel_order_response = self.order_stub.cancelOrder(CancelOrderRequest(orderId = order.orderId))
if not cancel_order_response.CRsp.isSuccess:
logger.error("Insert cancel order failed, errCode: " + cancel_order_response.CRsp.errCode + ". errMsg: " + cancel_order_response.CRsp.errMsg)
class BestDepthCalculator:
def __init__(self, allExchanges):
self.exchange_bidask = {}
self.best_bid = {}
self.best_ask = {}
self.allExchanges = allExchanges
def push_value(self, depthsData):
# if isinstance(type(depthsData), DepthsData):
if depthsData.CRsp.isSuccess:
exchange = depthsData.data.symbol.exchangeType
bid = self.__toFloatDepth(depthsData.data.bids[0])
ask = self.__toFloatDepth(depthsData.data.asks[0])
if exchange not in self.exchange_bidask:
self.exchange_bidask[exchange] = {}
self.exchange_bidask[exchange]['bid'] = bid
self.exchange_bidask[exchange]['ask'] = ask
self.exchange_bidask[exchange]['time'] = datetime.utcnow()
#calculate best bid and best ask
for ex in self.exchange_bidask.keys():
if self.exchange_bidask[ex]['bid']['price'] <= bid['price']:
self.best_bid['price'] = bid['price']
self.best_bid['quantity'] = bid['quantity']
self.best_bid['exchange'] = exchange
else:
self.best_bid['price'] = self.exchange_bidask[ex]['bid']['price']
self.best_bid['quantity'] = self.exchange_bidask[ex]['bid']['quantity']
self.best_bid['exchange'] = ex
if self.exchange_bidask[ex]['ask']['price'] >= ask['price']:
self.best_ask['price'] = ask['price']
self.best_ask['quantity'] = ask['quantity']
self.best_ask['exchange'] = exchange
else:
self.best_ask['price'] = self.exchange_bidask[ex]['ask']['price']
self.best_ask['quantity'] = self.exchange_bidask[ex]['ask']['quantity']
self.best_ask['exchange'] = ex
def is_data_ready(self):
for exchange in self.allExchanges:
if exchange.value not in self.exchange_bidask:
return False
return self.__checkUpdateTime()
def get_best_bid(self):
return self.best_bid
def get_best_ask(self):
return self.best_ask
def get_max_volumn(self):
# return self.best_bid['quantity'] if self.best_bid['quantity'] < self.best_ask['quantity'] else self.best_ask['quantity']
return quantity # for demonstration
def __toFloatDepth(self, depth):
floatDepth = {}
floatDepth['price'] = float(depth.price)
floatDepth['quantity'] = float(depth.quantity)
return floatDepth
def __checkUpdateTime(self):
return True # todo
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。