Lean  $LEAN_TAG$
SubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using Python.Runtime;
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using NodaTime;
24 using QuantConnect.Util;
25 using QuantConnect.Python;
26 
27 namespace QuantConnect.Data
28 {
29  /// <summary>
30  /// Enumerable Subscription Management Class
31  /// </summary>
32  public class SubscriptionManager
33  {
// Maps each registered consolidator to the wrapper created for it in AddConsolidator.
private readonly Dictionary<IDataConsolidator, ConsolidatorWrapper> _consolidators;
// Consolidator wrappers prioritized by the UTC scan time they were enqueued with.
private readonly PriorityQueue<ConsolidatorWrapper, DateTime> _consolidatorsSortedByScanTime;
// Time keeper handed to every consolidator wrapper this manager creates.
private readonly ITimeKeeper _timeKeeper;
// Underlying algorithm subscription manager; assigned through SetDataManager.
private IAlgorithmSubscriptionManager _subscriptionManager;
38 
/// <summary>
/// Instance that implements <see cref="ISubscriptionDataConfigService" />
/// </summary>
/// <remarks>
/// NOTE(review): assumes <see cref="IAlgorithmSubscriptionManager" /> is convertible to
/// <see cref="ISubscriptionDataConfigService" /> - confirm against the interface declaration.
/// </remarks>
public ISubscriptionDataConfigService SubscriptionDataConfigService => _subscriptionManager;
43 
/// <summary>
/// Gets the subscription configurations exposed by the underlying subscription manager
/// </summary>
/// <remarks>
/// Configurations flagged as internal feeds are left out. The filter is applied lazily,
/// when the returned sequence is enumerated.
/// </remarks>
public IEnumerable<SubscriptionDataConfig> Subscriptions
{
    get
    {
        var allConfigs = _subscriptionManager.SubscriptionManagerSubscriptions;
        return allConfigs.Where(config => !config.IsInternalFeed);
    }
}
49 
/// <summary>
/// Gets the <see cref="TickType" /> list supported by each <see cref="SecurityType" />,
/// as reported by the underlying subscription manager
/// </summary>
public Dictionary<SecurityType, List<TickType>> AvailableDataTypes
{
    get { return _subscriptionManager.AvailableDataTypes; }
}
54 
/// <summary>
/// Gets the subscription count reported by the underlying subscription manager
/// </summary>
public int Count
{
    get { return _subscriptionManager.SubscriptionManagerCount(); }
}
59 
/// <summary>
/// Initializes a new <see cref="SubscriptionManager" /> with empty consolidator collections
/// </summary>
/// <param name="timeKeeper">Time keeper passed on to the consolidator wrappers created by this manager</param>
public SubscriptionManager(ITimeKeeper timeKeeper)
{
    _timeKeeper = timeKeeper;
    _consolidators = new Dictionary<IDataConsolidator, ConsolidatorWrapper>();
    // constructed with 1000, presumably the queue's initial capacity
    _consolidatorsSortedByScanTime = new PriorityQueue<ConsolidatorWrapper, DateTime>(1000);
}
69 
/// <summary>
/// Add Market Data Required (Overloaded method for backwards compatibility).
/// </summary>
/// <remarks>
/// Picks <see cref="Tick" /> for <see cref="Resolution.Tick" /> and <see cref="TradeBar" /> for every
/// other resolution, resolves the matching tick type and delegates to the typed overload.
/// </remarks>
/// <param name="symbol">Symbol of the asset to subscribe to</param>
/// <param name="resolution">Resolution of the requested data</param>
/// <param name="timeZone">The time zone the subscription's data is time stamped in</param>
/// <param name="exchangeTimeZone">
/// Specifies the time zone of the exchange for the security this subscription is for. This
/// is the output time zone, that is, the time zone that will be used on BaseData instances
/// </param>
/// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
/// <param name="fillForward">When there is no data, pass the last trade bar forward</param>
/// <param name="extendedMarketHours">Request premarket data as well when true</param>
/// <returns>
/// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
/// </returns>
public SubscriptionDataConfig Add(
    Symbol symbol,
    Resolution resolution,
    DateTimeZone timeZone,
    DateTimeZone exchangeTimeZone,
    bool isCustomData = false,
    bool fillForward = true,
    bool extendedMarketHours = false
    )
{
    // market data only comes in two forms -- ticks (trade by trade) or trade bars (time summaries)
    var dataType = resolution == Resolution.Tick ? typeof(Tick) : typeof(TradeBar);
    var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType);

    return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
        extendedMarketHours);
}
107 
/// <summary>
/// Add Market Data Required - generic data typing support as long as Type implements BaseData.
/// </summary>
/// <remarks>
/// The request is forwarded to <c>SubscriptionDataConfigService.Add</c> with a single
/// (data type, tick type) pair and the first returned configuration is handed back.
/// <paramref name="dataTimeZone" /> and <paramref name="exchangeTimeZone" /> are accepted
/// but are not forwarded by this method.
/// </remarks>
/// <param name="dataType">Set the type of the data we're subscribing to.</param>
/// <param name="tickType">Tick type for the subscription.</param>
/// <param name="symbol">Symbol of the asset to subscribe to</param>
/// <param name="resolution">Resolution of the requested data</param>
/// <param name="dataTimeZone">The time zone the subscription's data is time stamped in</param>
/// <param name="exchangeTimeZone">
/// Specifies the time zone of the exchange for the security this subscription is for. This
/// is the output time zone, that is, the time zone that will be used on BaseData instances
/// </param>
/// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
/// <param name="fillForward">When there is no data, pass the last trade bar forward</param>
/// <param name="extendedMarketHours">Request premarket data as well when true</param>
/// <param name="isInternalFeed">
/// Set to true to prevent data from this subscription from being sent into the algorithm's
/// OnData events
/// </param>
/// <param name="isFilteredSubscription">
/// True if this subscription should have filters applied to it (market hours/user
/// filters from security), false otherwise
/// </param>
/// <param name="dataNormalizationMode">Define how data is normalized</param>
/// <returns>
/// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
/// </returns>
public SubscriptionDataConfig Add(
    Type dataType,
    TickType tickType,
    Symbol symbol,
    Resolution resolution,
    DateTimeZone dataTimeZone,
    DateTimeZone exchangeTimeZone,
    bool isCustomData,
    bool fillForward = true,
    bool extendedMarketHours = false,
    bool isInternalFeed = false,
    bool isFilteredSubscription = true,
    DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted
    )
{
    // the config service takes a list of (data type, tick type) pairs; only one is requested here
    var subscriptionDataTypes = new List<Tuple<Type, TickType>>
    {
        new Tuple<Type, TickType>(dataType, tickType)
    };

    return SubscriptionDataConfigService.Add(symbol, resolution, fillForward,
        extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
        subscriptionDataTypes,
        dataNormalizationMode).First();
}
155 
156 
/// <summary>
/// Registers a consolidator on the first subscription of the symbol that is valid for it
/// </summary>
/// <param name="symbol">Symbol of the asset to consolidate</param>
/// <param name="consolidator">The consolidator</param>
/// <param name="tickType">Desired tick type for the subscription</param>
/// <exception cref="ArgumentException">
/// Thrown when the symbol has no subscription, when no subscription has the requested tick type,
/// or when no subscription is valid for the consolidator
/// </exception>
public void AddConsolidator(Symbol symbol, IDataConsolidator consolidator, TickType? tickType = null)
{
    // collect every non-internal subscription of the symbol
    var candidates = Subscriptions.Where(config => config.Symbol == symbol).ToList();
    if (candidates.Count == 0)
    {
        // the symbol was never found in the subscription list
        throw new ArgumentException("Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
            symbol.Value);
    }

    // data is piped directly from the data feed into the consolidator, so the first valid subscription wins
    var match = candidates.FirstOrDefault(config => IsSubscriptionValidForConsolidator(config, consolidator, tickType));
    if (match is not null)
    {
        match.Consolidators.Add(consolidator);

        var wrapper = new ConsolidatorWrapper(consolidator, match.Increment, _timeKeeper,
            _timeKeeper.GetLocalTimeKeeper(match.ExchangeTimeZone));
        _consolidators[consolidator] = wrapper;
        _consolidatorsSortedByScanTime.Enqueue(wrapper, wrapper.UtcScanTime);
        return;
    }

    // nothing matched: report a missing tick type first, otherwise a type mismatch
    if (tickType != null && candidates.All(config => config.TickType != tickType))
    {
        throw new ArgumentException(
            $"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(", ", candidates.Select(x => x.TickType))}");
    }

    throw new ArgumentException("Type mismatch found between consolidator and symbol. " +
        $"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
        $"Supported types: {string.Join(",", candidates.Select(x => x.Type.Name))}.");
}
200 
/// <summary>
/// Registers a consolidator supplied as a python object for the symbol
/// </summary>
/// <remarks>
/// When the python object cannot be converted to an <see cref="IDataConsolidator" /> it is wrapped
/// in a <see cref="DataConsolidatorPythonWrapper" /> before being registered.
/// </remarks>
/// <param name="symbol">Symbol of the asset to consolidate</param>
/// <param name="pyConsolidator">The custom python consolidator</param>
public void AddConsolidator(Symbol symbol, PyObject pyConsolidator)
{
    IDataConsolidator consolidator = pyConsolidator.TryConvert(out IDataConsolidator converted)
        ? converted
        : new DataConsolidatorPythonWrapper(pyConsolidator);

    AddConsolidator(symbol, consolidator);
}
215 
/// <summary>
/// Removes the specified consolidator for the symbol
/// </summary>
/// <remarks>
/// The consolidator is detached from every subscription configuration of the symbol, its
/// scan-time wrapper is removed and disposed, and finally the consolidator itself is disposed.
/// The wrapper is released even when the symbol no longer has any subscription configuration.
/// </remarks>
/// <param name="symbol">The symbol the consolidator is receiving data from</param>
/// <param name="consolidator">The consolidator instance to be removed</param>
public void RemoveConsolidator(Symbol symbol, IDataConsolidator consolidator)
{
    // let's try to get associated symbol, not required but nice to have
    symbol ??= consolidator.Consolidated?.Symbol;
    symbol ??= consolidator.WorkingData?.Symbol;

    // remove consolidator from each subscription
    foreach (var subscription in _subscriptionManager.GetSubscriptionDataConfigs(symbol))
    {
        subscription.Consolidators.Remove(consolidator);
    }

    // Release the wrapper exactly once and regardless of how many subscriptions were found;
    // otherwise a wrapper whose subscriptions are already gone would never be disposed.
    if (consolidator is not null && _consolidators.Remove(consolidator, out var wrapper))
    {
        wrapper.Dispose();
    }

    // dispose of the consolidator to remove any remaining event handlers
    consolidator.DisposeSafely();
}
240 
/// <summary>
/// Scans every queued consolidator whose scan time is earlier than the given time
/// </summary>
/// <remarks>
/// Disposed wrappers are dropped from the queue; all others are re-enqueued with their
/// current scan time after being processed.
/// </remarks>
/// <param name="newUtcTime">The new utc time</param>
/// <param name="algorithm">The algorithm instance</param>
public void ScanPastConsolidators(DateTime newUtcTime, IAlgorithm algorithm)
{
    while (true)
    {
        var hasPending = _consolidatorsSortedByScanTime.TryPeek(out _, out var dueTime);
        if (!hasPending || dueTime >= newUtcTime)
        {
            // queue is empty or the earliest entry is not in the past
            return;
        }

        var wrapper = _consolidatorsSortedByScanTime.Dequeue();
        if (!wrapper.Disposed)
        {
            // only update the algorithm time when it differs, it's not cheap because of TZ conversions
            if (algorithm.UtcTime != dueTime)
            {
                algorithm.SetDateTime(dueTime);
            }

            // the wrapper's scan time may have moved since it was enqueued; only scan if still due
            if (wrapper.UtcScanTime <= dueTime)
            {
                wrapper.Scan();
            }

            _consolidatorsSortedByScanTime.Enqueue(wrapper, wrapper.UtcScanTime);
        }
        // a disposed wrapper means its consolidator has been removed, so it is not re-enqueued
    }
}
272 
/// <summary>
/// Builds the hard coded map of default tick types available per security type
/// </summary>
/// <returns>A new dictionary instance on every call</returns>
public static Dictionary<SecurityType, List<TickType>> DefaultDataTypes()
{
    var defaults = new Dictionary<SecurityType, List<TickType>>
    {
        [SecurityType.Base] = new() { TickType.Trade },
        [SecurityType.Index] = new() { TickType.Trade },
        [SecurityType.Forex] = new() { TickType.Quote },
        [SecurityType.Equity] = new() { TickType.Trade, TickType.Quote },
        [SecurityType.Option] = new() { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.FutureOption] = new() { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.IndexOption] = new() { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.Cfd] = new() { TickType.Quote },
        [SecurityType.Future] = new() { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.Commodity] = new() { TickType.Trade },
        [SecurityType.Crypto] = new() { TickType.Trade, TickType.Quote },
        [SecurityType.CryptoFuture] = new() { TickType.Trade, TickType.Quote }
    };

    return defaults;
}
294 
/// <summary>
/// Looks up the tick types registered in <see cref="AvailableDataTypes" /> for a security type
/// </summary>
/// <param name="securityType">The security type to look up</param>
/// <returns>The tick type list stored for the security type</returns>
public IReadOnlyList<TickType> GetDataTypesForSecurity(SecurityType securityType)
    => AvailableDataTypes[securityType];
