Friday, June 9, 2017

Excel/VBA : Multi-threading example

Let us face the fact: VBA has no multi-threading capabilities. However, if you have a VBA program in your Excel workbook, create multiple copies of that workbook and start the program within each copy, you get multiple Excel workbooks (VBA programs) running simultaneously. Strictly speaking, the copies run in parallel only because each one runs in its own Excel instance (process), so this is multi-processing rather than multi-threading, but in practice the effect is the same.

One may ask, with very good reason, why even bother, since easy-to-use multi-threading libraries are already available for more sophisticated languages? Sometimes you may not have any other choice. A few years ago, I started to work with a "state-of-the-art" analytics library for processing some extremely time-consuming calculations. At that time, all calculations were supposed to be processed in Excel and we did not have access to a real development API. Fortunately, I discovered a collection of relevant VBA interface functions available for that library. Despite this amazing discovery, processing calculations in VBA was still annoyingly slow. Finally, I decided to test the scheme described above. I created multiple copies of one master workbook (which contained the program for processing the required calculations), opened multiple Excel workbooks and started the program within each workbook (almost) simultaneously. This was truly a "poor man's multi-threading", but it still did the job well. The example program in this post takes this scheme a bit further, as it completely removes the burden of administering the required Excel workbooks.

The main idea of this program is to save the desired number of copies of the active workbook (which contains the program for processing the required calculations) into a folder. For each workbook copy, a corresponding VB script file is created, which starts the required VBA program within that copy. The VB script also takes care of all relevant administrative responsibilities (deleting the Excel workbook copy and the VB script file from the folder after program execution). Calculation results from the different Excel threads are printed into a shared text file. Note that the SomeComplexAlgorithm procedure is the entry point for an Excel thread (started by the VB script). For brevity, the content of this procedure has been kept trivial: it simulates a random number between one and ten, waits that many seconds and stores the number into a collection.
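For comparison, a language with native threads expresses the same layout (several workers, each running the target routine and appending its results to a shared output) directly. A minimal C++11 sketch follows; it is not part of the VBA program, the names only mirror the VBA ones and are hypothetical, and the artificial delay is left out. A std::mutex takes over the role of the retry-on-error loop that the VBA version uses for the shared text file.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <vector>

// Shared results "file": an in-memory list of lines guarded by a mutex.
// The mutex plays the role of the retry loop in the VBA version: only one
// worker appends at a time, the others wait for the lock.
struct SharedResults
{
    std::mutex lock;
    std::vector<std::string> lines;
};

// Hypothetical stand-in for SomeComplexAlgorithm: simulate `count` random
// numbers between 1 and 10 (without the artificial delay) and append them,
// tagged with the worker name, to the shared results.
void SomeComplexAlgorithm(const std::string& workerName, int count, SharedResults& results)
{
    std::mt19937 generator(std::random_device{}());
    std::uniform_int_distribution<int> distribution(1, 10);
    std::vector<int> simulationResult;
    for (int i = 0; i < count; ++i)
        simulationResult.push_back(distribution(generator));

    // write the whole batch while holding the lock
    std::lock_guard<std::mutex> guard(results.lock);
    for (int value : simulationResult)
        results.lines.push_back(workerName + "=" + std::to_string(value));
}

// Start `numberOfThreads` workers with `itemsPerThread` items each and wait for all of them.
std::vector<std::string> RunWorkers(int numberOfThreads, int itemsPerThread)
{
    SharedResults results;
    std::vector<std::thread> workers;
    for (int i = 1; i <= numberOfThreads; ++i)
        workers.emplace_back(SomeComplexAlgorithm, "Thread_" + std::to_string(i),
                             itemsPerThread, std::ref(results));
    for (std::thread& worker : workers)
        worker.join();
    return results.lines;
}
```

Calling RunWorkers(4, 25) yields 100 tagged lines, the same partitioning as the four-thread timing test at the end of this post.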

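Insert a new VBA module and copy-paste the following program. The program uses the early-bound Scripting.FileSystemObject, so a reference to Microsoft Scripting Runtime has to be added in the VBA editor (Tools > References). The hard-coded folder paths also have to point to an existing folder.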

Option Explicit
'
' common text file for results from all Excel threads
Private Const resultsFilePathName As String = "C:\Users\Administrator\Desktop\ExcelThreading\shared.txt"
'
Public Sub CreateExcelThreads()
    '
    ' clean results text file
    Dim fileSystem As New Scripting.FileSystemObject
    fileSystem.CreateTextFile resultsFilePathName, True
    Set fileSystem = Nothing
    '
    ' create (and execute) Excel workbook threads
    Dim ExcelThreadsFolderPathName As String: ExcelThreadsFolderPathName = "C:\Users\Administrator\Desktop\ExcelThreading\"
    Dim numberOfExcelThreads As Integer: numberOfExcelThreads = 4
    Dim ExcelThreadName As String
    Dim i As Integer
    '
    For i = 1 To numberOfExcelThreads
        ExcelThreadName = "ExcelThread_" + VBA.CStr(i)
        ExecuteExcelThread "SomeComplexAlgorithm", ExcelThreadName, ExcelThreadsFolderPathName
    Next i
End Sub
'
Public Function ExecuteExcelThread(ByVal TargetProgram As String, ByVal ExcelThreadName As String, ByVal ExcelThreadsPathName As String)
    '
    ' save a copy of current active workbook
    Dim ExcelThreadFilePathName As String: ExcelThreadFilePathName = ExcelThreadsPathName + ExcelThreadName + ".xlsm"
    TargetProgram = ExcelThreadName + ".xlsm!" + TargetProgram
    Dim VBScriptFilePathName As String: VBScriptFilePathName = ExcelThreadsPathName + ExcelThreadName + ".vbs"
    ActiveWorkbook.SaveCopyAs ExcelThreadFilePathName
    '
    ' create commands for VB script file
    Dim fileSystem As New Scripting.FileSystemObject
    Dim writer As Scripting.TextStream
    Set writer = fileSystem.OpenTextFile(VBScriptFilePathName, ForWriting, True)
    ' re-open previously saved Excel workbook
    writer.WriteLine "Set ExcelApplication = CreateObject(""Excel.Application"")"
    writer.WriteLine "Set ExcelWorkbook = ExcelApplication.Workbooks.Open(""" + ExcelThreadFilePathName + """)"
    writer.WriteLine "ExcelApplication.Visible = False"
    ' run target VBA program and close Excel workbook
    writer.WriteLine "ExcelWorkbook.Application.Run """ + TargetProgram + """"
    ' the workbook copy is deleted below, so there is no need to save it
    writer.WriteLine "ExcelWorkbook.Close False"
    writer.WriteLine "ExcelApplication.Quit"
    ' delete copies of Excel workbook and VB script
    writer.WriteLine "Set fileSystem = CreateObject(""Scripting.FileSystemObject"")"
    writer.WriteLine "fileSystem.DeleteFile """ + ExcelThreadFilePathName + """"
    writer.WriteLine "fileSystem.DeleteFile """ + VBScriptFilePathName + """"
    writer.Close
    Set writer = Nothing
    Set fileSystem = Nothing
    '
    ' execute VB script
    Dim scriptingShell As Object: Set scriptingShell = VBA.CreateObject("WScript.Shell")
    scriptingShell.Run VBScriptFilePathName
    Set scriptingShell = Nothing
End Function
'
Public Sub SomeComplexAlgorithm()
    '
    ' this is target program to be executed by Excel thread
    ' program creates N random numbers between 1 and 10, stores these into
    ' collection and finally prints the content into a specific text file
    Dim simulationResult As New Collection
    Dim i As Integer
    For i = 1 To 25
        ' due to brevity reasons, we just simulate some time-consuming algorithm
        Dim delayTime As Long: delayTime = WorksheetFunction.RandBetween(1, 10)
        Sleep delayTime
        ' store one simulated result (random delay time) into collection
        simulationResult.Add delayTime
    Next i
    '
    ' print result collection into a specific text file
    ' we have to be prepared for the case in which multiple users (Excel threads)
    ' are accessing the same specific text file at the same time
recoveryPoint:
    On Error GoTo errorHandler
    Dim fileSystem As New Scripting.FileSystemObject
    Dim writer As Scripting.TextStream
    '
    ' if the file is in use, error will be thrown below here
    Set writer = fileSystem.OpenTextFile(resultsFilePathName, ForAppending, False)
    For i = 1 To simulationResult.Count
        ' ThisWorkbook is the workbook copy running this code (more robust than ActiveWorkbook)
        writer.WriteLine ThisWorkbook.Name + "=" + VBA.CStr(simulationResult(i))
    Next i
    writer.Close
    Set simulationResult = Nothing
    Set writer = Nothing
    Set fileSystem = Nothing
    Exit Sub
    '
errorHandler:
    ' get one second delay and re-access text file
    Sleep 1
    Resume recoveryPoint
End Sub
'
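Public Sub Sleep(ByVal seconds As Long)
    '
    ' busy-wait for the given number of seconds
    ' VBA.Timer returns seconds since midnight as a Single, so keep the fraction
    Dim startTime As Single: startTime = VBA.Timer
    Do
        If (VBA.Timer >= (startTime + seconds)) Then Exit Do
        ' VBA.Timer restarts from zero at midnight: stop waiting instead of hanging
        If (VBA.Timer < startTime) Then Exit Do
    Loop
End Sub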
'

Simulating 100 random numbers (with the delay time fixed at one second) using just one Excel thread: processing time 0:01:50.

Simulating 100 random numbers (with the delay time fixed at one second) using four Excel threads (25 numbers per thread): processing time 0:00:30, which is almost a fourfold improvement compared with single-threaded processing.
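Expressed with the measured times, T1 = 0:01:50 = 110 s for one Excel thread and T4 = 0:00:30 = 30 s for four Excel threads, the speed-up S and the parallel efficiency E are:

$$
S = \frac{T_1}{T_4} = \frac{110\,\mathrm{s}}{30\,\mathrm{s}} \approx 3.67, \qquad E = \frac{S}{4} \approx 0.92
$$

The ideal values would be S = 4 and E = 1.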

Finally, thanks for reading this blog.
-Mike

Friday, March 24, 2017

C++11 : Wrapper for Bloomberg API Reference Data

Bloomberg offers access to its market data API for several different programming languages. Previously, I have presented my wrappers for VBA and C#. This time, I wanted to publish an implementation of a C++ wrapper, which covers different types of reference data requests (snapshot, historical, intraday bar, intraday tick). For reference data, the API workflow is basically the same for all request types: create inputs (securities, fields), start a session, open a service, create a request, send the request to the server, poll for results, unpack the messages from the server and finally stop the session.

Notes


There were some tough decisions to make. In my C# wrapper, I sub-classed the algorithm for handling messages from the server (unpacking message content into a data structure). In this implementation, I decided to prefer straightforward simplicity, even if it means a bit of code duplication. Personally, when using this kind of functionality, I prefer to have just one single class (one header, one implementation) instead of several classes.

Another big issue concerned the data output coming from the Bloomberg server back to a client: how to pack all requested data into a data structure that a client can use easily? In the C# implementation we had the dynamic data type, which enables us to create arrays of dynamic values. However, the C++11 standard library has no general-purpose heterogeneous container besides std::tuple (std::variant and std::any arrived only with C++17). Initially, I considered using a tuple as the result data structure, but I rejected the idea due to the material increase in overall complexity. Instead, I decided to return all data types from the Bloomberg server as std::string within the result data structure. It is then up to a client to convert these values "back to their origins", by using std::stod or some other conversion function.
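Because every value comes back as std::string, a client typically needs a small conversion step. The sketch below is a hypothetical helper (ColumnToDouble is not part of the wrapper) that converts one column of a Matrix into doubles with std::stod. It assumes that missing values carry the wrapper's "#N/A" default and maps them, like any other non-numeric text, to NaN instead of letting std::stod throw. The type aliases are repeated so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>

using Vector = std::vector<std::string>;
using Matrix = std::vector<Vector>;

// Convert one column of a string matrix into doubles.
// "#N/A" (the wrapper's default for missing values), other non-numeric text
// and missing columns become NaN instead of throwing.
std::vector<double> ColumnToDouble(const Matrix& result, std::size_t column)
{
    std::vector<double> values;
    for (const Vector& row : result)
    {
        double value = std::numeric_limits<double>::quiet_NaN();
        if (column < row.size())
        {
            try
            {
                std::size_t consumed = 0;
                double parsed = std::stod(row[column], &consumed);
                // accept the value only if the whole string was numeric
                if (consumed == row[column].size()) value = parsed;
            }
            catch (const std::invalid_argument&) {} // e.g. "#N/A"
            catch (const std::out_of_range&) {}     // value too large for double
        }
        values.push_back(value);
    }
    return values;
}
```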

For the sake of clarity, I have created type aliases for the nested data structures used in the program: Vector [std::vector<std::string>], Matrix for reference (snapshot) and intraday data [std::vector<Vector>] and Cube for historical time-series data [std::vector<Matrix>].
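To make the Cube layout concrete: judging from GetHistoricalData below, the first index selects the security (in request order), the second index selects the observation date, and within each row column 0 holds the date while columns 1..n hold the requested fields. The hypothetical helper GetTimeSeries relies on exactly that assumption to pull one field of one security out of the cube.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using Vector = std::vector<std::string>; // one row of strings
using Matrix = std::vector<Vector>;      // rows of strings
using Cube = std::vector<Matrix>;        // one matrix per security

// Assumed historical layout: cube[security][observation][0] is the date and
// cube[security][observation][1 + fieldIndex] is the value of the requested field.
// Returns (date, value) pairs for one field of one security; at() throws
// std::out_of_range for an invalid index instead of reading past the end.
std::vector<std::pair<std::string, std::string>>
GetTimeSeries(const Cube& cube, std::size_t securityIndex, std::size_t fieldIndex)
{
    std::vector<std::pair<std::string, std::string>> series;
    for (const Vector& observation : cube.at(securityIndex))
        series.emplace_back(observation.at(0), observation.at(1 + fieldIndex));
    return series;
}
```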

Finally, in order to "make it happen", a set of header files and libraries has to be configured in the Visual Studio project. An extremely well-written and complete step-by-step tutorial is available here.

Header file


#pragma once
//
#include <memory>
#include <string>
#include <vector>
#include <algorithm>
#include <blpapi_session.h>
//
namespace MikeJuniperhillBloombergCPPDesktopAPI
{
 using namespace BloombergLP::blpapi;
 using Vector = std::vector<std::string>; // string vector
 using Matrix = std::vector<Vector>; // string matrix
 using Cube = std::vector<Matrix>; // string cube
 enum ENUM_PRICING_OPTION { PRICING_OPTION_PRICE, PRICING_OPTION_YIELD };
 enum ENUM_PERIODICITY_ADJUSTMENT { ACTUAL, CALENDAR, FISCAL };
 enum ENUM_PERIODICITY_SELECTION { DAILY, WEEKLY, MONTHLY, QUARTERLY, SEMI_ANNUALLY, YEARLY };
 enum ENUM_NON_TRADING_DAY_FILL_OPTION { NON_TRADING_WEEKDAYS, ALL_CALENDAR_DAYS, ACTIVE_DAYS_ONLY };
 enum ENUM_NON_TRADING_DAY_FILL_METHOD { PREVIOUS_VALUE, NIL_VALUE };
 //
 class BBCOMMDataRequest
 {
 public:
  BBCOMMDataRequest(std::string serverHost = "localhost", unsigned short serverPort = 8194);
  void Start();
  void GetReferenceData(Vector& securities, Vector& fields, Matrix& result,
   Vector& overrideFields = Vector(), Vector& overrideValues = Vector());
  void GetHistoricalData(Vector& securities, Vector& fields, Cube& result,
   std::string startDate, std::string endDate,
   ENUM_PRICING_OPTION pricingOption = PRICING_OPTION_PRICE,
   ENUM_PERIODICITY_SELECTION periodicitySelection = DAILY,
   ENUM_PERIODICITY_ADJUSTMENT periodicityAdjustment = ACTUAL,
   ENUM_NON_TRADING_DAY_FILL_OPTION nonTradingDayFillOption = ALL_CALENDAR_DAYS,
   ENUM_NON_TRADING_DAY_FILL_METHOD nonTradingDayFillMethod = PREVIOUS_VALUE,
   std::string overrideCurrency = std::string(),
   Vector& overrideFields = Vector(),
   Vector& overrideValues = Vector());
  void GetIntradayBarData(std::string security, Vector& fields, Matrix& result, 
   std::string eventType, int intervalInMinutes, Datetime start, Datetime end);
  void GetIntradayTickData(std::string security, Vector& fields, Matrix& result, Vector& eventTypes,
   Datetime start, Datetime end, bool includeConditionCodes = false, bool includeExchangeCodes = false);
  void Stop();
 private:
  std::unique_ptr<SessionOptions> sessionOptions;
  std::unique_ptr<Session> session;
  std::unique_ptr<Service> service;
  std::unique_ptr<Request> request;
  std::string serverHost;
  unsigned short serverPort;
 };
}
//


Implementation file


#include "BBCOMMDataRequest.h"
#include <stdexcept>
using namespace MikeJuniperhillBloombergCPPDesktopAPI;
//
BBCOMMDataRequest::BBCOMMDataRequest(std::string serverHost, unsigned short serverPort) 
 : serverHost(serverHost), serverPort(serverPort)
{
 // ctor with default parameters for server hosting and port
}
void BBCOMMDataRequest::Start()
{
 // create objects : session options and session
 sessionOptions = std::unique_ptr<SessionOptions>(new SessionOptions);
 sessionOptions->setServerHost(serverHost.c_str());
 sessionOptions->setServerPort(serverPort);
 session = std::unique_ptr<Session>(new Session(*sessionOptions));
 //
 // start session and open service, throw if not succeeded
 // (standard std::exception has no message constructor, std::runtime_error does)
 if (!session->start()) throw std::runtime_error("session not started");
 if (!session->openService("//blp/refdata")) throw std::runtime_error("service not opened");
}
void BBCOMMDataRequest::GetReferenceData(Vector& securities, Vector& fields, Matrix& result,
 Vector& overrideFields, Vector& overrideValues)
{
 // initialize result data structure with default '#N/A' for all items
 for (unsigned int i = 0; i < securities.size(); ++i)
 {
  Vector innerVector(fields.size(), "#N/A");
  result.push_back(innerVector);
 }
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("ReferenceDataRequest")));
 //
 // append securities and field requests into request object
 std::for_each(securities.begin(), securities.end(), [this](std::string s) { request->append("securities", s.c_str()); });
 std::for_each(fields.begin(), fields.end(), [this](std::string f) { request->append("fields", f.c_str()); });
 //
 // conditionally, append overrides into request object
 if (overrideFields.size() > 0)
 {
  Element requestOverrides = request->getElement("overrides");
  for (unsigned int i = 0; i < overrideFields.size(); ++i)
  {
   Element requestOverride = requestOverrides.appendElement();
   requestOverride.setElement("fieldId", overrideFields[i].c_str());
   requestOverride.setElement("value", overrideValues[i].c_str());
  }
 }
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   while (bMessageIterator.next())
   {
    // get access to message, extract all included securities
    Message bMessage = bMessageIterator.message();
    Element bSecurities = bMessage.getElement("securityData");
    int nSecurities = bSecurities.numValues();
    //
    // loop through all securities in current server response batch
    for (int i = 0; i < nSecurities; ++i)
    {
     // extract one security and all available fields for this security
     Element bSecurity = bSecurities.getValueAsElement(i);
     Element bFields = bSecurity.getElement("fieldData");
     int sequenceNumber = bSecurity.getElementAsInt32("sequenceNumber");
     int nFieldNames = fields.size();
     //
     // loop through all requested fields
     for (int j = 0; j < nFieldNames; ++j)
     {
      // assign string value only if the field is included in delivery
      if (bFields.hasElement(fields[j].c_str()))
      {
       Element bField = bFields.getElement(fields[j].c_str());
       result[sequenceNumber][j] = bField.getValueAsString();
      }
     }
    }
   }
   // when event type is response, there will be no more messages from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetHistoricalData(Vector& securities, Vector& fields, Cube& result,
 std::string startDate, std::string endDate,
 ENUM_PRICING_OPTION pricingOption,
 ENUM_PERIODICITY_SELECTION periodicitySelection,
 ENUM_PERIODICITY_ADJUSTMENT periodicityAdjustment,
 ENUM_NON_TRADING_DAY_FILL_OPTION nonTradingDayFillOption,
 ENUM_NON_TRADING_DAY_FILL_METHOD nonTradingDayFillMethod,
 std::string overrideCurrency,
 Vector& overrideFields,
 Vector& overrideValues)
{
 bool resultHasDimension = false;
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("HistoricalDataRequest")));
 //
 // append securities and field requests into request object
 std::for_each(securities.begin(), securities.end(), [this](std::string s) { request->append("securities", s.c_str()); });
 std::for_each(fields.begin(), fields.end(), [this](std::string f) { request->append("fields", f.c_str()); });
 //
 // conditionally, append overrides into request object
 if (overrideFields.size() > 0)
 {
  Element requestOverrides = request->getElement("overrides");
  for (unsigned int i = 0; i < overrideFields.size(); ++i)
  {
   Element requestOverride = requestOverrides.appendElement();
   requestOverride.setElement("fieldId", overrideFields[i].c_str());
   requestOverride.setElement("value", overrideValues[i].c_str());
  }
 }
 //
 // set optional parameters for historical data request
 request->set("startDate", startDate.c_str());
 request->set("endDate", endDate.c_str());
 request->set("pricingOption", pricingOption);
 request->set("periodicityAdjustment", periodicityAdjustment);
 request->set("periodicitySelection", periodicitySelection);
 request->set("nonTradingDayFillOption", nonTradingDayFillOption);
 request->set("nonTradingDayFillMethod", nonTradingDayFillMethod);
 if (!overrideCurrency.empty()) request->set("currency", overrideCurrency.c_str());
 //
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bSecurity = bMessage.getElement("securityData");
    Element bFields = bSecurity.getElement("fieldData");
    int sequenceNumber = bSecurity.getElementAsInt32("sequenceNumber");
    int nFieldNames = fields.size();
    unsigned int nObservationDates = bFields.numValues();
    //
     // the exact dimension for result data structure will be known
     // only when the response has been received from BBCOMM server
     if (!resultHasDimension)
     {
      // one (initially empty) matrix for each requested security
      result.resize(securities.size());
      resultHasDimension = true;
     }
     // the number of observation dates may differ between securities, so the matrix
     // of each security is sized from its own message, with default '#N/A' for all items
     result[sequenceNumber] = Matrix(nObservationDates, Vector(fields.size() + 1, "#N/A"));
    // loop through all observation dates
    for (unsigned int i = 0; i < nObservationDates; ++i)
    {
     Element observationDateFields = bFields.getValueAsElement(i);
     result[sequenceNumber][i][0] = observationDateFields.getElementAsString("date");
     // loop through all requested fields for a given observation date
     // and pack results data into data structure
     for (int j = 0; j < nFieldNames; j++)
     {
      // pack field value into data structure only if such value is available
      if (observationDateFields.hasElement(fields[j].c_str()))
      {
       result[sequenceNumber][i][j + 1] = observationDateFields.getElement(fields[j].c_str()).getValueAsString();
      }
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetIntradayBarData(std::string security, Vector& fields, Matrix& result, std::string eventType, int intervalInMinutes,
 Datetime start, Datetime end)
{
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("IntradayBarRequest")));
 request->set("security", security.c_str());
 request->set("eventType", eventType.c_str());
 request->set("interval", intervalInMinutes);
 request->set("startDateTime", start);
 request->set("endDateTime", end);
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bData = bMessage.getElement("barData").getElement("barTickData");
    int nBars = bData.numValues();
    //
    for (int i = 0; i < nBars; ++i)
    {
     Element bBar = bData.getValueAsElement(i);
     if (!bBar.isNull())
     {
      Vector innerVector;
      for (unsigned int j = 0; j < fields.size(); ++j)
      {
       if (bBar.hasElement(fields[j].c_str()))
        innerVector.push_back(bBar.getElementAsString(fields[j].c_str()));
      }
      result.push_back(innerVector);
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetIntradayTickData(std::string security, Vector& fields, Matrix& result, Vector& eventTypes,
 Datetime start, Datetime end, bool includeConditionCodes, bool includeExchangeCodes)
{
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("IntradayTickRequest")));
 request->set("security", security.c_str());
 request->set("startDateTime", start);
 request->set("endDateTime", end);
 std::for_each(eventTypes.begin(), eventTypes.end(), [this](std::string e) { request->append("eventTypes", e.c_str()); });
 if (includeConditionCodes) request->set("includeConditionCodes", true);
 if (includeExchangeCodes) request->set("includeExchangeCodes", true);
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bData = bMessage.getElement("tickData").getElement("tickData");
    int nTicks = bData.numValues();
    //
    for (int i = 0; i < nTicks; ++i)
    {
     Element bItem = bData.getValueAsElement(i);
     if (!bItem.isNull())
     {
      Vector innerVector;
      for (unsigned int j = 0; j < fields.size(); ++j)
      {
       if (bItem.hasElement(fields[j].c_str()))
        innerVector.push_back(bItem.getElementAsString(fields[j].c_str()));
      }
      // include conditionally requested condition and exchange codes
      if (bItem.hasElement("conditionCodes") && (includeConditionCodes))
       innerVector.push_back(bItem.getElementAsString("conditionCodes"));
      if (bItem.hasElement("exchangeCodes") && (includeExchangeCodes))
       innerVector.push_back(bItem.getElementAsString("exchangeCodes"));
      result.push_back(innerVector);
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::Stop()
{
 session->stop();
}


Tester file


#include <iostream>
#include "BBCOMMDataRequest.h"
//
namespace MJ = MikeJuniperhillBloombergCPPDesktopAPI;
void PrintMatrix(MJ::Matrix& result);
void PrintCube(MJ::Cube& result);
void CreateEquitySecurityList(std::vector<std::string>& securities);
void CreateRateSecurityList(std::vector<std::string>& securities);
//
int main()
{
 try
 {
  // start bloomberg session
  MJ::BBCOMMDataRequest bloomberg;
  bloomberg.Start();
  //
  std::cout << "CASE 1 : create reference data request without overrides >" << std::endl;
  // create list of rate securities and fields, request data and print result
  MJ::Vector securities;
  CreateRateSecurityList(securities);
  MJ::Vector fields{ "PARSEKYABLE_DES", "PX_LAST" };
  MJ::Matrix matrixResult;
  bloomberg.GetReferenceData(securities, fields, matrixResult);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 2 : create reference data request with overrides >" << std::endl;
  // create list of equity securities and fields, request data and print result
  CreateEquitySecurityList(securities);
  fields.push_back("BEST_PE_RATIO");
  MJ::Vector overrideFields{ "BEST_FPERIOD_OVERRIDE" };
  MJ::Vector overrideValues{ "3FY" };
  matrixResult.clear();
  bloomberg.GetReferenceData(securities, fields, matrixResult, overrideFields, overrideValues);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 3 : create historical data request for previous securities >" << std::endl;
  // use previous securities, re-create fields, request data and print result
  fields.clear();
  fields.push_back("PX_LAST");
  MJ::Cube cubeResult;
  // bloomberg API date format : 'YYYYMMDD'
  std::string startDate = "20170301";
  std::string endDate = "20170322";
  // request actual daily frequency, but only for weekdays and prices converted to JPY
  bloomberg.GetHistoricalData(securities, fields, cubeResult, startDate, endDate, 
   MJ::PRICING_OPTION_PRICE, MJ::DAILY, MJ::ACTUAL, MJ::NON_TRADING_WEEKDAYS, MJ::PREVIOUS_VALUE, "JPY");
  PrintCube(cubeResult);
  //
  std::cout << "CASE 4 : create historical data request for list of securities >" << std::endl;
  // create list of rate securities, use previous field, request data and print result
  CreateRateSecurityList(securities);
  cubeResult.clear();
  bloomberg.GetHistoricalData(securities, fields, cubeResult, startDate, endDate);
  PrintCube(cubeResult);
  //
  std::cout << "CASE 5 : create intraday bar data request for single security >" << std::endl;
  // create one security, request data and print result
  std::string security = "GOOGL UW Equity";
  MJ::Vector barFields{ "time", "open" , "high", "low", "close" };
  std::string eventType = "TRADE";
  int intervalInMinutes = 1;
  MJ::Datetime start;
  start.setDate(2017, 3, 22);
  start.setTime(15, 0, 0);
  MJ::Datetime end;
  end.setDate(2017, 3, 22);
  end.setTime(15, 10, 0);
  matrixResult.clear();
  bloomberg.GetIntradayBarData(security, barFields, matrixResult, eventType, intervalInMinutes, start, end);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 6 : create intraday tick data request for single security >" << std::endl;
  // use previous security, request data and print result
  MJ::Vector tickFields{ "time", "value", "size", "type" };
  MJ::Vector eventTypes{ "TRADE" };
  matrixResult.clear();
  bloomberg.GetIntradayTickData(security, tickFields, matrixResult, eventTypes, start, end, true, true);
  PrintMatrix(matrixResult);
  //
  // stop bloomberg session
  bloomberg.Stop();
 }
 catch (std::exception& e)
 {
  std::cout << e.what() << std::endl;
 }
 return 0;
}
//
void PrintMatrix(MJ::Matrix& result)
{
 for (unsigned int i = 0; i < result.size(); ++i)
 {
  for (unsigned int j = 0; j < result[i].size(); ++j)
  {
   std::cout << result[i][j] << std::endl;
  }
 }
 std::cout << std::endl;
}
//
void PrintCube(MJ::Cube& result)
{
 for (unsigned int i = 0; i < result.size(); ++i)
 {
  for (unsigned int j = 0; j < result[i].size(); ++j)
  {
   for (unsigned int k = 0; k < result[i][j].size(); k++)
   {
    std::cout << result[i][j][k] << std::endl;
   }
  }
 }
 std::cout << std::endl;
}
//
void CreateRateSecurityList(std::vector<std::string>& securities)
{
 // EUR Euribor vs. 6M swap curve
 securities.clear();
 securities.push_back("EONIA Index"); securities.push_back("EUR001W Index");
 securities.push_back("EUR001M Index"); securities.push_back("EUR002M Index");
 securities.push_back("EUR003M Index"); securities.push_back("EUR006M Index");
 securities.push_back("EUFR0F1 Curncy"); securities.push_back("EUFR0G1A Curncy");
 securities.push_back("EUFR0H1B Curncy"); securities.push_back("EUFR0I1C Curncy");
 securities.push_back("EUFR0J1D Curncy"); securities.push_back("EUFR0K1E Curncy");
 securities.push_back("EUFR011F Curncy"); securities.push_back("EUSA2 Curncy");
 securities.push_back("EUSA3 Curncy"); securities.push_back("EUSA4 Curncy");
 securities.push_back("EUSA5 Curncy"); securities.push_back("EUSA6 Curncy");
 securities.push_back("EUSA7 Curncy"); securities.push_back("EUSA8 Curncy");
 securities.push_back("EUSA9 Curncy"); securities.push_back("EUSA10 Curncy");
 securities.push_back("EUSA11 Curncy"); securities.push_back("EUSA12 Curncy");
 securities.push_back("EUSA15 Curncy"); securities.push_back("EUSA20 Curncy");
 securities.push_back("EUSA25 Curncy"); securities.push_back("EUSA30 Curncy");
 securities.push_back("EUSA35 Curncy"); securities.push_back("EUSA40 Curncy");
 securities.push_back("EUSA45 Curncy"); securities.push_back("EUSA50 Curncy");
}
//
void CreateEquitySecurityList(std::vector<std::string>& securities)
{
 // some equities
 securities.clear();
 securities.push_back("GOOGL UW Equity");
 securities.push_back("YHOO UW Equity");
 securities.push_back("FB UW Equity");
 securities.push_back("EBAY UW Equity");
}
//

Finally, thanks a lot for reading my blog.
-Mike

Sunday, January 8, 2017

C++11 : String Replacer for Directory Files using PPL

Sequential algorithms can easily be replaced with the corresponding PPL Parallel Algorithms. However, these algorithms can only be applied if the loop iterations are completely independent and do not use any shared variables. The second condition can be relaxed by using concurrent data structures. This scheme is perfect for cases in which we need to process a collection of independent items, such as modifying files in a directory.
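The independence requirement can be illustrated without PPL, which ships only with Visual C++. The sketch below swaps in std::async from the C++11 standard library as a portable stand-in for concurrency::parallel_for_each (the name ParallelTransform is hypothetical): every task reads only its own input item and returns its own result, so no variable is shared between iterations and no locking is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <vector>

// Apply `work` to every item as an independent asynchronous task.
// Each task reads only its own input item and returns its own result,
// so the iterations share no mutable state and need no locking.
// `work` itself is shared by all tasks, so it must be safe to call concurrently.
template <typename Result, typename Item, typename Function>
std::vector<Result> ParallelTransform(const std::vector<Item>& items, Function work)
{
    std::vector<std::future<Result>> futures;
    for (const Item& item : items)
        futures.push_back(std::async(std::launch::async, [&work, &item]() { return work(item); }));

    // collect the results in input order; get() also rethrows task exceptions
    std::vector<Result> results;
    for (std::future<Result>& future : futures)
        results.push_back(future.get());
    return results;
}
```

This sketch may start one thread per item, which is fine for a handful of files; PPL instead schedules the work on a pool of worker threads managed by its runtime.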

The devil is in the details


Let us consider a real-life case, in which we have a lot of CSV files in one specific directory. For example, we may have transaction files (containing cash flow tables, date schedule tables, parameters) produced daily by some front office system batch. These files are then used as a feed for third-party analytical software for some intensive calculations. The only problem is that some of the information in the feed files is not in a form the third-party software recognizes. In a nutshell, the feed files may contain specific strings, which should be replaced with other strings.
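At its core, the required operation is a replace-all on the text content of one file. A minimal sketch using only std::string::find and std::string::replace (the function name ReplaceAll is hypothetical) could look like this:

```cpp
#include <cassert>
#include <string>

// Replace every occurrence of `from` in `text` with `to`, scanning left to right.
// The search resumes after the inserted replacement, so a replacement that
// itself contains `from` is not processed again.
std::string ReplaceAll(std::string text, const std::string& from, const std::string& to)
{
    assert(!from.empty()); // an empty pattern would match everywhere
    std::string::size_type position = 0;
    while ((position = text.find(from, position)) != std::string::npos)
    {
        text.replace(position, from.size(), to);
        position += to.size();
    }
    return text;
}
```

Note that plain substring replacement does not respect CSV field boundaries: replacing EUR also rewrites the first three letters of EURIBOR, which is one way a replacement script can end up producing incorrect replacements.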


War stories


The next question is how to do this operation quickly and in a safe manner. Needless to say, there are pretty much as many approaches as there are people performing this heartbreaking operation. Personally, I have used

  • a find-and-replace strategy with Notepad++, until the number of files to be processed and parameters to be replaced grew a bit too large.
  • a custom batch script, until I realized that the script sometimes did not work as expected, either leaving some cases out of the replacement operations or producing incorrect replacements. The scariest part was that the script actually worked well most of the time.
  • a custom PowerShell script (created by someone else), which used hard-coded configurations for all replacement strings and all strings to be replaced. All configurations (hosted in the source code file) needed to be set in a specific order. This actually worked well up to the point where those hard-coded configurations had to be changed to reflect changes made in other systems. Moreover, execution times were a bit too high.

Finally, I decided to create my own program for handling this task.


United we fall, divided we stand


After learning a bit about parallel algorithms, I soon realized that this kind of scheme (due to task independence) would be a suitable candidate for parallelism, in order to improve program execution speed. Each source file (containing transaction information) can be processed separately, and all key-value pairs (string to be replaced and corresponding replacement string) can be stored as string pairs in a concurrency-safe concurrent_unordered_map.
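One detail worth noting: the key-value pairs are only read by the file processors, never modified. The C++ standard library guarantees that concurrent calls to const member functions of a container do not introduce data races, so under that read-only assumption even a plain `const std::map` can be shared between threads. The sketch below is a hypothetical in-memory version of the scheme (the names `ReplaceAll` and `ReplaceInAllDocuments` are illustrative, and `std::thread` stands in for PPL): one thread per document, and each thread writes only to its own document.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: replace all non-overlapping occurrences of `from`, scanning left to right.
std::string ReplaceAll(const std::string& s, const std::string& from, const std::string& to)
{
    if (from.empty()) return s; // an empty pattern would otherwise never advance
    std::string result;
    std::size_t start = 0, pos;
    while ((pos = s.find(from, start)) != std::string::npos)
    {
        result.append(s, start, pos - start);
        result += to;
        start = pos + from.size();
    }
    result.append(s, start, std::string::npos);
    return result;
}

// One thread per document. All threads only read `keys`, so no locking is needed,
// and thread i writes only to documents[i].
void ReplaceInAllDocuments(std::vector<std::string>& documents,
                           const std::map<std::string, std::string>& keys)
{
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < documents.size(); ++i)
    {
        workers.emplace_back([&documents, &keys, i]
        {
            for (const auto& kv : keys)
                documents[i] = ReplaceAll(documents[i], kv.first, kv.second);
        });
    }
    for (auto& worker : workers) worker.join();
}
```

In this reading, a concurrent container becomes essential only if the workers also insert into or modify the shared key table while processing.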


FileHandler.h


This header file contains free functions for all required file handling operations.

#pragma once
//
#include <filesystem>
#include <fstream>
#include <string>
#include <map>
#include <vector>
#include <algorithm>
#include <sstream>
//
namespace MikeJuniperhillFileSystemUtility
{
 // functions are defined in a header, so they are marked inline to avoid
 // multiple-definition errors if the header is included in several source files
 inline std::string ReadFileContentToString(const std::string& filePathName)
 {
  // read the whole file into one string (works for any byte value in the file)
  std::ifstream in(filePathName);
  std::ostringstream content;
  content << in.rdbuf();
  return content.str();
 }
 //
 inline void WriteStringContentToFile(const std::string& content, const std::string& filePathName)
 {
  std::ofstream out(filePathName);
  out << content;
 }
 //
 // return full path names of all regular files in a given folder (subdirectories are skipped)
 // requires C++17 std::filesystem
 inline std::vector<std::string> GetDirectoryFiles(const std::string& folderPathName)
 {
  std::vector<std::string> directoryFiles;
  for (const auto& entry : std::filesystem::directory_iterator(folderPathName))
  {
   if (entry.is_regular_file()) directoryFiles.push_back(entry.path().string());
  }
  return directoryFiles;
 }
 //
 // replace all non-overlapping substrings found (stringToBeReplaced) with another 
 // string (replacementString) in a given string (string), scanning from left to right
 inline std::string ReplaceAllSubstringsInString(const std::string& string, 
  const std::string& stringToBeReplaced, const std::string& replacementString)
 {
  // an empty search string would match everywhere and the loop would never advance
  if (stringToBeReplaced.empty()) return string;
  std::string result;
  const std::size_t find_len = stringToBeReplaced.size();
  std::size_t pos, from = 0;
  while (std::string::npos != (pos = string.find(stringToBeReplaced, from))) 
  {
   result.append(string, from, pos - from);
   result.append(replacementString);
   from = pos + find_len;
  }
  result.append(string, from, std::string::npos);
  return result;
 }
 //
 // read key-value pairs into std::map
 // (key = string to be found and replaced, value = replacement string)
 inline std::map<std::string, std::string> ReadKeysFromFileToMap(const std::string& filePathName, char separator)
 {
  std::map<std::string, std::string> keys;
  std::ifstream file(filePathName);
  std::string line;
  //
  while (std::getline(file, line))
  {
   // skip empty or malformed lines (no separator)
   if (line.find(separator) == std::string::npos) continue;
   // fresh strings for each line, so a value is never carried over from a previous line
   std::istringstream s(line);
   std::string key;
   std::string value;
   std::getline(s, key, separator);
   std::getline(s, value, separator);
   keys.insert(std::make_pair(key, value));
  }
  return keys;
 }
}
//


Tester.cpp


First, there has to be a directory containing all source files to be processed by the file processing program. There also has to be a specific file containing key-value pairs (key = string to be found and replaced, value = replacement string) for all desired string replacement cases. The main program creates file processors into a vector container (there will be as many processors as there are source files to be processed) and then executes all file processors in parallel. A single file processor first reads the source file content into a string, then loops through all key-value pairs, checking whether any occurrences can be found and performing the string replacements, and finally writes the modified content back to the original source file.

#include <ppl.h>
#include <concurrent_unordered_map.h>
#include <functional>
#include <memory>
#include <chrono>
#include <iostream>
#include "FileHandler.h"
//
namespace MJ = MikeJuniperhillFileSystemUtility;
//
using Files = std::vector<std::string>;
using Pair = std::pair<std::string, std::string>;
using Keys = std::map<std::string, std::string>;
using GlobalKeys = concurrency::concurrent_unordered_map<std::string, std::string>;
using ProcessorMethod = std::function<void(void)>;
using FileProcessor = std::shared_ptr<ProcessorMethod>;
//
// create file processor
FileProcessor Factory(const GlobalKeys& keys, const std::string& filePathName)
{
 // deferred execution : file path is copied, keys are shared by reference (read-only)
 ProcessorMethod processor = [=, &keys]() -> void
 {
  std::string fileUnderProcessing = MJ::ReadFileContentToString(filePathName);
  // standard range-based for loop (instead of the MSVC-only "for each ... in" syntax)
  for (const auto& pair : keys)
  {
   // extract key (string to be found and replaced) and value (replacement string)
   const std::string& key = pair.first;
   const std::string& value = pair.second;
   //
   // check if key exists in a given string (containing all information in current file)
   if (fileUnderProcessing.find(key) != std::string::npos)
   {
    // if exists, replace all occurrences of key with value
    fileUnderProcessing = MJ::ReplaceAllSubstringsInString(fileUnderProcessing, key, value);
   }
  }
  // finally write changes into original file
  MJ::WriteStringContentToFile(fileUnderProcessing, filePathName);
 };
 return std::make_shared<ProcessorMethod>(processor);
}
//
int main(int argc, char* argv[])
{
 // get required path strings from command line
 if (argc < 3)
 {
  std::cerr << "usage : Tester <keys file> <source file directory>" << std::endl;
  return 1;
 }
 std::string keysFilePathName = argv[1];
 std::string fileDirectoryPathName = argv[2];
 //
 // create list of files (files to be modified) and list of key-value pairs 
 // (key = string to be found and replaced, value = replacement string)
 Files files = MJ::GetDirectoryFiles(fileDirectoryPathName);
 char separator = ',';
 Keys keys = MJ::ReadKeysFromFileToMap(keysFilePathName, separator);
 //
 // insert keys into concurrency-safe unordered map
 // (variable renamed, so that it no longer hides the type alias Keys)
 GlobalKeys globalKeys;
 std::for_each(keys.begin(), keys.end(),
  [&](const Keys::value_type& pair) -> void { globalKeys.insert(pair); });
 //
 // create file processors :
 // each processor will be processing exactly one file (given filePathName) 
 // by looping through all key-value pairs (in GlobalKeys) and searching for 
 // all occurrences of substring (key) and replacing this with another string (value)
 std::vector<FileProcessor> processors;
 std::for_each(files.begin(), files.end(),
  [&](const std::string& filePathName) -> void { processors.push_back(Factory(globalKeys, filePathName)); });
 //
 // execute file processor method for all file processors
 auto start = std::chrono::steady_clock::now();
 concurrency::parallel_for_each(processors.begin(), processors.end(),
  [](const FileProcessor& fp) -> void { (*fp)(); });
 auto end = std::chrono::steady_clock::now();
 auto timeElapsed = std::chrono::duration_cast<std::chrono::seconds>(end - start);
 std::cout << "processed in " << timeElapsed.count() << " seconds" << std::endl;
 //
 return 0;
}
//
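Because each key is applied to the output of the previous replacement, the order of the key-value pairs can matter whenever a replacement string itself contains another key. `std::map` iterates its keys in sorted order, whereas the iteration order of a hash-based container such as `concurrent_unordered_map` is unspecified. The hypothetical helper `ApplyInOrder` below (not part of the program above) makes the order explicit with a `std::vector` of pairs; it is only a sketch to show the effect, not a drop-in replacement.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using Replacement = std::pair<std::string, std::string>; // (string to be replaced, replacement)

// Apply replacements one after another, in the given order. Each pass replaces all
// non-overlapping occurrences of its key in the output of the previous pass.
std::string ApplyInOrder(std::string text, const std::vector<Replacement>& replacements)
{
    for (const auto& r : replacements)
    {
        if (r.first.empty()) continue; // nothing sensible to search for
        std::size_t pos = 0;
        while ((pos = text.find(r.first, pos)) != std::string::npos)
        {
            text.replace(pos, r.first.size(), r.second);
            pos += r.second.size(); // continue after the inserted text, never rescan it
        }
    }
    return text;
}
```

With the pairs (A, B) and (B, C), the text "A" becomes "C" when A is replaced first, but "B" when B is replaced first. If no replacement string contains another key, the container choice has no effect on the result.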


User settings


In this particular case, a picture is worth more than a thousand words.

A : arguments for executable to be used with command prompt
B : directory for key-value pairs file
C : the content of key-value pairs file
D : directory for source files
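The screenshot itself is not part of this text, so here is a sketch of the key-value pairs file format as it can be inferred from `ReadKeysFromFileToMap` with `separator = ','`: one pair per line, the string to be replaced first and the replacement string second. The parser `ParseKeys` below is a hypothetical stream-based variant (so it can be tried on an in-memory string). It keeps the text up to the next separator as the value and keeps the first occurrence of a duplicate key, as `std::map::insert` does, and it skips lines without a separator or with an empty key. The sample lines in the test are made up.

```cpp
#include <cstddef>
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical parser for "key<separator>value" lines read from any input stream.
std::map<std::string, std::string> ParseKeys(std::istream& input, char separator)
{
    std::map<std::string, std::string> keys;
    std::string line;
    while (std::getline(input, line))
    {
        if (!line.empty() && line.back() == '\r') line.pop_back(); // tolerate CRLF line endings
        const std::size_t keyEnd = line.find(separator);
        if (keyEnd == std::string::npos || keyEnd == 0) continue; // no separator or empty key
        const std::size_t valueEnd = line.find(separator, keyEnd + 1);
        const std::string value = (valueEnd == std::string::npos)
            ? line.substr(keyEnd + 1)
            : line.substr(keyEnd + 1, valueEnd - keyEnd - 1);
        keys.emplace(line.substr(0, keyEnd), value); // first occurrence of a key wins
    }
    return keys;
}
```

For example, the (made-up) lines `EUR,EURO` and `USD,DOLLAR` yield two pairs, while a line such as `BROKEN` without a comma is ignored.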

Finally, thanks a lot again for reading this blog.
-Mike

Saturday, January 7, 2017

C++11 : Multi-Threaded PathGenerator using PPL

 

FINAL DESTINATION


The circle is now closed. This post is a kind of aggregation of the last four posts published on generating random numbers. I started with a simple template class for a distributional random generator, continued with a path generator for any one-factor stochastic process and finally ended up with a multi-threaded distributional random generation scheme using parallel algorithms. This final post (hopefully) delivers my larger goal : to be able to generate asset price paths for any one-factor process using a multi-threading scheme.


GROUNDHOG DAY


Again, I tested both the sequential (for_each) and parallel (parallel_for_each) schemes by using four generators, 10000 paths and 250 time steps for a single run. I then repeated this run 250 times. Conclusion :

  • The average running time for this sample was 17116 milliseconds (sequential) and 8209 milliseconds (parallel). So, the parallel scheme completed about two times faster. 
  • The actual CPU usage profiles during the simulation processes behaved exactly as reported in an earlier post. 
  • I also analyzed the processed asset price paths for the parallel scheme, just to be absolutely sure there are no duplicate paths (which would mean that the random number generation is not independent). Based on my analysis made in Excel, all processed asset price paths are different and there are no duplicates. 
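The duplicate check above was done in Excel. A hypothetical C++ alternative is to insert all paths into a `std::set`, which compares vectors lexicographically, and count how many of them collapse. Exact floating-point comparison is intentional here: two generators that accidentally share a seed would produce bit-identical paths. The function name `CountDuplicatePaths` is an assumption; it takes an iterator range so that it works with `std::vector` as well as with PPL's `concurrent_vector`.

```cpp
#include <cstddef>
#include <set>
#include <vector>

// Returns how many paths in [first, last) are exact copies of an earlier path.
// std::set keeps only one copy of identical vectors, so the difference in counts
// is the number of duplicates.
template <typename Iterator>
std::size_t CountDuplicatePaths(Iterator first, Iterator last)
{
    std::set<std::vector<double>> unique;
    std::size_t total = 0;
    for (; first != last; ++first, ++total) unique.insert(*first);
    return total - unique.size();
}
```

Note that this only detects identical paths; it would not detect, for example, two streams that are shifted copies of each other.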

The presented path generator scheme again fulfills my two initial requirements : faster creation of asset price paths following any one-factor process, and independence of the random generators.


RandomGenerator.h


The basic functionality of this template class has not changed, except for the construction part : the second constructor allows a client to supply, from outside of this class, any probability distribution to be applied to the uniform generator. Even though there is no real need for this kind of optionality in real life (most of the stuff in Monte Carlo is randomized by using the standard normal distribution), I decided to implement it for the sake of completeness.

#pragma once
#include <algorithm>
#include <vector>
#include <random>
//
namespace MikeJuniperhillRandomGeneratorTemplate
{
 /// <summary>
 /// Template class for creating random number paths using mt19937 as default uniform 
 /// random generator and Standard Normal as default probability distribution.
 /// </summary> 
 template <typename Generator = std::mt19937, typename Distribution = std::normal_distribution<double>>
 class RandomGenerator
 {
 public:
  /// <summary>
  /// Constructor with explicit seed value
  /// </summary>
  explicit RandomGenerator(unsigned long seed)
   // seed generator once
   : uniformGenerator(seed) { }
  /// <summary> 
  /// Constructor for explicit seed value and client-given probability distribution.
  /// </summary>  
  RandomGenerator(unsigned long seed, const Distribution& distribution)
   : uniformGenerator(seed), distribution(distribution) { }
  /// <summary>
  /// Fill a given vector reference with distributional random numbers
  /// </summary> 
  void operator()(std::vector<double>& v) const
  {
   // draw each element directly from the distribution; no function object holding
   // a 'this' pointer is stored, so copies of this object remain valid
   std::generate(v.begin(), v.end(), [this]() { return distribution(uniformGenerator); });
  }
 private:
  // mutable : drawing numbers advances the generator and distribution states,
  // even though operator() does not change the logical state of this object
  mutable Generator uniformGenerator;
  mutable Distribution distribution;
 };
}
//
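An explicit seed per generator is what keeps the generated streams reproducible and, with different seeds, distinct. A minimal sketch with a hypothetical free function `Draw` (standard `<random>` only) shows both properties, and also that any distribution object can be supplied from outside, just as with the second constructor.

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical helper: n draws from `distribution`, driven by an mt19937 seeded with `seed`.
// The distribution is taken by value, so every call starts from a fresh distribution state.
template <typename Distribution>
std::vector<double> Draw(std::size_t n, unsigned long seed, Distribution distribution)
{
    std::mt19937 engine(seed);
    std::vector<double> v(n);
    for (auto& x : v) x = distribution(engine);
    return v;
}
```

The standard fixes the exact output sequence of `std::mt19937`, but not the algorithms behind distributions such as `std::normal_distribution`, so identical sequences for the same seed are only guaranteed within one standard library implementation.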


OneFactorProcess.h


I decided to declare the drift and diffusion functions const, since these functions should not modify the internal state of the class.

#pragma once
//
namespace MikeJuniperhillOneFactorProcessLibrary
{
 /// <summary>
 /// Abstract base class for all one-factor processes for customizing 
 /// drift and diffusion functions for different types of processes.
 /// </summary>
 class OneFactorProcess
 {
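 public:
  // virtual destructor, since derived processes are used through base class pointers
  virtual ~OneFactorProcess() = default;
  virtual double drift(double x, double t) const = 0;
  virtual double diffusion(double x, double t) const = 0;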
 };
 //
 /// <summary>
 /// Implementation for Vasicek short-rate model.
 /// </summary>
 class Vasicek : public OneFactorProcess
 {
 public:
  Vasicek(double meanReversion, double longTermRate, double rateVolatility)
   : meanReversion(meanReversion), longTermRate(longTermRate), rateVolatility(rateVolatility) { }
  //
  double drift(double x, double t) const override { return meanReversion * (longTermRate - x); }
  double diffusion(double x, double t) const override { return rateVolatility; }
 private:
  double meanReversion;
  double longTermRate;
  double rateVolatility;
 };
}
//
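Adding a new process only requires overriding the two const functions. As a hypothetical example (not part of the original library), geometric Brownian motion dS = mu * S dt + sigma * S dW could be implemented as below; the base class is repeated so that the sketch compiles on its own.

```cpp
// Abstract base class, repeated here so that the sketch is self-contained.
class OneFactorProcess
{
public:
    virtual ~OneFactorProcess() = default;
    virtual double drift(double x, double t) const = 0;
    virtual double diffusion(double x, double t) const = 0;
};

// Hypothetical example process: geometric Brownian motion with constant mu and sigma.
class GeometricBrownianMotion : public OneFactorProcess
{
public:
    GeometricBrownianMotion(double mu, double sigma) : mu(mu), sigma(sigma) { }
    double drift(double x, double /*t*/) const override { return mu * x; }
    double diffusion(double x, double /*t*/) const override { return sigma * x; }
private:
    double mu;
    double sigma;
};
```

Because both coefficients are proportional to x, this process, unlike Vasicek, has state-dependent diffusion, which the path generator handles without any changes.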


PathGenerator.h


As with RandomGenerator, the basic functionality of this template class has not changed either, except for the construction part : the second constructor allows a client to supply any probability distribution, which is then passed on to the distributional random generator.

#pragma once
//
#include <cmath>
#include <memory>
#include <vector>
#include "RandomGenerator.h"
#include "OneFactorProcess.h"
namespace MJRandom = MikeJuniperhillRandomGeneratorTemplate;
namespace MJProcess = MikeJuniperhillOneFactorProcessLibrary;
//
namespace MikeJuniperhillPathGenerator
{
 template <typename Generator = std::mt19937, typename Distribution = std::normal_distribution<double>>
 class PathGenerator
 {
 public:
  /// <summary>
  /// Constructor for PathGenerator template class.
  /// </summary>
  PathGenerator(double spot, double maturity, unsigned long seed,
   const std::shared_ptr<MJProcess::OneFactorProcess>& process)
   : spot(spot), maturity(maturity), process(process)
  {
   // create random generator
   generator = std::unique_ptr<MJRandom::RandomGenerator<Generator, Distribution>>
    (new MJRandom::RandomGenerator<Generator, Distribution>(seed));
  }
  /// <summary>
  /// Constructor for PathGenerator template class, with a client-given probability distribution
  /// </summary>
  PathGenerator(double spot, double maturity, unsigned long seed,
   const std::shared_ptr<MJProcess::OneFactorProcess>& process, const Distribution& distribution)
   : spot(spot), maturity(maturity), process(process)
  {
   // create random generator with client-given probability distribution
   generator = std::unique_ptr<MJRandom::RandomGenerator<Generator, Distribution>>
    (new MJRandom::RandomGenerator<Generator, Distribution>(seed, distribution));
  }
  /// <summary> 
  /// Fill a given vector reference with asset prices, following a given stochastic process.
  /// </summary>  
  void operator()(std::vector<double>& v) const
  {
   // a path needs at least the spot price and one time step
   if (v.size() < 2) { if (!v.empty()) v[0] = spot; return; }
   //
   // transform initialized vector into a path containing random numbers
   (*generator)(v);
   //
   double dt = maturity / (v.size() - 1);
   double dw = 0.0;
   double s = spot;
   double t = 0.0;
   v[0] = s; // 1st path element will always be the current spot price
   //
   // transform distributional random number vector into a path containing 
   // asset prices from a given stochastic one-factor process (Euler scheme :
   // drift and diffusion are evaluated at the start of each time step)
   for (auto it = v.begin() + 1; it != v.end(); ++it)
   {
    dw = (*it) * std::sqrt(dt);
    (*it) = s + process->drift(s, t) * dt + process->diffusion(s, t) * dw;
    s = (*it);
    t += dt;
   }
  }
 private:
  double spot;
  double maturity;
  std::shared_ptr<MJProcess::OneFactorProcess> process;
  std::unique_ptr<MJRandom::RandomGenerator<Generator, Distribution>> generator;
 };
}
//
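For reference, the loop in operator() is an Euler (Euler-Maruyama) discretization. Writing N for v.size(), T for maturity and Z_i for the standard normal draws produced by the random generator, the textbook form evaluates drift and diffusion at the start of each step. For a time-homogeneous process such as Vasicek, whose coefficients do not depend on t, the time argument does not affect the result. The symbols κ, θ and σ_r below stand for meanReversion, longTermRate and rateVolatility.

```latex
\Delta t = \frac{T}{N-1}, \qquad t_i = i\,\Delta t, \qquad Z_i \sim \mathcal{N}(0, 1)

S_0 = \text{spot}, \qquad
S_i = S_{i-1} + \mu(S_{i-1}, t_{i-1})\,\Delta t + \sigma(S_{i-1}, t_{i-1})\,\sqrt{\Delta t}\,Z_i,
\qquad i = 1, \dots, N-1

\text{Vasicek:} \qquad \mu(x, t) = \kappa\,(\theta - x), \qquad \sigma(x, t) = \sigma_r
```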


Tester.cpp


The tester program closely follows the program presented in the previous post. For additional clarity, I have used new type definitions to improve code readability and get rid of some lengthy variable names. The program again uses a simple factory method for creating a PathGenerator (a function wrapped in a shared pointer). In this program, a OneFactorProcess implementation is created and passed to the factory method. Finally, there is a method for printing the processed paths to the console for testing purposes.

#include <iostream>
#include <chrono>
#include <algorithm>
#include <ppl.h>
#include <concurrent_vector.h>
#include "PathGenerator.h"
namespace MJGenerator = MikeJuniperhillPathGenerator;
//
// type definitions
using Path = std::vector<double>;
using Paths = concurrency::concurrent_vector<Path>;
using Process = std::shared_ptr<MJProcess::OneFactorProcess>;
using Processor = std::function<void(void)>;
using PathGenerator = std::shared_ptr<Processor>;
//
// thread-safe container for storing asset price paths, processed by path generators
Paths paths;
//
// printer for generated asset price paths
void Printer()
{
 std::for_each(paths.begin(), paths.end(),
  [](const Path& path) -> void
 {
  std::for_each(path.begin(), path.end(),
   [](double s) -> void
  {
   std::cout << s << ",";
  });
  std::cout << std::endl;
 });
}
//
// factory method :
// return path-generating function as function wrapper
// input arguments are common for all types of generators
PathGenerator Factory(double spot, double maturity, int nPaths, 
 int nSteps, unsigned long seed, const Process& process, Paths& paths)
{
 // create function for processing one-factor paths
 auto generator = [=, &process, &paths]() -> void
 {
  MJGenerator::PathGenerator<> oneFactorProcess(spot, maturity, seed, process);
  Path path(nSteps);
  for (auto i = 0; i != nPaths; ++i)
  {
   oneFactorProcess(path);
   paths.push_back(path);
  }
 };
 // return generator function as function wrapper
 return std::make_shared<Processor>(generator);
}
//
int main()
{
 // create vasicek process
 double longTermRate = 0.05;
 double meanReversion = 0.2;
 double rateVolatility = 0.0075; 
 Process vasicek = std::make_shared<MJProcess::Vasicek>(meanReversion, longTermRate, rateVolatility);
 //
 // define parameters and seed values for path generators (one seed per generator)
 std::vector<unsigned long> seed = { 10322854, 65947, 387528, 772399573 };
 std::size_t nGenerators = seed.size();
 int nPaths = 100;
 int nSteps = (250 + 1);
 //
 // use factory method for creating path generators
 double spot = 0.0095;
 double maturity = 3.0;
 std::vector<PathGenerator> generators;
 for (std::size_t i = 0; i < nGenerators; i++) generators.push_back(
  Factory(spot, maturity, nPaths, nSteps, seed[i], vasicek, paths));
 //
 // parallel processing
 auto start = std::chrono::steady_clock::now();
 concurrency::parallel_for_each(generators.begin(), generators.end(),
  [](const PathGenerator& pg) -> void { (*pg)(); });
 auto end = std::chrono::steady_clock::now();
 auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
 std::cout << timeElapsed.count() << std::endl;
 //
 // print paths
 Printer();
 return 0;
}
//
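One property of this tester worth knowing: the paths of all generators are appended to a shared concurrent_vector as they are produced, so their order in the container can differ from run to run, even though each generator's own sequence is reproducible. If a deterministic layout is preferred, each generator can write into its own pre-allocated slice instead. The following is a hypothetical, self-contained sketch of that idea using `std::thread` and a hard-coded Vasicek Euler step; the function name and signature are illustrative and not part of the library above.

```cpp
#include <cmath>
#include <cstddef>
#include <random>
#include <thread>
#include <vector>

using Path = std::vector<double>;

// Hypothetical sketch: Vasicek Euler paths, one thread per seed.
// Generator g writes only to paths[g * nPaths] ... paths[(g + 1) * nPaths - 1],
// so the threads never touch the same element and the output order is fixed.
std::vector<Path> GenerateVasicekPaths(double spot, double maturity, double kappa, double theta,
    double sigma, std::size_t nPaths, std::size_t nSteps, const std::vector<unsigned long>& seeds)
{
    std::vector<Path> paths(seeds.size() * nPaths, Path(nSteps, spot));
    std::vector<std::thread> workers;
    for (std::size_t g = 0; g < seeds.size(); ++g)
    {
        workers.emplace_back([&, g]
        {
            std::mt19937 engine(seeds[g]);
            std::normal_distribution<double> z(0.0, 1.0);
            const double dt = nSteps > 1 ? maturity / (nSteps - 1) : 0.0;
            for (std::size_t p = 0; p < nPaths; ++p)
            {
                Path& path = paths[g * nPaths + p];
                for (std::size_t i = 1; i < nSteps; ++i)
                {
                    const double s = path[i - 1];
                    path[i] = s + kappa * (theta - s) * dt + sigma * std::sqrt(dt) * z(engine);
                }
            }
        });
    }
    for (auto& worker : workers) worker.join();
    return paths;
}
```

With the same seeds, two calls return identical results, and paths[g * nPaths + p] is always path p of generator g. The trade-off is that all memory is allocated up front, whereas the concurrent_vector approach grows as paths arrive.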


Finally, thanks again for reading this blog.
-Mike