Оптимизация расчета средних
При разработке торговых алгоритмов и индикаторов, например таких как Simple Moving Average или SMA, зачастую требуется выполнить расчет среднего значения некоторого показателя, например цены, за некоторый период времени. При этом количество значений, которые участвуют в расчете, может варьироваться от нескольких единиц до нескольких сотен или даже тысяч.
Более того, часто нужно рассчитывать среднее не по одному такому ряду, а по нескольким, к тому же сразу в нескольких стратегиях. Дело усложняется еще и тем, что эти расчеты чаще всего требуется проводить в режиме реального времени при поступлении новых данных о цене в моменты рыночных сделок или при обновлении данных в стаканах. А уж если говорить о тестировании стратегий, а тем более об их оптимизации, то скорость расчетов становится критически важной.
Давайте рассмотрим варианты, которые можно было бы использовать для расчетов средних по набору значений. Самый простой способ — в цикле пробежаться по всем значениям, посчитать сумму и результат разделить на их количество:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
#include <iostream> #include <vector> std::vector<double> values = { 1.92, 1.93, 1.96, 1.98, 1.99, 1.93, 1.95, 1.93, 1.95, 1.91, 1.90, 1.95, 1.91, 1.94, 1.94, 1.91, 1.90, 1.89, 1.88, 1.87, 1.86, 1.85, 1.84, 1.88, 1.89, 1.85, 1.89, 1.87, 1.85, 1.84, 1.83, 1.82, 1.81, 1.80, 1.81, 1.80, 1.81, 1.83, 1.81, 1.84 }; double calcNaiveAverage(std::vector<double> const& values) { double sum = 0.0; for(double value : values) sum += value; return sum / values.size(); } int main(int argc, char *argv[]) { double average = calcNaiveAverage(values); std::cout << "Average of array is: " << average << std::endl; return 0; } |
Этот подход вполне прост, понятен и не требует особых знаний в программировании, чтобы его реализовать. Можно даже воспользоваться готовым алгоритмом из стандартной библиотеки:
1 2 3 4 5 6 |
#include <numeric> double calcNaiveAverage(std::vector<double> const& values) { return std::accumulate(values.begin(), values.end(), 0.0) / values.size(); } |
Теперь представим, что у нас происходит регулярное обновление последнего значения в массиве, и нам нужно каждый раз выполнять этот расчет заново. Для этого будем генерировать эти значения, используя остаток от деления на 10 номера итерации:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
int main(int argc, char *argv[]) { int count = 10; double average; std::cout << "Naive averages of array are: | "; for(int i=0; i<count; i++) { values.back() = 1.8 + i%10*0.01; average = calcNaiveAverage(values); std::cout << average << " | "; } std::cout << std::endl; return 0; } |
Вывод нашей итоговой программы будет следующим:
1 |
Naive averages of array are: | 1.882 | 1.88225 | 1.8825 | 1.88275 | 1.883 | 1.88325 | 1.8835 | 1.88375 | 1.884 | 1.88425 | |
Нетрудно видеть, что при небольшом размере буфера с данными и малых значениях count, данный расчет вполне приемлем по скорости выполнения. Однако с ростом размеров буфера и достаточно частом обновлении его значений нагрузка будет повышаться. Как же можно оптимизировать данный алгоритм?
Если присмотреться повнимательнее, то можно заметить, что большинство значений, участвующих в расчете, остаются неизменными, а меняется только последнее значение. А что, если каким-то образом сделать расчет базового значения средней заранее и использовать его результат уже в итоговом вычислении при обновлении последнего значения массива? Ниже показано, как этого можно добиться:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
#include <iostream> #include <vector> #include <numeric> std::vector<double> values = { 1.92, 1.93, 1.96, 1.98, 1.99, 1.93, 1.95, 1.93, 1.95, 1.91, 1.90, 1.95, 1.91, 1.94, 1.94, 1.91, 1.90, 1.89, 1.88, 1.87, 1.86, 1.85, 1.84, 1.88, 1.89, 1.85, 1.89, 1.87, 1.85, 1.84, 1.83, 1.82, 1.81, 1.80, 1.81, 1.80, 1.81, 1.83, 1.81, 1.84 }; double calcPreAverage(std::vector<double> const& values) { return std::accumulate(values.begin(), values.end()-1, 0.0) / (values.size()-1); } double calcSmartAverage(double pre_average, std::vector<double> const& values) { return pre_average+(values.back()-pre_average)/values.size(); } int main(int argc, char *argv[]) { int count = 10; double average; double pre_average = calcPreAverage(values); std::cout << "Smart averages of array are: | "; for(int i=0; i<count; i++) { values.back() = 1.8 + i%10*0.01; average = calcSmartAverage(pre_average, values); std::cout << average << " | "; } std::cout << std::endl; return 0; } |
1 |
Smart averages of array are: | 1.882 | 1.88225 | 1.8825 | 1.88275 | 1.883 | 1.88325 | 1.8835 | 1.88375 | 1.884 | 1.88425 | |
Несколько слов о том, что мы тут сделали. Сначала мы рассчитали среднее по массиву с данными, исключая последнее значение (функция calcPreAverage() и вызов ее в строке 27) и результат запомнили в переменной pre_average. Затем в цикле в строке 32 передаем эту переменную для расчета окончательного среднего значения в функцию calcSmartAverage().
Теперь давайте посмотрим, что мы выиграли от усовершенствованного способа расчета среднего. Для этого немного переделаем код и воспользуемся классом TimingUtil, который был описан в статье о контейнере boost::circular_buffer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
#include <iostream> #include <vector> #include <numeric> #include "TimingUtil.h" std::vector<double> values = { 1.92, 1.93, 1.96, 1.98, 1.99, 1.93, 1.95, 1.93, 1.95, 1.91, 1.90, 1.95, 1.91, 1.94, 1.94, 1.91, 1.90, 1.89, 1.88, 1.87, 1.86, 1.85, 1.84, 1.88, 1.89, 1.85, 1.89, 1.87, 1.85, 1.84, 1.83, 1.82, 1.81, 1.80, 1.81, 1.80, 1.81, 1.83, 1.81, 1.84 }; double calcNaiveAverage(std::vector<double> const& values) { return std::accumulate(values.begin(), values.end(), 0.0) / values.size(); } double calcPreAverage(std::vector<double> const& values) { return std::accumulate(values.begin(), values.end()-1, 0.0) / (values.size()-1); } double calcSmartAverage(double pre_average, std::vector<double> const& values) { return pre_average+(values.back()-pre_average)/values.size(); } void testNaiveAverage(size_t count) { TimingUtil t("naive average"); for(size_t i=0; i<count; i++) { values.back() = 1.8 + i%10*0.01; calcNaiveAverage(values); } } void testSmartAverage(size_t count) { TimingUtil t("smart average"); double pre_average = calcPreAverage(values); for(size_t i=0; i<count; i++) { values.back() = 1.8 + i%10*0.01; calcSmartAverage(pre_average, values); } } int main(int argc, char *argv[]) { testNaiveAverage(1000000); testSmartAverage(1000000); return 0; } |
В результате выполнения программы можем наблюдать существенную разницу в скорости выполнения каждого алгоритма:
1 2 |
naive average took 25799 ms. smart average took 5424 ms. |
Итак, имея выигрыш по скорости почти в пять раз, можно значительно повысить общую производительность системы.
В дополнение нужно сказать, что если в массив добавляется новое значение, например, при смене какого-то временного интервала, то, конечно же, базовое среднее значение нужно будет пересчитать. Однако это будет происходить на несколько порядков реже и не будет вносить заметного влияния.
В следующей статье мы рассмотрим, как можно быстро рассчитать среднее значение не при обновлениях последнего элемента массива, а при добавлениях новых данных в конец очереди.