302 
/// <summary>
/// Get the data feed types for a given <see cref="SecurityType" /> <see cref="Resolution" />
/// </summary>
/// <remarks>The lookup is delegated to the underlying subscription manager</remarks>
/// <param name="symbolSecurityType">The <see cref="SecurityType" /> used to determine the types</param>
/// <param name="resolution">The resolution of the data requested</param>
/// <param name="isCanonical">Indicates whether the security is Canonical (future and options)</param>
/// <returns>Types that should be added to the <see cref="SubscriptionDataConfig" /></returns>
public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
    SecurityType symbolSecurityType,
    Resolution resolution,
    bool isCanonical
    )
    => _subscriptionManager.LookupSubscriptionConfigDataTypes(symbolSecurityType, resolution, isCanonical);
318 
/// <summary>
/// Assigns the underlying subscription manager this instance delegates to
/// </summary>
/// <param name="subscriptionManager">The algorithm subscription manager to use</param>
public void SetDataManager(IAlgorithmSubscriptionManager subscriptionManager)
    => _subscriptionManager = subscriptionManager;
326 
/// <summary>
/// Checks if the subscription is valid for the consolidator
/// </summary>
/// <remarks>
/// Tick subscriptions feeding a consolidator with a common output type are matched on tick type:
/// against <paramref name="desiredTickType" /> when given, otherwise against the common tick type
/// of the consolidator output. In every other case the consolidator input type must be assignable
/// from the subscription type.
/// </remarks>
/// <param name="subscription">The subscription configuration</param>
/// <param name="consolidator">The consolidator</param>
/// <param name="desiredTickType">The desired tick type for the subscription. If not given is null.</param>
/// <returns>true if the subscription is valid for the consolidator</returns>
public static bool IsSubscriptionValidForConsolidator(SubscriptionDataConfig subscription, IDataConsolidator consolidator, TickType? desiredTickType = null)
{
    // NOTE(review): the second half of this condition is assumed to be
    // LeanData.IsCommonLeanDataType on the consolidator output type - confirm against LeanData
    if (subscription.Type == typeof(Tick) &&
        LeanData.IsCommonLeanDataType(consolidator.OutputType))
    {
        if (desiredTickType == null)
        {
            // no explicit request: fall back to the common tick type of the consolidator output
            var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(
                consolidator.OutputType,
                subscription.Symbol.SecurityType);

            return subscription.TickType == tickType;
        }
        else if (subscription.TickType != desiredTickType)
        {
            return false;
        }
    }

    return consolidator.InputType.IsAssignableFrom(subscription.Type);
}
355 
/// <summary>
/// Returns true if the provided data is the default data type associated with its <see cref="SecurityType"/>.
/// This is useful to determine if a data point should be used/cached in an environment where consumers will not provide a data type and we want to preserve
/// determinism and backwards compatibility when there are multiple data types available per <see cref="SecurityType"/> or new ones added.
/// </summary>
/// <remarks>Temporary until we have a dictionary for the default data type per security type see GH issue 4196.
/// Internal so it's only accessible from this assembly.</remarks>
/// <param name="data">The data point to inspect</param>
/// <returns>
/// False for equity quote bars and equity quote ticks, true for everything else
/// </returns>
internal static bool IsDefaultDataType(BaseData data)
{
    // only equities currently have a non default data type
    if (data.Symbol.SecurityType != SecurityType.Equity)
    {
        return true;
    }

    var isQuoteBar = data.DataType == MarketDataType.QuoteBar;
    // the type pattern avoids dereferencing a failed cast when the data type says tick but the instance is not a Tick
    var isQuoteTick = data.DataType == MarketDataType.Tick
        && data is Tick tick
        && tick.TickType == TickType.Quote;

    return !(isQuoteBar || isQuoteTick);
}
376  }
377 }