18 using System.Collections.Generic;
34 private readonly PriorityQueue<ConsolidatorWrapper, DateTime> _consolidatorsSortedByScanTime;
35 private readonly Dictionary<IDataConsolidator, ConsolidatorWrapper> _consolidators;
65 _consolidators =
new();
66 _timeKeeper = timeKeeper;
67 _consolidatorsSortedByScanTime =
new(1000);
89 DateTimeZone timeZone,
90 DateTimeZone exchangeTimeZone,
91 bool isCustomData =
false,
92 bool fillForward =
true,
93 bool extendedMarketHours =
false
100 dataType = typeof(
Tick);
104 return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
105 extendedMarketHours);
140 DateTimeZone dataTimeZone,
141 DateTimeZone exchangeTimeZone,
143 bool fillForward =
true,
144 bool extendedMarketHours =
false,
145 bool isInternalFeed =
false,
146 bool isFilteredSubscription =
true,
151 extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
152 new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(dataType, tickType) },
153 dataNormalizationMode).First();
166 var subscriptions =
Subscriptions.Where(x => x.Symbol == symbol).ToList();
168 if (subscriptions.Count == 0)
171 throw new ArgumentException(
"Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
175 foreach (var subscription
in subscriptions)
180 subscription.Consolidators.Add(consolidator);
182 var wrapper = _consolidators[consolidator] =
183 new ConsolidatorWrapper(consolidator, subscription.Increment, _timeKeeper, _timeKeeper.GetLocalTimeKeeper(subscription.ExchangeTimeZone));
185 _consolidatorsSortedByScanTime.Enqueue(wrapper, wrapper.UtcScanTime);
190 string tickTypeException =
null;
191 if (tickType !=
null && !subscriptions.Where(x => x.TickType == tickType).Any())
193 tickTypeException = $
"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(",
", subscriptions.Select(x => x.TickType))}";
196 throw new ArgumentException(tickTypeException ?? (
"Type mismatch found between consolidator and symbol. " +
197 $
"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
198 $
"Supported types: {string.Join(",
", subscriptions.Select(x => x.Type.Name))}."));
230 subscription.Consolidators.Remove(consolidator);
231 if (_consolidators.Remove(consolidator, out var consolidatorsToScan))
233 consolidatorsToScan.Dispose();
238 consolidator.DisposeSafely();
248 while (_consolidatorsSortedByScanTime.TryPeek(out _, out var utcScanTime) && utcScanTime < newUtcTime)
250 var consolidatorToScan = _consolidatorsSortedByScanTime.Dequeue();
251 if (consolidatorToScan.Disposed)
257 if (utcScanTime != algorithm.
UtcTime)
263 if (consolidatorToScan.UtcScanTime <= utcScanTime)
266 consolidatorToScan.Scan();
269 _consolidatorsSortedByScanTime.Enqueue(consolidatorToScan, consolidatorToScan.UtcScanTime);
278 return new Dictionary<SecurityType, List<TickType>>
324 _subscriptionManager = subscriptionManager;
336 if (subscription.
Type == typeof(
Tick) &&
339 if (desiredTickType ==
null)
345 return subscription.
TickType == tickType;
347 else if (subscription.
TickType != desiredTickType)
353 return consolidator.
InputType.IsAssignableFrom(subscription.
Type);
363 internal static bool IsDefaultDataType(
BaseData data)