Lean  $LEAN_TAG$
BrokerageTransactionHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Concurrent;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
26 using QuantConnect.Logging;
27 using QuantConnect.Orders;
31 using QuantConnect.Util;
32 
34 {
35  /// <summary>
36  /// Transaction handler for all brokerages
37  /// </summary>
39  {
40  private IAlgorithm _algorithm;
41  private IBrokerage _brokerage;
42  private bool _brokerageIsBacktesting;
43  private bool _loggedFeeAdjustmentWarning;
44 
45  // Counter to keep track of total amount of processed orders
46  private int _totalOrderCount;
47 
48  // this bool is used to check if the warning message for the rounding of order quantity has been displayed for the first time
49  private bool _firstRoundOffMessage = false;
50 
51  // this value is used for determining how confident we are in our cash balance update
52  private long _lastFillTimeTicks;
53 
54  private const int MaxCashSyncAttempts = 5;
55  private int _failedCashSyncAttempts;
56 
57  /// <summary>
58  /// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once
59  /// orders are processed they are moved into the Orders queue awaiting the brokerage response.
60  /// </summary>
62 
63  private Thread _processingThread;
64  private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
65 
66  private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();
67 
68  /// <summary>
69  /// The _completeOrders dictionary holds all orders.
70  /// Once the transaction thread has worked on them they get put here while waiting for fill updates.
71  /// </summary>
72  private readonly ConcurrentDictionary<int, Order> _completeOrders = new ConcurrentDictionary<int, Order>();
73 
74  /// <summary>
75  /// The orders dictionary holds orders which are open. Status: New, Submitted, PartiallyFilled, None, CancelPending
76  /// Once the transaction thread has worked on them they get put here while waiting for fill updates.
77  /// </summary>
78  private readonly ConcurrentDictionary<int, Order> _openOrders = new ConcurrentDictionary<int, Order>();
79 
80  /// <summary>
81  /// The _openOrderTickets dictionary holds open order tickets that the algorithm can use to reference a specific order. This
82  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
83  /// and async events (such as run this code when this order fills)
84  /// </summary>
85  private readonly ConcurrentDictionary<int, OrderTicket> _openOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
86 
87  /// <summary>
88  /// The _completeOrderTickets dictionary holds all order tickets that the algorithm can use to reference a specific order. This
89  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
90  /// and async events (such as run this code when this order fills)
91  /// </summary>
92  private readonly ConcurrentDictionary<int, OrderTicket> _completeOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
93 
94  /// <summary>
95  /// Cache collection of price adjustment modes for each symbol
96  /// </summary>
97  private readonly Dictionary<Symbol, DataNormalizationMode> _priceAdjustmentModes = new Dictionary<Symbol, DataNormalizationMode>();
98 
99  /// <summary>
100  /// The _cancelPendingOrders instance will help to keep track of CancelPending orders and their Status
101  /// </summary>
103 
104  private IResultHandler _resultHandler;
105 
106  private readonly object _lockHandleOrderEvent = new object();
107 
108  /// <summary>
109  /// Event fired when there is a new <see cref="OrderEvent"/>
110  /// </summary>
111  public event EventHandler<OrderEvent> NewOrderEvent;
112 
/// <summary>
/// Gets the dictionary holding all orders, keyed by order id
/// </summary>
public ConcurrentDictionary<int, Order> Orders => _completeOrders;

/// <summary>
/// Gets every order event recorded so far
/// </summary>
public IEnumerable<OrderEvent> OrderEvents => _orderEvents;

/// <summary>
/// Gets the dictionary holding all order tickets, keyed by order id
/// </summary>
public ConcurrentDictionary<int, OrderTicket> OrderTickets => _completeOrderTickets;

/// <summary>
/// Gets the running total of orders counted by this handler
/// </summary>
public int OrdersCount => _totalOrderCount;
144 
/// <summary>
/// Prepares the transaction handler for use: stores the supplied dependencies, subscribes to
/// the brokerage notifications and flags the handler as active
/// </summary>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="brokerage">The brokerage implementation to process orders and fire fill events</param>
/// <param name="resultHandler">The result handler instance kept by this transaction handler</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="brokerage"/> is null</exception>
public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResultHandler resultHandler)
{
    if (brokerage == null)
    {
        throw new ArgumentNullException(nameof(brokerage));
    }

    // multi threaded queue, used for live deployments
    // we don't need to do this today because we just initialized/synced
    _resultHandler = resultHandler;

    _brokerage = brokerage;
    _brokerageIsBacktesting = brokerage is BacktestingBrokerage;

    // forward every brokerage notification to its dedicated handler
    _brokerage.OrdersStatusChanged += (_, orderEvents) => HandleOrderEvents(orderEvents);
    _brokerage.AccountChanged += (_, account) => HandleAccountChanged(account);
    _brokerage.OptionPositionAssigned += (_, fill) => HandlePositionAssigned(fill);
    _brokerage.OptionNotification += (_, notification) => HandleOptionNotification(notification);
    _brokerage.NewBrokerageOrderNotification += (_, notification) => HandleNewBrokerageSideOrder(notification);
    _brokerage.DelistingNotification += (_, notification) => HandleDelistingNotification(notification);
    _brokerage.OrderIdChanged += (_, idChange) => HandlerBrokerageOrderIdChangedEvent(idChange);
    _brokerage.OrderUpdated += (_, update) => HandleOrderUpdated(update);

    IsActive = true;

    _algorithm = algorithm;
}
210 
/// <summary>
/// Creates the background "Transaction Thread" that executes <see cref="Run"/> and starts it
/// </summary>
protected virtual void InitializeTransactionThread()
{
    var transactionThread = new Thread(Run);
    transactionThread.IsBackground = true;
    transactionThread.Name = "Transaction Thread";

    _processingThread = transactionThread;
    _processingThread.Start();
}
220 
221  /// <summary>
222  /// Flag indicating whether the transaction handler is active. It is set to true by Initialize.
223  /// It is reset to false by Exit, and by Run once it stops consuming requests (when a processing thread exists).
224  /// </summary>
225  public bool IsActive { get; private set; }
226 
227  #region Order Request Processing
228 
/// <summary>
/// Routes the specified order request to the submit, update or cancel logic
/// </summary>
/// <param name="request">The order request to be processed</param>
/// <returns>The order ticket produced by the matching submit, update or cancel operation</returns>
{
    if (_algorithm.LiveMode)
    {
        // in live mode leave a trace of each request together with the margin information
        Log.Trace($"BrokerageTransactionHandler.Process(): {request}");

        _algorithm.Portfolio.LogMarginInformation(request);
    }

    return request.OrderRequestType switch
    {
        OrderRequestType.Submit => AddOrder((SubmitOrderRequest)request),
        OrderRequestType.Update => UpdateOrder((UpdateOrderRequest)request),
        OrderRequestType.Cancel => CancelOrder((CancelOrderRequest)request),
        _ => throw new ArgumentOutOfRangeException()
    };
}
257 
/// <summary>
/// Creates the ticket for a submit request. An accepted request is queued for the transaction
/// thread; a rejected one is stored right away as an invalid order and an order event is raised.
/// </summary>
/// <param name="request">A request detailing the order to be submitted</param>
/// <returns>The order ticket created for the request</returns>
{
    var response = _algorithm.IsWarmingUp
        ? OrderResponse.WarmingUp(request)
        : OrderResponse.Success(request);

    // only requests with a negative quantity are checked against the shortable provider
    var canShort = request.Quantity >= 0 || _algorithm.Shortable(request.Symbol, request.Quantity);
    if (!canShort)
    {
        var shortableMessage = GetShortableErrorMessage(request.Symbol, request.Quantity);
        if (!_algorithm.LiveMode)
        {
            response = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity, shortableMessage);
        }
        else
        {
            // in live mode we send a warning but we wont block the order being sent to the brokerage
            _algorithm.Debug($"Warning: {shortableMessage}");
        }
    }

    request.SetResponse(response);
    var ticket = new OrderTicket(_algorithm.Transactions, request);

    Interlocked.Increment(ref _totalOrderCount);

    if (!response.IsSuccess)
    {
        // rejected: keep an invalid order around so it can be recalled later
        var invalidOrder = Order.CreateOrder(request);
        var tag = response.ErrorCode == OrderResponseErrorCode.AlgorithmWarmingUp
            ? "Algorithm warming up."
            : response.ErrorMessage;

        // ensure the order is tagged with a currency
        var orderSecurity = _algorithm.Securities[invalidOrder.Symbol];
        invalidOrder.PriceCurrency = orderSecurity.SymbolProperties.QuoteCurrency;

        invalidOrder.Status = OrderStatus.Invalid;
        invalidOrder.Tag = tag;
        ticket.SetOrder(invalidOrder);
        _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
        _completeOrders.TryAdd(invalidOrder.Id, invalidOrder);

        HandleOrderEvent(new OrderEvent(invalidOrder, _algorithm.UtcTime, OrderFee.Zero, tag));
        return ticket;
    }

    // accepted: register the ticket first, then hand the request over for processing
    _openOrderTickets.TryAdd(ticket.OrderId, ticket);
    _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
    _orderRequestQueue.Add(request);

    // wait for the transaction handler to set the order reference into the new order ticket,
    // so we can ensure the order has already been added to the open orders,
    // before returning the ticket to the algorithm.
    WaitForOrderSubmission(ticket);
    return ticket;
}
330 
/// <summary>
/// Blocks for up to one second until the ticket's order has been set, logging an error
/// when the wait times out
/// </summary>
/// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
protected virtual void WaitForOrderSubmission(OrderTicket ticket)
{
    var timeout = Time.OneSecond;
    if (ticket.OrderSet.WaitOne(timeout))
    {
        return;
    }

    Log.Error("BrokerageTransactionHandler.WaitForOrderSubmission(): " +
        $"The order request (Id={ticket.OrderId}) was not submitted within {timeout.TotalSeconds} second(s).");
}
344 
/// <summary>
/// Update an order yet to be filled such as stop or limit orders.
/// </summary>
/// <param name="request">Request detailing how the order should be updated</param>
/// <returns>
/// The ticket of the order being updated, or an invalid ticket when no ticket exists for the
/// requested order id. The outcome of the validation is set as the response of the request.
/// </returns>
/// <remarks>Does not apply if the order is already fully filled</remarks>
{
    if (!_completeOrderTickets.TryGetValue(request.OrderId, out var ticket))
    {
        return OrderTicket.InvalidUpdateOrderId(_algorithm.Transactions, request);
    }

    ticket.AddUpdateRequest(request);

    try
    {
        //Update the order from the behaviour
        var order = GetOrderByIdInternal(request.OrderId);
        var orderQuantity = request.Quantity ?? ticket.Quantity;

        // The shortable check needs the order, so skip it when the order could not be found.
        // Otherwise a missing order with a negative requested quantity would dereference null
        // and be reported as a generic processing error instead of 'unable to find order'.
        var shortable = true;
        if (order != null && (order.Direction == OrderDirection.Sell || orderQuantity < 0))
        {
            shortable = _algorithm.Shortable(ticket.Symbol, orderQuantity, order.Id);

            if (_algorithm.LiveMode && !shortable)
            {
                // let's override and just send warning
                shortable = true;

                // report the quantity that was actually checked, not the ticket's current quantity
                _algorithm.Debug($"Warning: {GetShortableErrorMessage(ticket.Symbol, orderQuantity)}");
            }
        }

        if (order == null)
        {
            // can't update an order that doesn't exist!
            Log.Error("BrokerageTransactionHandler.Update(): Cannot update a null order");
            request.SetResponse(OrderResponse.UnableToFindOrder(request));
        }
        else if (order.Status == OrderStatus.New)
        {
            // can't update a pending submit order
            Log.Error("BrokerageTransactionHandler.Update(): Cannot update a pending submit order with status " + order.Status);
            request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
        }
        else if (order.Status.IsClosed() && !request.IsAllowedForClosedOrder())
        {
            // can't update a completed order
            Log.Error("BrokerageTransactionHandler.Update(): Cannot update closed order with status " + order.Status);
            request.SetResponse(OrderResponse.InvalidStatus(request, order));
        }
        else if (request.Quantity.HasValue && request.Quantity.Value == 0)
        {
            request.SetResponse(OrderResponse.ZeroQuantity(request));
        }
        else if (_algorithm.IsWarmingUp)
        {
            request.SetResponse(OrderResponse.WarmingUp(request));
        }
        else if (!shortable)
        {
            var shortableResponse = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity,
                GetShortableErrorMessage(ticket.Symbol, orderQuantity));

            request.SetResponse(shortableResponse);
        }
        else
        {
            // valid request: mark it as processing and hand it over to the request queue
            request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
            _orderRequestQueue.Add(request);
        }
    }
    catch (Exception err)
    {
        Log.Error(err);
        request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
    }

    return ticket;
}
427 
/// <summary>
/// Validates a cancel request and, when it is acceptable, flags the order as cancel pending
/// and queues the request for processing
/// </summary>
/// <param name="request">Request containing the specific order id to remove</param>
/// <returns>
/// The ticket of the order to cancel, or an invalid ticket when no ticket exists for the
/// requested order id
/// </returns>
{
    if (!_completeOrderTickets.TryGetValue(request.OrderId, out var ticket))
    {
        Log.Error("BrokerageTransactionHandler.CancelOrder(): Unable to locate ticket for order.");
        return OrderTicket.InvalidCancelOrderId(_algorithm.Transactions, request);
    }

    try
    {
        // if we couldn't set this request as the cancellation then another thread/someone
        // else is already doing it or it in fact has already been cancelled
        if (!ticket.TrySetCancelRequest(request))
        {
            request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.InvalidRequest, "Cancellation is already in progress."));
            return ticket;
        }

        var order = GetOrderByIdInternal(request.OrderId);
        if (order == null)
        {
            Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot find this id.");
            request.SetResponse(OrderResponse.UnableToFindOrder(request));
            return ticket;
        }

        // the request tag, when provided, replaces the order tag before the status is validated
        if (request.Tag != null)
        {
            order.Tag = request.Tag;
        }

        if (order.Status == OrderStatus.New)
        {
            Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order with status: " + order.Status);
            request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
        }
        else if (order.Status.IsClosed())
        {
            Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order already " + order.Status);
            request.SetResponse(OrderResponse.InvalidStatus(request, order));
        }
        else if (_algorithm.IsWarmingUp)
        {
            request.SetResponse(OrderResponse.WarmingUp(request));
        }
        else
        {
            // record the status held before the cancel request, then mark the order as cancel pending
            _cancelPendingOrders.Set(order.Id, order.Status);
            order.Status = OrderStatus.CancelPending;

            // notify the algorithm with an order event
            HandleOrderEvent(new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero));

            // send the request to be processed
            request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
            _orderRequestQueue.Add(request);
        }
    }
    catch (Exception err)
    {
        Log.Error(err);
        request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
    }

    return ticket;
}
501 
/// <summary>
/// Enumerates the stored <see cref="OrderTicket"/> instances accepted by <paramref name="filter"/>
/// </summary>
/// <param name="filter">Predicate selecting the tickets to return; null selects every ticket</param>
/// <returns>A lazily evaluated enumerable of the matching <see cref="OrderTicket"/> instances</returns>
public IEnumerable<OrderTicket> GetOrderTickets(Func<OrderTicket, bool> filter = null)
{
    var predicate = filter ?? (_ => true);
    return _completeOrderTickets
        .Select(pair => pair.Value)
        .Where(predicate);
}
511 
/// <summary>
/// Enumerates the open <see cref="OrderTicket"/> instances accepted by <paramref name="filter"/>
/// </summary>
/// <param name="filter">Predicate selecting the tickets to return; null selects every open ticket</param>
/// <returns>A lazily evaluated enumerable of the matching open <see cref="OrderTicket"/> instances</returns>
public IEnumerable<OrderTicket> GetOpenOrderTickets(Func<OrderTicket, bool> filter = null)
{
    var predicate = filter ?? (_ => true);
    return _openOrderTickets
        .Select(pair => pair.Value)
        .Where(predicate);
}
521 
/// <summary>
/// Looks up the order ticket stored for the specified order id
/// </summary>
/// <param name="orderId">The order's id</param>
/// <returns>The order ticket with the specified id, or null if not found</returns>
public OrderTicket GetOrderTicket(int orderId)
{
    return _completeOrderTickets.TryGetValue(orderId, out var ticket) ? ticket : null;
}
533 
534  #endregion
535 
/// <summary>
/// Fetches a copy of the order stored under the specified id
/// </summary>
/// <param name="orderId">Order id to fetch</param>
/// <returns>A clone of the order with the specified id, or null if no match is found</returns>
public Order GetOrderById(int orderId)
{
    // callers get a clone, never the stored instance
    return GetOrderByIdInternal(orderId)?.Clone();
}
546 
// Returns the stored order instance (not a clone) for the given id, or null when it is unknown
private Order GetOrderByIdInternal(int orderId)
{
    _completeOrders.TryGetValue(orderId, out var order);
    return order;
}
552 
/// <summary>
/// Gets the orders associated with the specified brokerage id. Open orders are searched
/// first; all stored orders are only searched when no open order matches.
/// </summary>
/// <param name="brokerageId">The brokerage id to fetch</param>
/// <returns>Clones of the matching orders; an empty list if no match is found</returns>
public List<Order> GetOrdersByBrokerageId(string brokerageId)
{
    var openMatches = GetOrdersByBrokerageId(brokerageId, _openOrders);
    return openMatches.Count > 0
        ? openMatches
        : GetOrdersByBrokerageId(brokerageId, _completeOrders);
}
569 
// Collects clones of the orders in the given dictionary whose BrokerId contains the brokerage id
private static List<Order> GetOrdersByBrokerageId(string brokerageId, ConcurrentDictionary<int, Order> orders)
{
    var matches = new List<Order>();
    foreach (var pair in orders)
    {
        if (pair.Value.BrokerId.Contains(brokerageId))
        {
            matches.Add(pair.Value.Clone());
        }
    }
    return matches;
}
577 
/// <summary>
/// Enumerates clones of the stored orders accepted by the specified filter. Passing null
/// returns clones of all stored orders.
/// </summary>
/// <param name="filter">Delegate used to filter the orders</param>
/// <returns>A lazily evaluated enumerable with clones of the matching orders</returns>
public IEnumerable<Order> GetOrders(Func<Order, bool> filter = null)
{
    var selected = _completeOrders.Select(pair => pair.Value);
    if (filter != null)
    {
        selected = selected.Where(filter);
    }

    // return a clone to prevent object reference shenanigans, you must submit a request to change the order
    return selected.Select(order => order.Clone());
}
593 
/// <summary>
/// Builds a list with clones of the open orders accepted by the specified filter. Passing
/// null returns clones of all open orders.
/// </summary>
/// <param name="filter">Delegate used to filter the orders</param>
/// <returns>A new list with clones of the matching open orders</returns>
public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
{
    var selected = _openOrders.Select(pair => pair.Value);
    if (filter != null)
    {
        selected = selected.Where(filter);
    }

    // return a clone to prevent object reference shenanigans, you must submit a request to change the order
    return selected.Select(order => order.Clone()).ToList();
}
608 
/// <summary>
/// Entry point of the transaction thread: handles each request taken from the order request
/// queue. Any exception raised while doing so is reported to the algorithm as a runtime error.
/// </summary>
protected void Run()
{
    try
    {
        var pendingRequests = _orderRequestQueue.GetConsumingEnumerable(_cancellationTokenSource.Token);
        foreach (var orderRequest in pendingRequests)
        {
            HandleOrderRequest(orderRequest);
        }
    }
    catch (Exception err)
    {
        // unexpected error, we need to close down shop
        _algorithm.SetRuntimeError(err, "HandleOrderRequest");
    }

    if (_processingThread == null)
    {
        return;
    }

    Log.Trace("BrokerageTransactionHandler.Run(): Ending Thread...");
    IsActive = false;
}
634 
/// <summary>
/// Hook for processing asynchronous events; this implementation intentionally does nothing
/// and can be overridden
/// </summary>
public virtual void ProcessAsynchronousEvents()
{
    // NOP
}
642 
/// <summary>
/// Processes all synchronous events that must take place before the next time loop for the algorithm.
/// When not in live mode it only waits (up to one second) for the request queue to stop being busy.
/// In live mode it performs the brokerage cash sync when due and trims the oldest stored orders.
/// </summary>
/// <exception cref="Exception">Thrown when the brokerage cash sync has failed the maximum number of times</exception>
public virtual void ProcessSynchronousEvents()
{
    // how to do synchronous market orders for real brokerages?

    if (!_algorithm.LiveMode)
    {
        // in backtesting we need to wait for orders to be removed from the queue and finished processing
        var queueIsIdle = !_orderRequestQueue.IsBusy
            || _orderRequestQueue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token);
        if (!queueIsIdle)
        {
            Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
        }
        return;
    }

    // sync cash only when the brokerage asks for it and we haven't had a fill for at least 10 seconds
    var cashSyncIsDue = !_algorithm.IsWarmingUp
        && _brokerage.ShouldPerformCashSync(CurrentTimeUtc)
        && TimeSinceLastFill > TimeSpan.FromSeconds(10);
    if (cashSyncIsDue && !_brokerage.PerformCashSync(_algorithm, CurrentTimeUtc, () => TimeSinceLastFill))
    {
        _failedCashSyncAttempts++;
        if (_failedCashSyncAttempts >= MaxCashSyncAttempts)
        {
            throw new Exception("The maximum number of attempts for brokerage cash sync has been reached.");
        }
    }

    // we want to remove orders older than 10k records, but only in live mode
    const int maxOrdersToKeep = 10000;
    if (_completeOrders.Count <= maxOrdersToKeep)
    {
        return;
    }

    Log.Debug("BrokerageTransactionHandler.ProcessSynchronousEvents(): Start removing old orders...");
    var highestOrderId = _completeOrders.Max(pair => pair.Key);
    var lowestOrderIdToKeep = highestOrderId - maxOrdersToKeep;
    foreach (var stale in _completeOrders.Where(pair => pair.Key <= lowestOrderIdToKeep))
    {
        _completeOrders.TryRemove(stale.Key, out _);
        _completeOrderTickets.TryRemove(stale.Key, out _);
    }

    Log.Debug($"BrokerageTransactionHandler.ProcessSynchronousEvents(): New order count {_completeOrders.Count}. Exit");
}
696 
/// <summary>
/// Registers an order that is already open: assigns it a new order id, creates its ticket
/// and stores both in the open and complete collections
/// </summary>
/// <param name="order">The open order to register; its id and possibly its status are overwritten</param>
/// <param name="algorithm">The algorithm whose transaction manager provides the order id and owns the ticket</param>
public void AddOpenOrder(Order order, IAlgorithm algorithm)
{
    switch (order.Status)
    {
        case OrderStatus.New:
        case OrderStatus.None:
            // make sure we have a valid order status
            order.Status = OrderStatus.Submitted;
            break;
    }

    order.Id = algorithm.Transactions.GetIncrementOrderId();

    var ticket = order.ToOrderTicket(algorithm.Transactions);

    SetPriceAdjustmentMode(order, algorithm);

    // add the order, replacing any entry already stored under the same id
    _openOrders[order.Id] = order;
    _completeOrders[order.Id] = order;
    _openOrderTickets.AddOrUpdate(order.Id, ticket);
    _completeOrderTickets.AddOrUpdate(order.Id, ticket);

    Interlocked.Increment(ref _totalOrderCount);
}
721 
722 
/// <summary>
/// Shuts the transaction handler down: gives a busy request queue up to 60 seconds to finish,
/// stops the processing thread, marks the handler inactive and disposes the cancellation token source
/// </summary>
public void Exit()
{
    var timeout = TimeSpan.FromSeconds(60);

    // only wait if the processing thread is running
    var timedOut = _processingThread != null
        && _orderRequestQueue.IsBusy
        && !_orderRequestQueue.WaitHandle.WaitOne(timeout);
    if (timedOut)
    {
        Log.Error($"BrokerageTransactionHandler.Exit(): Exceed timeout: {(int)timeout.TotalSeconds} seconds.");
    }

    _processingThread?.StopSafely(timeout, _cancellationTokenSource);
    IsActive = false;
    _cancellationTokenSource.DisposeSafely();
}
742 
/// <summary>
/// Dispatches an order request to the handler matching its type and stores the resulting
/// response on the request, marking it as processed
/// </summary>
/// <param name="request"><see cref="OrderRequest"/> to be handled</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the request type is not submit, update or cancel</exception>
public void HandleOrderRequest(OrderRequest request)
{
    var response = request.OrderRequestType switch
    {
        OrderRequestType.Submit => HandleSubmitOrderRequest((SubmitOrderRequest)request),
        OrderRequestType.Update => HandleUpdateOrderRequest((UpdateOrderRequest)request),
        OrderRequestType.Cancel => HandleCancelOrderRequest((CancelOrderRequest)request),
        _ => throw new ArgumentOutOfRangeException()
    };

    // mark request as processed
    request.SetResponse(response, OrderRequestStatus.Processed);
}
769 
/// <summary>
/// Handles a request to submit a new order: creates and registers the order, validates it
/// (quantity, securities, buying power, brokerage model) and sends it to the brokerage
/// </summary>
/// <param name="request">The submit request to handle</param>
/// <returns>
/// A successful response when the order was placed, or when it is part of a group that is not
/// complete yet; otherwise an error response describing why the order was not placed
/// </returns>
private OrderResponse HandleSubmitOrderRequest(SubmitOrderRequest request)
{
    var order = Order.CreateOrder(request);

    // ensure the order is tagged with a currency
    var security = _algorithm.Securities[order.Symbol];
    order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
    if (string.IsNullOrEmpty(order.Tag))
    {
        order.Tag = order.GetDefaultTag();
    }

    // rounds off the order towards 0 to the nearest multiple of lot size
    order.Quantity = RoundOffOrder(order, security);

    if (!_openOrders.TryAdd(order.Id, order) || !_completeOrders.TryAdd(order.Id, order))
    {
        Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to add new order, order not processed.");
        // include the actual order id: the message previously carried an unformatted "{0}" placeholder
        return OrderResponse.Error(request, OrderResponseErrorCode.OrderAlreadyExists,
            $"Cannot process submit request because order with id {order.Id.ToStringInvariant()} already exists");
    }
    if (!_completeOrderTickets.TryGetValue(order.Id, out var ticket))
    {
        Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to retrieve order ticket, order not processed.");
        return OrderResponse.UnableToFindOrder(request);
    }

    var comboIsReady = order.TryGetGroupOrders(TryGetOrder, out var orders);
    var comboSecuritiesFound = orders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var securities);

    // rounds the order prices
    RoundOrderPrices(order, security, comboIsReady, securities);

    // save current security prices
    order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);

    // Set order price adjustment mode
    SetPriceAdjustmentMode(order, _algorithm);

    // update the ticket's internal storage with this new order reference
    ticket.SetOrder(order);

    if (!comboIsReady)
    {
        // an Order of the group is missing
        return OrderResponse.Success(request);
    }

    if (orders.Any(o => o.Quantity == 0))
    {
        var response = OrderResponse.ZeroQuantity(request);
        _algorithm.Error(response.ErrorMessage);

        InvalidateOrders(orders, response.ErrorMessage);
        return response;
    }

    if (!comboSecuritiesFound)
    {
        var response = OrderResponse.MissingSecurity(request);
        _algorithm.Error(response.ErrorMessage);

        InvalidateOrders(orders, response.ErrorMessage);
        return response;
    }

    // check to see if we have enough money to place the order
    HasSufficientBuyingPowerForOrderResult hasSufficientBuyingPowerResult;
    try
    {
        hasSufficientBuyingPowerResult = _algorithm.Portfolio.HasSufficientBuyingPowerForOrder(orders);
    }
    catch (Exception err)
    {
        Log.Error(err);
        _algorithm.Error($"Order Error: id: {order.Id.ToStringInvariant()}, Error executing margin models: {err.Message}");
        HandleOrderEvent(new OrderEvent(order,
            _algorithm.UtcTime,
            OrderFee.Zero,
            "Error executing margin models"));
        return OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, "Error in GetSufficientCapitalForOrder");
    }

    if (!hasSufficientBuyingPowerResult.IsSufficient)
    {
        var errorMessage = securities.GetErrorMessage(hasSufficientBuyingPowerResult);
        _algorithm.Error(errorMessage);

        InvalidateOrders(orders, errorMessage);
        return OrderResponse.Error(request, OrderResponseErrorCode.InsufficientBuyingPower, errorMessage);
    }

    // verify that our current brokerage can actually take the order
    foreach (var kvp in securities)
    {
        if (!_algorithm.BrokerageModel.CanSubmitOrder(kvp.Value, kvp.Key, out var message))
        {
            var errorMessage = $"BrokerageModel declared unable to submit order: [{string.Join(",", orders.Select(o => o.Id))}]";

            // if we couldn't actually process the order, mark it as invalid and bail
            message ??= new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidOrder", string.Empty);
            var response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToSubmitOrder, $"{errorMessage} {message}");

            InvalidateOrders(orders, response.ErrorMessage);
            _algorithm.Error(response.ErrorMessage);
            return response;
        }
    }

    // set the order status based on whether or not we successfully submitted the order to the market
    bool orderPlaced;
    try
    {
        // stops at the first order the brokerage fails to place
        orderPlaced = orders.All(o => _brokerage.PlaceOrder(o));
    }
    catch (Exception err)
    {
        Log.Error(err);
        orderPlaced = false;
    }

    if (!orderPlaced)
    {
        // we failed to submit the order, invalidate it
        var errorMessage = $"Brokerage failed to place orders: [{string.Join(",", orders.Select(o => o.Id))}]";

        InvalidateOrders(orders, errorMessage);
        _algorithm.Error(errorMessage);
        return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToSubmitOrder, errorMessage);
    }

    return OrderResponse.Success(request);
}
906 
/// <summary>
/// Processes an order update request: validates the order's status, asks the brokerage model
/// whether the update is allowed, applies the requested changes to the order and forwards it to the brokerage
/// </summary>
/// <param name="request">The update request to process</param>
/// <returns>A success response, or an error response describing why the update was not applied</returns>
private OrderResponse HandleUpdateOrderRequest(UpdateOrderRequest request)
{
    // both the order and its ticket must be known, otherwise there is nothing to update
    if (!_completeOrders.TryGetValue(request.OrderId, out var order) || !_completeOrderTickets.TryGetValue(request.OrderId, out var ticket))
    {
        Log.Error("BrokerageTransactionHandler.HandleUpdateOrderRequest(): Unable to update order with ID " + request.OrderId);
        return OrderResponse.UnableToFindOrder(request);
    }

    if (order.Status == OrderStatus.New)
    {
        return OrderResponse.InvalidNewStatus(request, order);
    }

    // closed orders only accept the updates the request itself declares as allowed
    var isClosedOrderUpdate = order.Status.IsClosed();
    if (isClosedOrderUpdate && !request.IsAllowedForClosedOrder())
    {
        return OrderResponse.InvalidStatus(request, order);
    }

    // rounds off the order towards 0 to the nearest multiple of lot size
    var security = _algorithm.Securities[order.Symbol];
    order.Quantity = RoundOffOrder(order, security);

    // the brokerage model is only consulted when not in live mode
    if (!_algorithm.LiveMode && !_algorithm.BrokerageModel.CanUpdateOrder(security, order, request, out var message))
    {
        message ??= new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidRequest", "BrokerageModel declared unable to update order: " + order.Id);
        var refusal = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToUpdateOrder, "OrderID: " + order.Id + " " + message);
        _algorithm.Error(refusal.ErrorMessage);

        var refusalEvent = new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero, "BrokerageModel declared unable to update order");
        HandleOrderEvent(refusalEvent);
        return refusal;
    }

    // apply the requested values, then round the resulting prices
    order.ApplyUpdateOrderRequest(request);
    RoundOrderPrices(order, security);
    ticket.SetOrder(order);

    // updates on closed orders are never sent to the brokerage and count as applied
    var orderUpdated = isClosedOrderUpdate;
    if (!isClosedOrderUpdate)
    {
        try
        {
            orderUpdated = _brokerage.UpdateOrder(order);
        }
        catch (Exception err)
        {
            Log.Error(err);
            orderUpdated = false;
        }
    }

    if (orderUpdated)
    {
        return OrderResponse.Success(request);
    }

    // we failed to update the order for some reason
    var errorMessage = "Brokerage failed to update order with id " + request.OrderId;
    _algorithm.Error(errorMessage);

    var failureEvent = new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero, "Brokerage failed to update order");
    HandleOrderEvent(failureEvent);
    return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToUpdateOrder, errorMessage);
}
995 
/// <summary>
/// Processes an order cancellation request: validates the order's status and asks the brokerage to cancel it
/// </summary>
/// <param name="request">The cancel request to process</param>
/// <returns>A success response, or an error response describing why the order was not canceled</returns>
private OrderResponse HandleCancelOrderRequest(CancelOrderRequest request)
{
    // both the order and its ticket must be known, otherwise there is nothing to cancel
    if (!_completeOrders.TryGetValue(request.OrderId, out var order) || !_completeOrderTickets.TryGetValue(request.OrderId, out var ticket))
    {
        Log.Error("BrokerageTransactionHandler.HandleCancelOrderRequest(): Unable to cancel order with ID " + request.OrderId + ".");
        return OrderResponse.UnableToFindOrder(request);
    }

    if (order.Status == OrderStatus.New)
    {
        return OrderResponse.InvalidNewStatus(request, order);
    }

    if (order.Status.IsClosed())
    {
        return OrderResponse.InvalidStatus(request, order);
    }

    ticket.SetOrder(order);

    // an exception thrown by the brokerage is logged and treated as a failed cancellation
    var orderCanceled = false;
    try
    {
        orderCanceled = _brokerage.CancelOrder(order);
    }
    catch (Exception err)
    {
        Log.Error(err);
    }

    if (orderCanceled)
    {
        if (request.Tag != null)
        {
            // update the tag, useful for 'why' we canceled the order
            order.Tag = request.Tag;
        }

        return OrderResponse.Success(request);
    }

    // failed to cancel the order
    var message = "Brokerage failed to cancel order with id " + order.Id;
    _algorithm.Error(message);
    return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToCancelOrder, message);
}
1052 
/// <summary>
/// Applies a batch of order events: resolves the order and ticket of each event, updates the order state,
/// applies fills to the portfolio and the trade builder, and finally publishes every event
/// </summary>
/// <remarks>
/// Order, ticket and portfolio updates run while holding <c>_lockHandleOrderEvent</c>; the events are
/// enqueued and sent to the result handler, the <c>NewOrderEvent</c> subscribers and the algorithm after the lock is released.
/// If the order or the ticket of any event in the batch cannot be found, the error is logged and the method
/// returns without processing the remaining steps for any event of the batch.
/// </remarks>
/// <param name="orderEvents">The order events to process, handled in list order</param>
private void HandleOrderEvents(List<OrderEvent> orderEvents)
{
    lock (_lockHandleOrderEvent)
    {
        // Get orders and tickets
        var orders = new List<Order>(orderEvents.Count);

        for (var i = 0; i < orderEvents.Count; i++)
        {
            var orderEvent = orderEvents[i];

            // a closing event removes the order from the open collection; otherwise it must already be a known order
            if (orderEvent.Status.IsClosed() && _openOrders.TryRemove(orderEvent.OrderId, out var order))
            {
                _completeOrders[orderEvent.OrderId] = order;
            }
            else if (!_completeOrders.TryGetValue(orderEvent.OrderId, out order))
            {
                Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to locate open Combo Order with id " + orderEvent.OrderId);
                LogOrderEvent(orderEvent);
                return;
            }
            orders.Add(order);

            // same handling for the ticket of the order
            if (orderEvent.Status.IsClosed() && _openOrderTickets.TryRemove(orderEvent.OrderId, out var ticket))
            {
                _completeOrderTickets[orderEvent.OrderId] = ticket;
            }
            else if (!_completeOrderTickets.TryGetValue(orderEvent.OrderId, out ticket))
            {
                Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to resolve open ticket: " + orderEvent.OrderId);
                LogOrderEvent(orderEvent);
                return;
            }
            orderEvent.Ticket = ticket;
        }

        var fillsToProcess = new List<OrderEvent>(orderEvents.Count);

        // now lets update the orders
        for (var i = 0; i < orderEvents.Count; i++)
        {
            var orderEvent = orderEvents[i];
            var order = orders[i];
            var ticket = orderEvent.Ticket;

            _cancelPendingOrders.UpdateOrRemove(order.Id, orderEvent.Status);
            // set the status of our order object based on the fill event except if the order status is filled/cancelled and the event is invalid
            // in live trading it can happen that we submit an update which get's rejected by the brokerage because the order is already filled
            // so we don't want the invalid update event to set the order status to invalid if it's already filled
            if (order.Status != OrderStatus.Filled && order.Status != OrderStatus.Canceled || orderEvent.Status != OrderStatus.Invalid)
            {
                order.Status = orderEvent.Status;
            }

            orderEvent.Id = order.GetNewId();

            // set the modified time of the order to the fill's timestamp
            switch (orderEvent.Status)
            {
                case OrderStatus.Canceled:
                    order.CanceledTime = orderEvent.UtcTime;
                    break;

                case OrderStatus.PartiallyFilled:
                case OrderStatus.Filled:
                    order.LastFillTime = orderEvent.UtcTime;

                    // append fill message to order tag, for additional information
                    if (orderEvent.Status == OrderStatus.Filled && !string.IsNullOrWhiteSpace(orderEvent.Message))
                    {
                        if (string.IsNullOrWhiteSpace(order.Tag))
                        {
                            order.Tag = orderEvent.Message;
                        }
                        else
                        {
                            order.Tag += " - " + orderEvent.Message;
                        }
                    }
                    break;

                case OrderStatus.UpdateSubmitted:
                case OrderStatus.Submitted:
                    // submit events after the initial submission are all order updates
                    if (ticket.UpdateRequests.Count > 0)
                    {
                        order.LastUpdateTime = orderEvent.UtcTime;
                    }
                    break;
            }

            // lets always set current Quantity, Limit and Stop prices in the order event so that it's easier for consumers
            // to know the current state and detect any update.
            // Direct casts are used (as RoundOrderPrices does) so an order whose runtime type does not match its
            // OrderType fails with an InvalidCastException at the cast instead of a NullReferenceException later.
            orderEvent.Quantity = order.Quantity;
            switch (order.Type)
            {
                case OrderType.Limit:
                    orderEvent.LimitPrice = ((LimitOrder)order).LimitPrice;
                    break;
                case OrderType.ComboLegLimit:
                    orderEvent.LimitPrice = ((ComboLegLimitOrder)order).LimitPrice;
                    break;
                case OrderType.StopMarket:
                    orderEvent.StopPrice = ((StopMarketOrder)order).StopPrice;
                    break;
                case OrderType.StopLimit:
                    var stopLimitOrder = (StopLimitOrder)order;
                    orderEvent.LimitPrice = stopLimitOrder.LimitPrice;
                    orderEvent.StopPrice = stopLimitOrder.StopPrice;
                    break;
                case OrderType.TrailingStop:
                    var trailingStopOrder = (TrailingStopOrder)order;
                    orderEvent.StopPrice = trailingStopOrder.StopPrice;
                    orderEvent.TrailingAmount = trailingStopOrder.TrailingAmount;
                    break;
                case OrderType.LimitIfTouched:
                    var limitIfTouchedOrder = (LimitIfTouchedOrder)order;
                    orderEvent.LimitPrice = limitIfTouchedOrder.LimitPrice;
                    orderEvent.TriggerPrice = limitIfTouchedOrder.TriggerPrice;
                    break;
            }

            // collect fills and, for crypto buys, adjust the fill when the fee is charged in the base currency
            if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
            {
                fillsToProcess.Add(orderEvent);
                Interlocked.Exchange(ref _lastFillTimeTicks, CurrentTimeUtc.Ticks);

                var security = _algorithm.Securities[orderEvent.Symbol];

                if (orderEvent.Symbol.SecurityType == SecurityType.Crypto
                    && order.Direction == OrderDirection.Buy
                    && CurrencyPairUtil.TryDecomposeCurrencyPair(orderEvent.Symbol, out var baseCurrency, out var quoteCurrency)
                    && orderEvent.OrderFee.Value.Currency == baseCurrency)
                {
                    // fees are in the base currency, so we have to subtract them from the filled quantity
                    // else the virtual position will bigger than the real size and we might no be able to liquidate
                    orderEvent.FillQuantity -= orderEvent.OrderFee.Value.Amount;
                    orderEvent.OrderFee = new ModifiedFillQuantityOrderFee(orderEvent.OrderFee.Value, quoteCurrency, security.SymbolProperties.ContractMultiplier);

                    // the explanation is only emitted once per handler instance
                    if (!_loggedFeeAdjustmentWarning)
                    {
                        _loggedFeeAdjustmentWarning = true;
                        const string message = "When buying currency pairs, using Cash account types, fees in base currency" +
                            " will be deducted from the filled quantity so virtual positions reflect actual holdings.";
                        Log.Trace($"BrokerageTransactionHandler.HandleOrderEvent(): {message}");
                        _algorithm.Debug(message);
                    }
                }
            }
        }

        //Apply the filled orders to our portfolio:
        try
        {
            _algorithm.Portfolio.ProcessFills(fillsToProcess);
        }
        catch (Exception err)
        {
            Log.Error(err);
            // report the call that actually failed here (the portfolio, not the trade builder)
            _algorithm.Error($"Fill error: error in Portfolio.ProcessFills: {err.Message}");
        }

        // Apply the filled orders to the trade builder
        for (var i = 0; i < orderEvents.Count; i++)
        {
            var orderEvent = orderEvents[i];

            if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
            {
                var security = _algorithm.Securities[orderEvent.Symbol];

                var multiplier = security.SymbolProperties.ContractMultiplier;
                var securityConversionRate = security.QuoteCurrency.ConversionRate;
                var feeInAccountCurrency = _algorithm.Portfolio.CashBook
                    .ConvertToAccountCurrency(orderEvent.OrderFee.Value).Amount;

                try
                {
                    _algorithm.TradeBuilder.ProcessFill(
                        orderEvent,
                        securityConversionRate,
                        feeInAccountCurrency,
                        multiplier);
                }
                catch (Exception err)
                {
                    // trade builder failures are logged only, the event is still delivered below
                    Log.Error(err);
                }
            }

            // update the ticket after we've processed the fill, but before the event, this way everything is ready for user code
            orderEvent.Ticket.AddOrderEvent(orderEvent);
        }
    }

    // Publish the events outside of the lock: queue them, notify the result handler and subscribers, then the algorithm
    for (var i = 0; i < orderEvents.Count; i++)
    {
        var orderEvent = orderEvents[i];

        if (orderEvent.Status != OrderStatus.None)
        {
            _orderEvents.Enqueue(orderEvent);

            //Create new order event:
            _resultHandler.OrderEvent(orderEvent);

            NewOrderEvent?.Invoke(this, orderEvent);

            try
            {
                //Trigger our order event handler
                _algorithm.OnOrderEvent(orderEvent);
            }
            catch (Exception err)
            {
                // unexpected error, we need to close down shop
                _algorithm.SetRuntimeError(err, "Order Event Handler");
            }
        }

        LogOrderEvent(orderEvent);
    }
}
1281 
/// <summary>
/// Handles a single order event by wrapping it in a one-element list and delegating to <c>HandleOrderEvents</c>
/// </summary>
/// <param name="orderEvent">The order event to process</param>
private void HandleOrderEvent(OrderEvent orderEvent)
{
    var singleEventBatch = new List<OrderEvent> { orderEvent };
    HandleOrderEvents(singleEventBatch);
}
1286 
/// <summary>
/// Copies the values carried by an order update event onto the matching order.
/// Only trailing stop orders (stop price) and stop limit orders (stop triggered flag) are affected.
/// </summary>
/// <param name="e">The order update event</param>
private void HandleOrderUpdated(OrderUpdateEvent e)
{
    if (!_completeOrders.TryGetValue(e.OrderId, out var order))
    {
        Log.Error("BrokerageTransactionHandler.HandleOrderUpdated(): Unable to locate open order with id " + e.OrderId);
        return;
    }

    var orderType = order.Type;
    if (orderType == OrderType.TrailingStop)
    {
        var trailingStop = (TrailingStopOrder)order;
        trailingStop.StopPrice = e.TrailingStopPrice;
    }
    else if (orderType == OrderType.StopLimit)
    {
        var stopLimit = (StopLimitOrder)order;
        stopLimit.StopTriggered = e.StopTriggered;
    }
}
1306 
/// <summary>
/// Sets the price adjustment mode of the order from the data normalization mode of its symbol's
/// subscription configurations. Resolved modes are cached per symbol in <c>_priceAdjustmentModes</c>.
/// </summary>
/// <param name="order">The order whose price adjustment mode is set</param>
/// <param name="algorithm">The algorithm providing the live mode flag and the subscription configurations</param>
/// <exception cref="InvalidOperationException">No subscription data config exists for the order's symbol</exception>
private void SetPriceAdjustmentMode(Order order, IAlgorithm algorithm)
{
    if (algorithm.LiveMode)
    {
        // live trading always uses raw prices
        return;
    }

    // cache hit: reuse the mode resolved for this symbol earlier
    if (_priceAdjustmentModes.TryGetValue(order.Symbol, out var cachedMode))
    {
        order.PriceAdjustmentMode = cachedMode;
        return;
    }

    var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
        .GetSubscriptionDataConfigs(order.Symbol, includeInternalConfigs: true);
    if (configs.Count == 0)
    {
        throw new InvalidOperationException($"Unable to locate subscription data config for {order.Symbol}");
    }

    // the first configuration decides the mode; remember it for later orders on the same symbol
    var resolvedMode = configs[0].DataNormalizationMode;
    _priceAdjustmentModes[order.Symbol] = resolvedMode;
    order.PriceAdjustmentMode = resolvedMode;
}
1334 
/// <summary>
/// Writes the order event to the debug log; does nothing unless debug logging is enabled
/// </summary>
/// <param name="e">The order event</param>
private static void LogOrderEvent(OrderEvent e)
{
    if (!Log.DebuggingEnabled)
    {
        return;
    }

    Log.Debug("BrokerageTransactionHandler.LogOrderEvent(): " + e);
}
1346 
/// <summary>
/// Handles a cash balance update pushed by the brokerage. Any difference with the algorithm's cash book is
/// traced, and the cash book is overwritten only when the brokerage reports that its account updates are instant.
/// </summary>
/// <param name="account">The account event carrying the currency symbol and its cash balance</param>
private void HandleAccountChanged(AccountEvent account)
{
    var cash = _algorithm.Portfolio.CashBook[account.CurrencySymbol];

    // how close are we?
    var existingCashBalance = cash.Amount;
    var balancesDiffer = existingCashBalance != account.CashBalance;
    if (balancesDiffer)
    {
        Log.Trace($"BrokerageTransactionHandler.HandleAccountChanged(): {account.CurrencySymbol} Cash Lean: {existingCashBalance} Brokerage: {account.CashBalance}. Will update: {_brokerage.AccountInstantlyUpdated}");
    }

    // this data can be delayed, so overriding must be explicitly supported by the brokerage
    if (!_brokerage.AccountInstantlyUpdated)
    {
        return;
    }

    // override the current cash value so we're always guaranteed to be in sync with the brokerage's push updates
    cash.SetAmount(account.CashBalance);
}
1368 
/// <summary>
/// Applies a brokerage order id change to the Lean order referenced by the event
/// </summary>
/// <param name="brokerageOrderIdChangedEvent">The event carrying the Lean order id and the new brokerage ids</param>
private void HandlerBrokerageOrderIdChangedEvent(BrokerageOrderIdChangedEvent brokerageOrderIdChangedEvent)
{
    var targetOrder = GetOrderByIdInternal(brokerageOrderIdChangedEvent.OrderId);

    if (targetOrder != null)
    {
        // we replace the whole collection
        targetOrder.BrokerId = brokerageOrderIdChangedEvent.BrokerId;
        return;
    }

    // shouldn't happen but let's be careful
    Log.Error($"BrokerageTransactionHandler.HandlerBrokerageOrderIdChangedEvent(): Lean order id {brokerageOrderIdChangedEvent.OrderId} not found");
}
1386 
/// <summary>
/// Forwards an option assignment/exercise fill to the user algorithm's assignment event handler
/// </summary>
/// <param name="fill">The order event describing the assignment</param>
private void HandlePositionAssigned(OrderEvent fill) => _algorithm.OnAssignmentOrderEvent(fill);
1395 
/// <summary>
/// Handles a delisting notification: when the algorithm still holds the delisted security, a market order
/// closing the position is registered and an immediate fill at the security's current price is processed for it
/// </summary>
/// <param name="e">The delisting notification, identifying the delisted symbol</param>
private void HandleDelistingNotification(DelistingNotificationEventArgs e)
{
    // symbols the algorithm does not know about are ignored
    if (!_algorithm.Securities.TryGetValue(e.Symbol, out var security))
    {
        return;
    }

    // only log always in live trading, in backtesting log if not 0 holdings
    if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
    {
        Log.Trace(
            $"BrokerageTransactionHandler.HandleDelistingNotification(): UtcTime: {CurrentTimeUtc} clearing position for delisted holding: " +
            $"Symbol: {e.Symbol.Value}, " +
            $"Quantity: {security.Holdings.Quantity}");
    }

    // Only submit an order if we have holdings
    var closingQuantity = -security.Holdings.Quantity;
    if (closingQuantity == 0)
    {
        return;
    }

    // Create our order and add it
    var liquidationOrder = new MarketOrder(security.Symbol, closingQuantity, _algorithm.UtcTime, "Liquidate from delisting");
    AddOpenOrder(liquidationOrder, _algorithm);

    // Create our fill with the latest price
    var liquidationFill = new OrderEvent(liquidationOrder, _algorithm.UtcTime, OrderFee.Zero)
    {
        FillPrice = security.Price,
        Status = OrderStatus.Filled,
        FillQuantity = liquidationOrder.Quantity
    };

    // Process this order event
    HandleOrderEvent(liquidationFill);
}
1432 
1433  /// <summary>
1434  /// Option notification event is received and new order events are generated
1435  /// </summary>
/// <remarks>
/// Notifications for symbols the algorithm has no security for are ignored. All processing happens while
/// holding <c>_lockHandleOrderEvent</c>. Depending on the branch taken, an option exercise order is generated
/// (or an open one is reused) and its events are emitted through <c>EmitOptionNotificationEvents</c>.
/// </remarks>
/// <param name="e">The option notification carrying the symbol, the reported position and a tag</param>
1436  private void HandleOptionNotification(OptionNotificationEventArgs e)
1437  {
1438  if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
1439  {
1440  // let's take the order event lock, we will be looking at orders and security holdings
1441  // and we don't want them changing mid processing because of an order event coming in at the same time
1442  // for example: DateTime/decimal order attributes are not thread safe by nature!
1443  lock (_lockHandleOrderEvent)
1444  {
// NOTE(review): this listing jumps from inner line 1444 to 1446 and the block opened on the next line is later
// followed by an `else`, so the `if (...)` header guarding this branch appears to be missing from this view.
// Judging by the log messages inside, it presumably checks that the option contract has expired - confirm against the original file.
1446  {
1447  if (e.Position == 0)
1448  {
1449  // only log always in live trading, in backtesting log if not 0 holdings
1450  if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
1451  {
1452  Log.Trace(
1453  $"BrokerageTransactionHandler.HandleOptionNotification(): UtcTime: {CurrentTimeUtc} clearing position for expired option holding: " +
1454  $"Symbol: {e.Symbol.Value}, " +
1455  $"Holdings: {security.Holdings.Quantity}");
1456  }
1457 
// the exercise order offsets the whole current holding
1458  var quantity = -security.Holdings.Quantity;
1459 
1460  // If the quantity is already 0 for Lean and the brokerage there is nothing else todo here
1461  if (quantity != 0)
1462  {
1463  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1464 
1465  EmitOptionNotificationEvents(security, exerciseOrder);
1466  }
1467  }
1468  else
1469  {
// a non-zero reported position is only logged in this branch, no order is generated
1470  Log.Error("BrokerageTransactionHandler.HandleOptionNotification(): " +
1471  $"unexpected position ({e.Position} instead of zero) " +
1472  $"for expired option contract: {e.Symbol.Value}");
1473  }
1474  }
1475  else
1476  {
1477  // if position is reduced, could be an early exercise or early assignment
1478  if (Math.Abs(e.Position) < security.Holdings.AbsoluteQuantity)
1479  {
1480  Log.Trace("BrokerageTransactionHandler.HandleOptionNotification(): " +
1481  $"Symbol {e.Symbol.Value} EventQuantity {e.Position} Holdings {security.Holdings.Quantity}");
1482 
1483  // if we are long the option and there is an open order, assume it's an early exercise
1484  if (security.Holdings.IsLong)
1485  {
1486  // we only care about open option exercise orders, if it's closed it means we already
1487  // processed it and we wouldn't have a need to handle it here
1488  if (GetOpenOrders(x =>
1489  x.Symbol == e.Symbol &&
1490  x.Type == OrderType.OptionExercise)
1491  .FirstOrDefault() is OptionExerciseOrder exerciseOrder)
1492  {
1493  EmitOptionNotificationEvents(security, exerciseOrder);
1494  }
1495  }
1496 
1497  // if we are short the option and there are no buy orders (open or recently closed), assume it's an early assignment
1498  else if (security.Holdings.IsShort)
1499  {
1500  var nowUtc = CurrentTimeUtc;
1501  // for some brokerages (like IB) there might be a race condition between getting an option
1502  // notification event and lean processing an order event. So if we are here, there are these options:
1503  // A) holdings -10 position 5
1504  // 1) WE just BOUGHT 15 and Lean doesn't know yet
1505  // 2) WE just SOLD 15 and this notification is old
1506  // B) holdings -10 position -5
1507  // 1) WE just BOUGHT 5 and Lean doesn't know yet
1508  // 2) WE just SOLD 5 more and this notification is old
1509  // - Seen this in production already
1510  // 3) Brokerage triggered an early assignment
1511 
1512  // so we get ALL orders for this symbol that were placed or got an update in the last 'orderWindowSeconds'
1513 
1514  const int orderWindowSeconds = 10;
1515  // NOTE: We do this checks for actual live trading only to handle the race condition stated above
1516  // for actual brokerages (excluding paper trading with PaperBrokerage).
1517  // TODO: If we confirm this race condition applies for IB only, we could move this to the brokerage itself.
// Operator precedence: `&&` binds tighter than `||`, so the filter below matches any open order for the symbol,
// while filled orders only match when their time, last update time or last fill time is within the window.
1518  if (_brokerageIsBacktesting ||
1519  !GetOrders(x =>
1520  x.Symbol == e.Symbol
1521  && (x.Status.IsOpen() || x.Status.IsFill() &&
1522  (Math.Abs((x.Time - nowUtc).TotalSeconds) < orderWindowSeconds
1523  || (x.LastUpdateTime.HasValue && Math.Abs((x.LastUpdateTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)
1524  || (x.LastFillTime.HasValue && Math.Abs((x.LastFillTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)))).Any())
1525  {
// the order quantity is the difference between the reported position and the current holdings
1526  var quantity = e.Position - security.Holdings.Quantity;
1527 
1528  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1529 
1530  EmitOptionNotificationEvents(security, exerciseOrder);
1531  }
1532  }
1533  }
1534  }
1535  }
1536  }
1537  }
1538 
/// <summary>
/// Handles an order created on the brokerage side: when the brokerage message handler accepts it and its
/// security can be resolved or added, the order is registered as an open order
/// </summary>
/// <param name="e">The notification carrying the new brokerage-side order</param>
private void HandleNewBrokerageSideOrder(NewBrokerageOrderNotificationEventArgs e)
{
    // invoked by GetOrAddUnrequestedSecurity with the supported security types
    void ReportUnsupportedSecurity(IReadOnlyCollection<SecurityType> supportedSecurityTypes)
    {
        _algorithm.Debug($"Warning: New brokerage-side order could not be processed due to " +
            $"it's security not being supported. Supported security types are {string.Join(", ", supportedSecurityTypes)}");
    }

    // the message handler decides first whether the order should be taken at all
    if (!_algorithm.BrokerageMessageHandler.HandleOrder(e))
    {
        return;
    }

    if (!_algorithm.GetOrAddUnrequestedSecurity(e.Order.Symbol, out _, ReportUnsupportedSecurity))
    {
        return;
    }

    AddOpenOrder(e.Order, _algorithm);
}
1554 
/// <summary>
/// Creates an option exercise order for the security, stamps it with the security's current
/// bid, ask and close prices and its quote currency, and registers it as an open order
/// </summary>
/// <param name="security">The option security to exercise</param>
/// <param name="quantity">The order quantity</param>
/// <param name="tag">The tag assigned to the order</param>
/// <returns>The newly registered exercise order</returns>
private OptionExerciseOrder GenerateOptionExerciseOrder(Security security, decimal quantity, string tag)
{
    // generate new exercise order for the option, capturing current security prices
    var exerciseOrder = new OptionExerciseOrder(security.Symbol, quantity, CurrentTimeUtc, tag)
    {
        OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close),
        PriceCurrency = security.SymbolProperties.QuoteCurrency
    };

    AddOpenOrder(exerciseOrder, _algorithm);
    return exerciseOrder;
}
1567 
/// <summary>
/// Runs the option's exercise model for the given order and processes every resulting order event.
/// Assignment events additionally get the order's tag as message and are forwarded to the assignment handler.
/// </summary>
/// <param name="security">The option security; it is cast to <c>Option</c></param>
/// <param name="order">The exercise order the events are generated for</param>
private void EmitOptionNotificationEvents(Security security, OptionExerciseOrder order)
{
    // generate the order events reusing the option exercise model
    var optionSecurity = (Option)security;

    foreach (var exerciseEvent in optionSecurity.OptionExerciseModel.OptionExercise(optionSecurity, order))
    {
        HandleOrderEvent(exerciseEvent);

        if (!exerciseEvent.IsAssignment)
        {
            continue;
        }

        // the message is set after the event was handled, right before notifying the assignment
        exerciseEvent.Message = order.Tag;
        HandlePositionAssigned(exerciseEvent);
    }
}
1585 
/// <summary>
/// Gets the time elapsed between the timestamp stored in <c>_lastFillTimeTicks</c> and the current UTC time
/// </summary>
protected virtual TimeSpan TimeSinceLastFill
{
    get
    {
        var nowUtc = CurrentTimeUtc;
        // the ticks are read atomically
        var lastFillUtc = new DateTime(Interlocked.Read(ref _lastFillTimeTicks));
        return nowUtc - lastFillUtc;
    }
}
1591 
/// <summary>
/// Gets the current UTC time. Virtual so that tests can substitute their own clock
/// </summary>
protected virtual DateTime CurrentTimeUtc
{
    get { return DateTime.UtcNow; }
}
1596 
/// <summary>
/// Rounds the order quantity towards 0 to the nearest multiple of the security's lot size.
/// The order is modified in place and a warning is sent to the algorithm the first time a rounding happens.
/// </summary>
/// <param name="order">The order whose quantity is rounded</param>
/// <param name="security">The security providing the lot size</param>
/// <returns>The order quantity after rounding</returns>
public decimal RoundOffOrder(Order order, Security security)
{
    var excessQuantity = order.Quantity % security.SymbolProperties.LotSize;

    // already a multiple of the lot size, nothing to change
    if (excessQuantity == 0)
    {
        return order.Quantity;
    }

    // dropping the remainder moves the quantity towards zero
    order.Quantity -= excessQuantity;

    if (!_firstRoundOffMessage)
    {
        _algorithm.Error("Warning: Due to brokerage limitations, orders will be rounded to " +
            $"the nearest lot size of {security.SymbolProperties.LotSize.ToStringInvariant()}"
        );
        _firstRoundOffMessage = true;
    }

    return order.Quantity;
}
1622 
/// <summary>
/// Rounds the order prices to its security minimum price variation, resolving the
/// order's group orders and their securities before delegating to the full overload.
/// <remarks>
/// This procedure is needed to meet brokerage precision requirements.
/// </remarks>
/// </summary>
/// <param name="order">The order whose prices are rounded</param>
/// <param name="security">The security of the order</param>
protected void RoundOrderPrices(Order order, Security security)
{
    var allLegsAvailable = order.TryGetGroupOrders(TryGetOrder, out var groupOrders);
    groupOrders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var legSecurities);

    RoundOrderPrices(order, security, comboIsReady: allLegsAvailable, orders: legSecurities);
}
1636 
/// <summary>
/// Rounds the order prices to its security minimum price variation.
/// <remarks>
/// This procedure is needed to meet brokerage precision requirements.
/// Order types not listed here (and combo limit orders whose combo is not ready) are left untouched.
/// </remarks>
/// </summary>
/// <param name="order">The order whose prices are rounded</param>
/// <param name="security">The security of the order</param>
/// <param name="comboIsReady">Whether all the orders of the combo are available; only used for combo limit orders</param>
/// <param name="orders">The combo leg orders with their securities; only used for combo limit orders</param>
protected void RoundOrderPrices(Order order, Security security, bool comboIsReady, Dictionary<Order, Security> orders)
{
    var orderType = order.Type;

    if (orderType == OrderType.Limit)
    {
        var limit = (LimitOrder)order;
        RoundOrderPrice(security, limit.LimitPrice, "LimitPrice", rounded => limit.LimitPrice = rounded);
    }
    else if (orderType == OrderType.StopMarket)
    {
        var stopMarket = (StopMarketOrder)order;
        RoundOrderPrice(security, stopMarket.StopPrice, "StopPrice", rounded => stopMarket.StopPrice = rounded);
    }
    else if (orderType == OrderType.StopLimit)
    {
        var stopLimit = (StopLimitOrder)order;
        RoundOrderPrice(security, stopLimit.LimitPrice, "LimitPrice", rounded => stopLimit.LimitPrice = rounded);
        RoundOrderPrice(security, stopLimit.StopPrice, "StopPrice", rounded => stopLimit.StopPrice = rounded);
    }
    else if (orderType == OrderType.TrailingStop)
    {
        var trailingStop = (TrailingStopOrder)order;
        RoundOrderPrice(security, trailingStop.StopPrice, "StopPrice", rounded => trailingStop.StopPrice = rounded);

        // a percentage trailing amount is not a price, so only absolute amounts are rounded
        if (!trailingStop.TrailingAsPercentage)
        {
            RoundOrderPrice(security, trailingStop.TrailingAmount, "TrailingAmount", rounded => trailingStop.TrailingAmount = rounded);
        }
    }
    else if (orderType == OrderType.LimitIfTouched)
    {
        var limitIfTouched = (LimitIfTouchedOrder)order;
        RoundOrderPrice(security, limitIfTouched.LimitPrice, "LimitPrice", rounded => limitIfTouched.LimitPrice = rounded);
        RoundOrderPrice(security, limitIfTouched.TriggerPrice, "TriggerPrice", rounded => limitIfTouched.TriggerPrice = rounded);
    }
    else if (orderType == OrderType.ComboLegLimit)
    {
        var comboLeg = (ComboLegLimitOrder)order;
        RoundOrderPrice(security, comboLeg.LimitPrice, "LimitPrice", rounded => comboLeg.LimitPrice = rounded);
    }
    else if (orderType == OrderType.ComboLimit && comboIsReady)
    {
        // all orders in the combo have been received.
        // we can now round the limit price of the group order,
        // for which we need to find the smallest price variation from each leg security
        var manager = order.GroupOrderManager;
        var smallestIncrement = 0m;
        foreach (var leg in orders)
        {
            var legIncrement = leg.Value.PriceVariationModel.GetMinimumPriceVariation(
                new GetMinimumPriceVariationParameters(leg.Value, leg.Key.Price));

            // non-positive increments never take part in the minimum
            if (legIncrement <= 0)
            {
                continue;
            }

            if (smallestIncrement == 0 || legIncrement < smallestIncrement)
            {
                smallestIncrement = legIncrement;
            }
        }

        RoundOrderPrice(manager.LimitPrice, smallestIncrement, "LimitPrice", rounded => manager.LimitPrice = rounded);
    }
}
1728 
/// <summary>
/// Rounds a price using the minimum price variation that the security's price variation model reports for it
/// </summary>
/// <param name="security">The security providing the price variation model</param>
/// <param name="price">The price to round</param>
/// <param name="priceType">The name of the price, used in the warning sent when the value changes</param>
/// <param name="setPrice">Callback receiving the rounded price</param>
private void RoundOrderPrice(Security security, decimal price, string priceType, Action<decimal> setPrice)
{
    var variationParameters = new GetMinimumPriceVariationParameters(security, price);
    var minimumVariation = security.PriceVariationModel.GetMinimumPriceVariation(variationParameters);

    RoundOrderPrice(price, minimumVariation, priceType, setPrice);
}
1734 
/// <summary>
/// Rounds a price to the closest multiple of the increment, hands the result to the setter and
/// sends a warning when the rounded value differs from the original. Does nothing for non-positive increments.
/// </summary>
/// <param name="price">The price to round</param>
/// <param name="increment">The price increment to round to</param>
/// <param name="priceType">The name of the price, used in the warning message</param>
/// <param name="setPrice">Callback receiving the rounded price</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void RoundOrderPrice(decimal price, decimal increment, string priceType, Action<decimal> setPrice)
{
    if (increment <= 0)
    {
        return;
    }

    // number of whole increments closest to the price (Math.Round's default midpoint handling applies)
    var incrementCount = Math.Round(price / increment);
    var roundedPrice = incrementCount * increment;

    setPrice(roundedPrice);
    SendWarningOnPriceChange(priceType, roundedPrice, price);
}
1745 
/// <summary>
/// Looks up an order by id in <c>_completeOrders</c>
/// </summary>
/// <param name="orderId">The id of the order to find</param>
/// <returns>The order, or null when no order with that id is stored</returns>
private Order TryGetOrder(int orderId)
{
    return _completeOrders.TryGetValue(orderId, out var found) ? found : null;
}
1751 
/// <summary>
/// Marks every order of the list that is not already closed as invalid and
/// processes an order event carrying the given message for each order of the list
/// </summary>
/// <param name="orders">The orders to invalidate</param>
/// <param name="message">The message attached to the generated order events</param>
private void InvalidateOrders(List<Order> orders, string message)
{
    foreach (var groupOrder in orders)
    {
        // closed orders keep their status, but still get an event
        if (!groupOrder.Status.IsClosed())
        {
            groupOrder.Status = OrderStatus.Invalid;
        }

        var invalidationEvent = new OrderEvent(groupOrder, _algorithm.UtcTime, OrderFee.Zero, message);
        HandleOrderEvents(new List<OrderEvent> { invalidationEvent });
    }
}
1764 
/// <summary>
/// Sends a warning to the algorithm when rounding changed an order price
/// </summary>
/// <param name="priceType">The name of the price that was rounded</param>
/// <param name="priceRound">The price after rounding</param>
/// <param name="priceOriginal">The price before rounding</param>
private void SendWarningOnPriceChange(string priceType, decimal priceRound, decimal priceOriginal)
{
    // nothing to report when rounding left the value unchanged
    if (priceOriginal == priceRound)
    {
        return;
    }

    _algorithm.Error(
        $"Warning: To meet brokerage precision requirements, order {priceType.ToStringInvariant()} was rounded to {priceRound.ToStringInvariant()} from {priceOriginal.ToStringInvariant()}"
    );
}
1774 
/// <summary>
/// Builds the error message used when an order's quantity exceeds the shortable quantity of a symbol
/// </summary>
/// <param name="symbol">The symbol whose shortable quantity is queried from the algorithm</param>
/// <param name="quantity">The quantity requested by the order</param>
/// <returns>The error message including the shortable quantity, the symbol and the requested quantity</returns>
private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
{
    var shortableQuantity = _algorithm.ShortableQuantity(symbol);
    // the previous text ended with an unbalanced ')' after the requested quantity
    return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity}";
}
1780  }
1781 }
1782