Lean  $LEAN_TAG$
AlgorithmManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Threading;
21 using Fasterflect;
24 using QuantConnect.Data;
33 using QuantConnect.Logging;
34 using QuantConnect.Orders;
35 using QuantConnect.Packets;
40 
42 {
43  /// <summary>
44  /// Algorithm manager class executes the algorithm and generates and passes through the algorithm events.
45  /// </summary>
46  public class AlgorithmManager
47  {
// The algorithm currently being executed; assigned at the start of Run(), null before that.
private IAlgorithm _algorithm;
// Guards status changes performed through SetStatus().
private readonly object _lock;
// True when this manager was created for live trading, false for backtesting.
private readonly bool _liveMode;

/// <summary>
/// Publicly accessible algorithm status. Reports <see cref="AlgorithmStatus.Running"/>
/// while no algorithm instance has been assigned yet.
/// </summary>
public AlgorithmStatus State => _algorithm?.Status ?? AlgorithmStatus.Running;

/// <summary>
/// Public access to the currently running algorithm id.
/// </summary>
public string AlgorithmId { get; private set; }

/// <summary>
/// Provides the isolator with a function for verifying that we're not spending too much time in each
/// algorithm manager time loop
/// </summary>
// NOTE(review): this declaration was missing from the reviewed text although the member is used by
// the constructor and by Run(); the type is taken from the upstream LEAN source - confirm.
public AlgorithmTimeLimitManager TimeLimit { get; }

/// <summary>
/// Quit state flag for the running algorithm. When true the user has requested the backtest stops through a Quit() method.
/// </summary>
/// <seealso cref="QCAlgorithm.Quit(String)"/>
public bool QuitState => State == AlgorithmStatus.Deleted;

/// <summary>
/// Gets the total number of data points processed so far by the current run.
/// The counter is reset when <c>Run</c> starts and grows with every time slice.
/// </summary>
public long DataPoints { get; private set; }
78 
79  /// <summary>
80  /// Gets the number of data points of algorithm history provider
81  /// </summary>
83 
/// <summary>
/// Initializes a new instance of the <see cref="AlgorithmManager"/> class
/// </summary>
/// <param name="liveMode">True if we're running in live mode, false for backtest mode</param>
/// <param name="job">Provided by LEAN when creating a new algo manager. This is the job
/// that the algo manager is about to execute. Research and other consumers can provide the
/// default value of null</param>
public AlgorithmManager(bool liveMode, AlgorithmNodePacket job = null)
{
    AlgorithmId = "";
    _liveMode = liveMode;
    _lock = new object();

    // initialize the time limit manager; the per-loop maximum defaults to 20 minutes
    // unless "algorithm-manager-time-loop-maximum" is configured.
    // NOTE(review): the head of this statement was missing from the reviewed text and has been
    // restored from the upstream LEAN source - confirm the type name.
    TimeLimit = new AlgorithmTimeLimitManager(
        CreateTokenBucket(job?.Controls?.TrainingLimits),
        TimeSpan.FromMinutes(Config.GetDouble("algorithm-manager-time-loop-maximum", 20))
    );
}
103 
/// <summary>
/// Launch the algorithm manager to run this strategy
/// </summary>
/// <param name="job">Algorithm job</param>
/// <param name="algorithm">Algorithm instance</param>
/// <param name="synchronizer">Instance which implements <see cref="ISynchronizer"/>. Used to stream the data</param>
/// <param name="transactions">Transaction manager object</param>
/// <param name="results">Result handler object</param>
/// <param name="realtime">Realtime processing object</param>
/// <param name="leanManager">ILeanManager implementation that is updated periodically with the IAlgorithm instance</param>
/// <param name="token">Cancellation token</param>
/// <remarks>
/// Modify with caution. The method returns immediately, without firing the end-of-algorithm
/// sequence, when cancellation is requested or when a runtime error has been set on the algorithm.
/// A status change or a non-positive backtest portfolio value only breaks out of the data loop,
/// so the end-of-algorithm sequence still runs in those cases.
/// </remarks>
public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationToken token)
{
    //Initialize:
    DataPoints = 0;
    _algorithm = algorithm;

    var backtestMode = (job.Type == PacketType.BacktestNode);
    var methodInvokers = new Dictionary<Type, MethodInvoker>();
    var marginCallFrequency = TimeSpan.FromMinutes(5);
    var nextMarginCallTime = DateTime.MinValue;
    var nextSecurityModelScan = algorithm.UtcTime.RoundDown(Time.OneHour) + Time.OneHour;
    var time = algorithm.StartDate.Date;

    var pendingDelistings = new List<Delisting>();
    var splitWarnings = new List<Split>();

    //Initialize Properties:
    AlgorithmId = job.AlgorithmId;

    //Go through the subscription types and create invokers to trigger the event handlers for each custom type:
    foreach (var config in algorithm.SubscriptionManager.Subscriptions)
    {
        //Only custom feeds can have a dedicated event handler
        if (!config.IsCustomData)
        {
            continue;
        }

        //If we already have this Type-handler then don't look it up or add it to invokers again.
        if (methodInvokers.ContainsKey(config.Type))
        {
            continue;
        }

        //Get the matching method for this event handler - e.g. public void OnData(Quandl data) { .. }
        var genericMethod = algorithm.GetType().GetMethod("OnData", new[] { config.Type });
        if (genericMethod != null)
        {
            methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
        }
    }

    // Schedule a daily event for sampling at midnight every night
    algorithm.Schedule.On("Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
        algorithm.Schedule.TimeRules.Midnight, () =>
        {
            results.Sample(algorithm.UtcTime);
        });

    //Loop over the queues: get a data collection, then pass them all into relevant methods in the algorithm.
    Log.Trace($"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
    foreach (var timeSlice in Stream(algorithm, synchronizer, results, token))
    {
        // reset our timer on each loop
        TimeLimit.StartNewTimeStep();

        //Check this backtest is still running:
        if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
        {
            Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
            break;
        }

        // Stop right away, skipping the end-of-algorithm sequence, if cancellation was requested
        if (token.IsCancellationRequested)
        {
            Log.Error($"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
            return;
        }

        // Update the ILeanManager
        leanManager.Update();

        time = timeSlice.Time;
        DataPoints += timeSlice.DataPointCount;

        if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
        {
            var logMessage = "AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
            Log.Error(logMessage);
            results.SystemDebugMessage(logMessage);
            break;
        }

        // If backtesting/warmup, we need to check if there are realtime events in the past
        // which didn't fire because at the scheduled times there was no data (i.e. markets closed)
        // and fire them with the correct date/time.
        realtime.ScanPastEvents(time);

        // will scan registered consolidators for which we've past the expected scan call
        algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);

        //Set the algorithm and real time handler's time
        algorithm.SetDateTime(time);

        // the time pulse are just to advance algorithm time, lets shortcut the loop here
        if (timeSlice.IsTimePulse)
        {
            continue;
        }

        // Update the current slice before firing scheduled events or any other task
        algorithm.SetCurrentSlice(timeSlice.Slice);

        if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
        {
            try
            {
                algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
            }
            catch (Exception err)
            {
                algorithm.SetRuntimeError(err, "OnSymbolChangedEvents");
                return;
            }

            foreach (var symbol in timeSlice.Slice.SymbolChangedEvents.Keys)
            {
                // cancel all orders for the old symbol
                foreach (var ticket in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
                {
                    ticket.Cancel("Open order cancelled on symbol changed event");
                }
            }
        }

        if (timeSlice.SecurityChanges != SecurityChanges.None)
        {
            algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);

            leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
            realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
            results.OnSecuritiesChanged(timeSlice.SecurityChanges);
        }

        //Update the securities properties: first before calling user code to avoid issues with data
        foreach (var update in timeSlice.SecuritiesUpdateData)
        {
            var security = update.Target;

            security.Update(update.Data, update.DataType, update.ContainsFillForwardData);

            // Send market price updates to the TradeBuilder
            algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
        }

        // TODO: potentially push into a scheduled event
        if (time >= nextSecurityModelScan)
        {
            foreach (var security in algorithm.Securities.Values)
            {
                security.MarginInterestRateModel.ApplyMarginInterestRate(new MarginInterestRateParameters(security, time));

                // perform check for settlement of unsettled funds
                security.SettlementModel.Scan(new ScanSettlementModelParameters(algorithm.Portfolio, security, time));
            }
            nextSecurityModelScan = time.RoundDown(Time.OneHour) + Time.OneHour;
        }

        //Update the securities properties with any universe data
        if (timeSlice.UniverseData.Count > 0)
        {
            foreach (var dataCollection in timeSlice.UniverseData.Values)
            {
                if (!dataCollection.ShouldCacheToSecurity()) continue;

                foreach (var data in dataCollection.Data)
                {
                    if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
                    {
                        security.Cache.StoreData(new[] { data }, data.GetType());
                    }
                }
            }
        }

        // poke each cash object to update from the recent security data
        foreach (var cash in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion != null))
        {
            cash.Update();
        }

        // security prices got updated
        algorithm.Portfolio.InvalidateTotalPortfolioValue();

        // process fill models on the updated data before entering algorithm, applies to all non-market orders
        transactions.ProcessSynchronousEvents();

        // fire real time events after we've updated based on the new data
        realtime.SetTime(timeSlice.Time);

        // process split warnings for options
        ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);

        //Check if the user's signalled Quit: loop over data until day changes.
        if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
        {
            Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
            break;
        }
        if (algorithm.RunTimeError != null)
        {
            Log.Error($"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
            return;
        }

        // perform margin calls, in live mode we can also use realtime to emit these
        if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
        {
            // determine if there are possible margin call orders to be executed
            var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out var issueMarginCallWarning);
            if (marginCallOrders.Count != 0)
            {
                // tracks which stage failed so the runtime error names the right call
                var executingMarginCall = false;
                try
                {
                    // tell the algorithm we're about to issue the margin call
                    algorithm.OnMarginCall(marginCallOrders);

                    executingMarginCall = true;

                    // execute the margin call orders
                    var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
                    foreach (var ticket in executedTickets)
                    {
                        algorithm.Error($"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
                            $"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
                        );
                    }
                }
                catch (Exception err)
                {
                    algorithm.SetRuntimeError(err, executingMarginCall ? "Portfolio.MarginCallModel.ExecuteMarginCall" : "OnMarginCall");
                    return;
                }
            }
            // we didn't perform a margin call, but got the warning flag back, so issue the warning to the algorithm
            else if (issueMarginCallWarning)
            {
                try
                {
                    algorithm.OnMarginCallWarning();
                }
                catch (Exception err)
                {
                    algorithm.SetRuntimeError(err, "OnMarginCallWarning");
                    return;
                }
            }

            nextMarginCallTime = time + marginCallFrequency;
        }

        // before we call any events, let the algorithm know about universe changes
        if (timeSlice.SecurityChanges != SecurityChanges.None)
        {
            try
            {
                var algorithmSecurityChanges = new SecurityChanges(timeSlice.SecurityChanges)
                {
                    // by default for user code we want to filter out custom securities
                    FilterCustomSecurities = true,
                    // by default for user code we want to filter out internal securities
                    FilterInternalSecurities = true
                };

                algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
                algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
            }
            catch (Exception err)
            {
                algorithm.SetRuntimeError(err, "OnSecuritiesChanged");
                return;
            }
        }

        // apply dividends
        HandleDividends(timeSlice, algorithm, _liveMode);

        // apply splits
        HandleSplits(timeSlice, algorithm, _liveMode);

        //Update registered consolidators for this symbol index
        try
        {
            if (timeSlice.ConsolidatorUpdateData.Count > 0)
            {
                var timeKeeper = algorithm.TimeKeeper;
                foreach (var update in timeSlice.ConsolidatorUpdateData)
                {
                    var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
                    var consolidators = update.Target.Consolidators;
                    foreach (var consolidator in consolidators)
                    {
                        foreach (var dataPoint in update.Data)
                        {
                            // only push data into consolidators on the native, subscribed to resolution
                            if (EndTimeIsInNativeResolution(update.Target, dataPoint.EndTime))
                            {
                                consolidator.Update(dataPoint);
                            }
                        }

                        // scan for time after we've pumped all the data through for this consolidator
                        consolidator.Scan(localTime);
                    }
                }
            }
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "Consolidators update");
            return;
        }

        // fire custom event handlers
        foreach (var update in timeSlice.CustomData)
        {
            if (!methodInvokers.TryGetValue(update.DataType, out var methodInvoker))
            {
                continue;
            }

            try
            {
                foreach (var dataPoint in update.Data)
                {
                    if (update.DataType.IsInstanceOfType(dataPoint))
                    {
                        methodInvoker(algorithm, dataPoint);
                    }
                }
            }
            catch (Exception err)
            {
                algorithm.SetRuntimeError(err, "Custom Data");
                return;
            }
        }

        try
        {
            if (timeSlice.Slice.Splits.Count != 0)
            {
                algorithm.OnSplits(timeSlice.Slice.Splits);
            }
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "OnSplits");
            return;
        }

        try
        {
            if (timeSlice.Slice.Dividends.Count != 0)
            {
                algorithm.OnDividends(timeSlice.Slice.Dividends);
            }
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "OnDividends");
            return;
        }

        try
        {
            if (timeSlice.Slice.Delistings.Count != 0)
            {
                algorithm.OnDelistings(timeSlice.Slice.Delistings);
            }
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "OnDelistings");
            return;
        }

        // Only track pending delistings in non-live mode.
        if (!algorithm.LiveMode)
        {
            // Keep this up to date even though we don't process delistings here anymore
            foreach (var delisting in timeSlice.Slice.Delistings.Values)
            {
                if (delisting.Type == DelistingType.Warning)
                {
                    // Store our delistings warnings because they are still used by ProcessSplitSymbols above
                    pendingDelistings.Add(delisting);
                }
                else
                {
                    // If we have an actual delisting event, remove it from pending delistings
                    var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
                    if (index != -1)
                    {
                        pendingDelistings.RemoveAt(index);
                    }
                }
            }
        }

        // run split logic after firing split events
        HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);

        try
        {
            if (timeSlice.Slice.HasData)
            {
                // EVENT HANDLER v3.0 -- all data in a single event
                algorithm.OnData(algorithm.CurrentSlice);
            }

            // always turn the crank on this method to ensure universe selection models function properly on day changes w/out data
            algorithm.OnFrameworkData(timeSlice.Slice);
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "OnData");
            return;
        }

        //If its the historical/paper trading models, wait until market orders have been "filled"
        // Manually trigger the event handler to prevent thread switch.
        transactions.ProcessSynchronousEvents();

        // Process any required events of the results handler such as sampling assets, equity, or stock prices.
        results.ProcessSynchronousEvents();

        // poke the algorithm at the end of each time step
        algorithm.OnEndOfTimeStep();

    } // End of the time slice loop

    // stop timing the loops
    TimeLimit.StopEnforcingTimeLimit();

    //Stream over:: Send the final packet and fire final events:
    Log.Trace("AlgorithmManager.Run(): Firing On End Of Algorithm...");
    try
    {
        algorithm.OnEndOfAlgorithm();
    }
    catch (Exception err)
    {
        algorithm.SetRuntimeError(err, "OnEndOfAlgorithm");
        return;
    }

    // Process any required events of the results handler such as sampling assets, equity, or stock prices.
    results.ProcessSynchronousEvents(forceProcess: true);

    //Liquidate Holdings for Calculations:
    if (_algorithm.Status == AlgorithmStatus.Liquidated && _liveMode)
    {
        Log.Trace("AlgorithmManager.Run(): Liquidating algorithm holdings...");
        algorithm.Liquidate();
        results.LogMessage("Algorithm Liquidated");
        results.SendStatusUpdate(AlgorithmStatus.Liquidated);
    }

    //Manually stopped the algorithm
    if (_algorithm.Status == AlgorithmStatus.Stopped)
    {
        Log.Trace("AlgorithmManager.Run(): Stopping algorithm...");
        results.LogMessage("Algorithm Stopped");
        results.SendStatusUpdate(AlgorithmStatus.Stopped);
    }

    //Backtest deleted.
    if (_algorithm.Status == AlgorithmStatus.Deleted)
    {
        Log.Trace("AlgorithmManager.Run(): Deleting algorithm...");
        results.DebugMessage("Algorithm Id:(" + job.AlgorithmId + ") Deleted by request.");
        results.SendStatusUpdate(AlgorithmStatus.Deleted);
    }

    //Algorithm finished, send regardless of commands:
    results.SendStatusUpdate(AlgorithmStatus.Completed);
    SetStatus(AlgorithmStatus.Completed);

    //Take final samples:
    results.Sample(time);

} // End of Run();
599 
/// <summary>
/// Forwards a status change to the running algorithm.
/// </summary>
/// <param name="state">The status to apply. Requests for <see cref="AlgorithmStatus.Running"/>
/// are ignored, as are requests made while no algorithm instance is assigned.</param>
public void SetStatus(AlgorithmStatus state)
{
    lock (_lock)
    {
        // Nobody outside may flip us back to "Running"; that state is owned by the algorithm itself.
        if (state == AlgorithmStatus.Running)
        {
            return;
        }

        // The algorithm is only assigned once Run() has been called on this manager.
        if (_algorithm == null)
        {
            return;
        }

        _algorithm.SetStatus(state);
    }
}
616 
/// <summary>
/// Enumerates the time slices produced by the synchronizer while reporting warm-up progress.
/// </summary>
/// <remarks>
/// While the algorithm is warming up, a progress percentage is sent through the algorithm debug
/// channel and the result handler, at most once per throttle window and only when the value
/// changed. Once warm-up ends, <c>OnWarmupFinished</c> is fired and a running status is sent.
/// If the algorithm is not warming up at all, the callback is fired before any data is streamed.
/// </remarks>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="synchronizer">Source of the time slices</param>
/// <param name="results">Result handler that receives the status updates</param>
/// <param name="cancellationToken">Token passed through to the synchronizer</param>
/// <returns>Every time slice yielded by the synchronizer, unchanged and in order</returns>
private IEnumerable<TimeSlice> Stream(IAlgorithm algorithm, ISynchronizer synchronizer, IResultHandler results, CancellationToken cancellationToken)
{
    var nextWarmupStatusTime = DateTime.MinValue;
    var warmingUp = algorithm.IsWarmingUp;
    var warmingUpPercent = 0;
    if (warmingUp)
    {
        nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
        algorithm.Debug("Algorithm starting warm up...");
        results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
    }
    else
    {
        results.SendStatusUpdate(AlgorithmStatus.Running);
        // let's be polite, and call warmup finished even though there was no warmup period and avoid algorithms having to handle it instead.
        // we trigger this callback here and not internally in the algorithm so that we can go through python if required
        algorithm.OnWarmupFinished();
    }

    // below we compare with slice.Time which is in UTC
    var startTimeTicks = algorithm.UtcTime.Ticks;
    var warmupEndTicks = algorithm.StartDate.ConvertToUtc(algorithm.TimeZone).Ticks;

    // fulfilling history requirements of volatility models in live mode
    if (algorithm.LiveMode)
    {
        warmupEndTicks = DateTime.UtcNow.Ticks;
        ProcessVolatilityHistoryRequirements(algorithm, _liveMode);
    }

    // length of the warm-up window; fixed from here on
    var warmupSpanTicks = warmupEndTicks - startTimeTicks;

    foreach (var timeSlice in synchronizer.StreamData(cancellationToken))
    {
        if (algorithm.IsWarmingUp)
        {
            var now = DateTime.UtcNow;
            if (now > nextWarmupStatusTime)
            {
                // send some status to the user letting them know we're done history, but still warming up,
                // catching up to real time data
                nextWarmupStatusTime = now.AddSeconds(2);

                // a zero-length window would divide by zero and produce NaN/Infinity, whose
                // conversion to int is unspecified; keep the last reported value in that case
                if (warmupSpanTicks != 0)
                {
                    var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)warmupSpanTicks);
                    // if there isn't any progress don't send the same update many times
                    if (newPercent != warmingUpPercent)
                    {
                        warmingUpPercent = newPercent;
                        algorithm.Debug($"Processing algorithm warm-up request {warmingUpPercent}%...");
                        results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
                    }
                }
            }
        }
        else if (warmingUp)
        {
            // warmup finished, send an update
            warmingUp = false;
            // we trigger this callback here and not internally in the algorithm so that we can go through python if required
            algorithm.OnWarmupFinished();
            algorithm.Debug("Algorithm finished warming up.");
            results.SendStatusUpdate(AlgorithmStatus.Running, "100");
        }
        yield return timeSlice;
    }
}
679 
/// <summary>
/// Warms up the volatility model of every security held by the algorithm,
/// logging a trace message before and after the pass.
/// </summary>
/// <remarks>Implemented as static to facilitate testing</remarks>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="liveMode">Whether the algorithm is in live mode</param>
public static void ProcessVolatilityHistoryRequirements(IAlgorithm algorithm, bool liveMode)
{
    Log.Trace("ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");

    foreach (var candidate in algorithm.Securities.Values)
    {
        var volatilityModel = candidate.VolatilityModel;
        volatilityModel.WarmUp(
            algorithm.HistoryProvider,
            algorithm.SubscriptionManager,
            candidate,
            algorithm.UtcTime,
            algorithm.TimeZone,
            liveMode);
    }

    Log.Trace("ProcessVolatilityHistoryRequirements(): finished.");
}
698 
/// <summary>
/// Helper method to apply a split to an algorithm instance
/// </summary>
/// <remarks>
/// Only <see cref="SplitType.SplitOccurred"/> events are applied; warnings are ignored here.
/// Splits seen during a live warm-up are skipped. Any exception raised while applying a split
/// is recorded as a runtime error on the algorithm and the remaining splits are not processed.
/// </remarks>
/// <param name="timeSlice">The time slice whose splits should be applied</param>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="liveMode">Whether the algorithm is in live mode</param>
public static void HandleSplits(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
{
    foreach (var split in timeSlice.Slice.Splits.Values)
    {
        try
        {
            // only process split occurred events (ignore warnings)
            if (split.Type != SplitType.SplitOccurred)
            {
                continue;
            }

            if (liveMode && algorithm.IsWarmingUp)
            {
                // skip past split during live warmup, the algorithms position already reflects them
                Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
                continue;
            }

            if (Log.DebuggingEnabled)
            {
                Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
            }

            // NOTE(review): security stays null when the symbol is not in algorithm.Securities, yet it is
            // passed on below and ApplySplitOrDividendToVolatilityModel reads security.Type - confirm.
            Security security = null;
            if (algorithm.Securities.TryGetValue(split.Symbol, out security) && liveMode)
            {
                Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
            }

            // normalization mode of the symbol's subscriptions decides how the split is applied
            var mode = algorithm.SubscriptionManager.SubscriptionDataConfigService
                .GetSubscriptionDataConfigs(split.Symbol)
                .DataNormalizationMode();

            // apply the split event to the portfolio
            algorithm.Portfolio.ApplySplit(split, security, liveMode, mode);

            // apply the split event to the trade builder
            algorithm.TradeBuilder.ApplySplit(split, liveMode, mode);

            // apply the split event to the security volatility model
            ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);

            if (liveMode && security != null)
            {
                Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
            }

            // apply the split to open orders as well in raw mode, all other modes are split adjusted
            if (liveMode || mode == DataNormalizationMode.Raw)
            {
                // in live mode we always want to have our order match the order at the brokerage, so apply the split to the orders
                var openOrders = algorithm.Transactions.GetOpenOrderTickets(ticket => ticket.Symbol == split.Symbol);
                algorithm.BrokerageModel.ApplySplit(openOrders.ToList(), split);
            }
        }
        catch (Exception err)
        {
            algorithm.SetRuntimeError(err, "Split event");
            return;
        }
    }
}
765 
/// <summary>
/// Helper method to apply a dividend to an algorithm instance
/// </summary>
/// <remarks>
/// Dividends seen during a live warm-up are skipped. Unlike <c>HandleSplits</c>, exceptions are
/// not caught here and propagate to the caller.
/// </remarks>
/// <param name="timeSlice">The time slice whose dividends should be applied</param>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="liveMode">Whether the algorithm is in live mode</param>
public static void HandleDividends(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
{
    foreach (var dividend in timeSlice.Slice.Dividends.Values)
    {
        if (liveMode && algorithm.IsWarmingUp)
        {
            // skip past dividends during live warmup, the algorithms position already reflects them
            Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
            continue;
        }

        if (Log.DebuggingEnabled)
        {
            Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
        }

        // NOTE(review): security stays null when the symbol is not in algorithm.Securities, yet it is
        // passed on below and ApplySplitOrDividendToVolatilityModel reads security.Type - confirm.
        Security security = null;
        if (algorithm.Securities.TryGetValue(dividend.Symbol, out security) && liveMode)
        {
            Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
                $"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
                $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
        }

        // normalization mode of the symbol's subscriptions decides how the dividend is applied
        var mode = algorithm.SubscriptionManager.SubscriptionDataConfigService
            .GetSubscriptionDataConfigs(dividend.Symbol)
            .DataNormalizationMode();

        // apply the dividend event to the portfolio
        algorithm.Portfolio.ApplyDividend(dividend, liveMode, mode);

        // apply the dividend event to the security volatility model
        ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);

        if (liveMode && security != null)
        {
            Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
                $"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
                $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
        }
    }
}
811 
/// <summary>
/// Records new split warnings so that option contracts can be liquidated later on.
/// A warning is stored at most once per symbol; split events of any other type are only logged.
/// </summary>
/// <param name="newSplits">The splits carried by the current slice</param>
/// <param name="splitWarnings">The running list of outstanding split warnings, updated in place</param>
private void HandleSplitSymbols(Splits newSplits, List<Split> splitWarnings)
{
    foreach (var incoming in newSplits.Values)
    {
        var isWarning = incoming.Type == SplitType.Warning;

        // only build the message when debug logging is switched on
        if (Log.DebuggingEnabled)
        {
            Log.Debug(isWarning
                ? $"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {incoming}"
                : $"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {incoming} Reference Price: {incoming.ReferencePrice}");
        }

        if (!isWarning)
        {
            continue;
        }

        // keep a single outstanding warning per symbol
        var alreadyTracked = splitWarnings.Exists(known => known.Symbol == incoming.Symbol && known.Type == SplitType.Warning);
        if (!alreadyTracked)
        {
            splitWarnings.Add(incoming);
        }
    }
}
839 
/// <summary>
/// Liquidate option contract holdings whose underlying security has a pending split
/// </summary>
/// <remarks>
/// For each stored split warning, once the underlying has reached the latest time at which a
/// market-on-close order can still be submitted, open orders on every held option contract of
/// that underlying are cancelled, a market-on-close order closing the position is submitted and
/// the contract is marked as not tradable. Contracts with a pending delisting on the same day
/// are skipped. Warnings for securities that are neither tradable nor active are dropped.
/// </remarks>
/// <param name="algorithm">The algorithm instance</param>
/// <param name="splitWarnings">Outstanding split warnings; handled entries are removed in place</param>
/// <param name="pendingDelistings">Delisting warnings seen so far</param>
/// <exception cref="Exception">Thrown when no subscription is found for a security with a pending split warning</exception>
private void ProcessSplitSymbols(IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
{
    // NOTE: This method assumes option contracts have the same core trading hours as their underlying contract
    // This is a small performance optimization to prevent scanning every contract on every time step,
    // instead we scan just the underlyings, thereby reducing the time footprint of this methods by a factor
    // of N, the number of derivative subscriptions

    // iterate backwards so entries can be removed while looping
    for (int i = splitWarnings.Count - 1; i >= 0; i--)
    {
        var split = splitWarnings[i];
        var security = algorithm.Securities[split.Symbol];

        if (!security.IsTradable
            && !algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol))
        {
            // guarded like the other debug messages in this class so the text is only built when needed
            if (Log.DebuggingEnabled)
            {
                Log.Debug($"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}");
            }

            // remove the warning from our list
            splitWarnings.RemoveAt(i);
            // Since we are storing the split warnings for a loop
            // we need to check if the security was removed.
            // When removed, it will be marked as non tradable but just in case
            // we expect it not to be an active security either
            continue;
        }

        var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime, false);

        // determine the latest possible time we can submit a MOC order
        var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
            .GetSubscriptionDataConfigs(security.Symbol);

        if (configs.Count == 0)
        {
            // should never happen at this point, if it does let's give some extra info
            throw new Exception(
                $"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" +
                $", IsTradable: {security.IsTradable}" +
                $", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
        }

        var latestMarketOnCloseTimeRoundedDownByResolution = nextMarketClose.Subtract(MarketOnCloseOrder.SubmissionTimeBuffer)
            .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);

        // we don't need to do anything until the market closes
        if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution) continue;

        // fetch all option derivatives of the underlying with holdings (excluding the canonical security)
        var derivatives = algorithm.Securities.Values.Where(potentialDerivate =>
            potentialDerivate.Symbol.SecurityType.IsOption() &&
            potentialDerivate.Symbol.Underlying == security.Symbol &&
            !potentialDerivate.Symbol.Underlying.IsCanonical() &&
            potentialDerivate.HoldStock
        );

        foreach (var derivative in derivatives)
        {
            var optionContractSymbol = derivative.Symbol;
            var optionContractSecurity = (Option)derivative;

            if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
                && x.Time.Date == optionContractSecurity.LocalTime.Date))
            {
                // if the option is going to be delisted today we skip sending the market on close order
                continue;
            }

            // close any open orders
            algorithm.Transactions.CancelOpenOrders(optionContractSymbol, "Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");

            var request = new SubmitOrderRequest(OrderType.MarketOnClose, optionContractSecurity.Type, optionContractSymbol,
                -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.UtcTime,
                "Liquidated due to impending split. Option splits are not currently supported."
            );

            // send MOC order to liquidate option contract holdings
            algorithm.Transactions.AddOrder(request);

            // mark option contract as not tradable
            optionContractSecurity.IsTradable = false;

            algorithm.Debug($"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
                + "Option splits are not currently supported.");
        }

        // remove the warning from our list
        splitWarnings.RemoveAt(i);
    }
}
931 
/// <summary>
/// Warms up the security's volatility model in the case of a split or dividend to avoid
/// discontinuities when data is raw or in live mode.
/// </summary>
/// <remarks>
/// The warm up only runs when the security is an equity and either <paramref name="liveMode"/> is true
/// or <paramref name="dataNormalizationMode"/> is <see cref="DataNormalizationMode.Raw"/>.
/// A null <paramref name="security"/> is ignored.
/// </remarks>
/// <param name="algorithm">Algorithm supplying the history provider, subscription manager, UTC time and time zone passed to the warm up</param>
/// <param name="security">Security whose volatility model is warmed up</param>
/// <param name="liveMode">True when running in live mode</param>
/// <param name="dataNormalizationMode">Data normalization mode that is checked against raw and forwarded to the warm up</param>
private static void ApplySplitOrDividendToVolatilityModel(IAlgorithm algorithm, Security security, bool liveMode,
    DataNormalizationMode dataNormalizationMode)
{
    // check for null before reading Type so a missing security is skipped instead of throwing
    if (security == null || security.Type != SecurityType.Equity)
    {
        return;
    }

    // adjusted data outside of live mode does not need the model to be re-warmed
    if (!liveMode && dataNormalizationMode != DataNormalizationMode.Raw)
    {
        return;
    }

    security.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime,
        algorithm.TimeZone, liveMode, dataNormalizationMode);
}
944 
/// <summary>
/// Determines if a data point is in its native, configured resolution
/// </summary>
/// <param name="config">Subscription configuration providing the resolution, increment and time zones</param>
/// <param name="dataPointEndTime">End time of the data point being checked</param>
/// <returns>
/// True for tick subscriptions, or when the end time is aligned with the configured increment; false otherwise
/// </returns>
private static bool EndTimeIsInNativeResolution(SubscriptionDataConfig config, DateTime dataPointEndTime)
{
    var resolution = config.Resolution;

    // tick data is always considered native
    if (resolution == Resolution.Tick)
    {
        return true;
    }

    // time zones don't change seconds or milliseconds, so for second and minute
    // data a plain modulo on the ticks lets us skip the time zone conversion
    var isSecondOrMinute = resolution == Resolution.Second || resolution == Resolution.Minute;
    if (isSecondOrMinute && dataPointEndTime.Ticks % config.Increment.Ticks == 0)
    {
        return true;
    }

    // otherwise the end time is native only if rounding it down by the increment leaves it unchanged
    var alignedEndTime = dataPointEndTime.RoundDownInTimeZone(config.Increment, config.ExchangeTimeZone, config.DataTimeZone);
    return alignedEndTime == dataPointEndTime;
}
964 
/// <summary>
/// Constructs the correct <see cref="ITokenBucket"/> instance per the provided controls.
/// The provided controls will be null when the AlgorithmManager is initialized outside of LEAN,
/// for example in unit tests that don't provide a job packet or when running from Research.
/// </summary>
/// <param name="controls">Leaky bucket parameters (capacity, refill amount and refill interval in minutes), or null</param>
/// <returns>
/// <see cref="TokenBucket.Null"/> when <paramref name="controls"/> is null, otherwise a new
/// <see cref="LeakyBucket"/> built from the provided parameters
/// </returns>
private static ITokenBucket CreateTokenBucket(LeakyBucketControlParameters controls)
{
    // without a job package (unit tests, Research) there are no controls to apply,
    // and in those cases it seems best to not enforce the leaky bucket restrictions
    if (controls == null)
    {
        return TokenBucket.Null;
    }

    var message = "AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " +
        $"Capacity: {controls.Capacity} " +
        $"RefillAmount: {controls.RefillAmount} " +
        $"TimeInterval: {controls.TimeIntervalMinutes}";
    Log.Trace(message);

    // 'minutes' are the resource being rate limited: Capacity is the total number of minutes
    // available for burst operations, and each time the refill interval elapses RefillAmount
    // minutes are added back, maxing at Capacity
    var refillInterval = TimeSpan.FromMinutes(controls.TimeIntervalMinutes);
    return new LeakyBucket(controls.Capacity, controls.RefillAmount, refillInterval);
}
994  }
995 }