Capturing Tick Data via C#, Interactive Brokers, and MySQL

March 3, 2012

(This article was first published on Adventures in Statistical Computing, and kindly contributed to R-bloggers)

Interactive Brokers is a discount brokerage that provides a good API for programmatically accessing their platform.  The purpose of this post is to create an application that captures tick-level data and saves it to a database for future use.

I started to use the IBrokers package in R to do this post.  However, as R is NOT easily threaded and the IB API is heavily threaded, well... oil and water.

Instead I went with the C# port of the API from DinosaurTech.  It's good and it's free.

Luckily you do not need an account with Interactive Brokers for this project.  They have a demo environment available on their webpage.  The data is FAKE, but it's good enough to test connectivity.  Simply run the demo, then go to Configure->API->Settings and ensure that "Enable ActiveX and Socket Clients" is checked and the port is set to 7496.

To follow along in this post, you will need:
  1. MySQL, MySQL Workbench, and MySQL Connector for .NET (all available here).
  2. VS2010 with C#.  You can download the free Express version here.
  3. The libraries from DinosaurTech available at the link above.
  4. A basic understanding of C# (available here [Pro C# 2010 and the .NET 4 Platform], here [Beginning Visual C# 2010 (Wrox Programmer to Programmer)], or here [Google]).

To start, install all of the above items.  In MySQL Workbench, run the following SQL to create the table to store ticks.

SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL';

DROP SCHEMA IF EXISTS `tick_db` ;
CREATE SCHEMA IF NOT EXISTS `tick_db` DEFAULT CHARACTER SET latin1 COLLATE latin1_swedish_ci ;
USE `tick_db` ;

-- -----------------------------------------------------
-- Table `ticks`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `ticks` ;

CREATE  TABLE IF NOT EXISTS `ticks` (
  `idticks` INT NOT NULL ,
  `symbol` VARCHAR(8) NOT NULL ,
  `date` DATE NOT NULL  ,
  `time` TIME NOT NULL  ,
  `value` FLOAT NOT NULL ,
  `type` VARCHAR(12) NOT NULL ,
  PRIMARY KEY (`idticks`, `date`) )
ENGINE = InnoDB PARTITION BY KEY(date) PARTITIONS 1;

CREATE INDEX `Symbol` ON `ticks` (`symbol` ASC) ;

SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;
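
You will note that we are partitioning the table by date.  The goal is that, as the data grows day over day, queries restricted to a date can still run quickly.  Note that the script above asks for only one partition (PARTITIONS 1), so every row lands in the same partition; raise that number before expecting any benefit from partitioning.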

If you do not have a user in the database, create one and give it permissions on this table (my user is dom and the password, a super secure dom123).

Before we go on, let's discuss how the IB API works.  Your programming environment connects through their trading platform, TWS (Trader Workstation), as it is running on your computer.  It uses the TWS connection to the IB servers to make your requests.  When you request market tick data for a stock, the data arrives whenever it wants.  There is no synchronous get() function.  You ask for data and the API fires events when that data comes back.  This way, you can have multiple requests running at the same time.  You just have to keep straight which equity's data you are processing at any given time.
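
To make the event-driven idea concrete, here is a minimal sketch that does not touch the IB API at all.  FakeFeed, PriceEventArgs, and EventDemo are hypothetical stand-ins I made up for illustration; the only point is the shape: you subscribe a handler, the feed calls you back, and a dictionary maps the request ID in the event back to a symbol.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical event payload: like the real tick events, it carries an ID, not a symbol
public class PriceEventArgs : EventArgs
{
    public int TickerId;
    public decimal Price;
}

// Hypothetical stand-in for the API client: no get() method, only an event
public class FakeFeed
{
    public event EventHandler<PriceEventArgs> TickPrice;

    // Simulates the API pushing a tick at a moment of its own choosing
    public void Push(int tickerId, decimal price)
    {
        EventHandler<PriceEventArgs> handler = TickPrice;
        if (handler != null)
            handler(this, new PriceEventArgs { TickerId = tickerId, Price = price });
    }
}

public static class EventDemo
{
    // Subscribes a handler, pushes three ticks, and returns what the handler saw
    public static List<string> Run()
    {
        // Request ID -> symbol lookup
        Dictionary<int, string> symbols = new Dictionary<int, string>();
        symbols.Add(1, "GOOG");
        symbols.Add(2, "SPY");

        List<string> seen = new List<string>();
        FakeFeed feed = new FakeFeed();
        feed.TickPrice += delegate(object sender, PriceEventArgs e)
        {
            seen.Add(symbols[e.TickerId] + " " +
                     e.Price.ToString(CultureInfo.InvariantCulture));
        };

        // Ticks for different requests arrive interleaved
        feed.Push(2, 137.5m);
        feed.Push(1, 621.25m);
        feed.Push(2, 137.75m);
        return seen;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
    }
}
```

The handler never asks for a price; it only reacts to whatever arrives, which is the same position our real tick handlers will be in.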

This threaded nature means that we can process a large number of requests.  It also means that we need to free up the threads that are receiving data as quickly as possible, i.e., they shouldn't be writing to the database themselves.  We will create a queue of ticks, drained by a separate thread that posts them to the database as quickly as it can.
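
As a rough illustration of that producer/consumer split, here is a sketch using BlockingCollection<T> from .NET 4.  This is not the approach taken in the rest of this post, which uses a plain Queue with lock(); it is an alternative I am showing only to make the pattern visible.  QueueDemo and the string "ticks" are made up for the example, and adding to a list stands in for the slow database insert.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class QueueDemo
{
    // Several producer threads enqueue; one writer thread drains.
    // Returns everything the writer "wrote".
    public static List<string> Run(int producers, int ticksEach)
    {
        BlockingCollection<string> pending = new BlockingCollection<string>();
        List<string> written = new List<string>();

        Thread writer = new Thread(() =>
        {
            // Blocks while the queue is empty; ends once CompleteAdding()
            // has been called and the queue has been drained
            foreach (string item in pending.GetConsumingEnumerable())
                written.Add(item);   // stand-in for the database insert
        });
        writer.Start();

        Thread[] feeds = new Thread[producers];
        for (int p = 0; p < producers; p++)
        {
            int id = p;  // copy for the closure
            feeds[p] = new Thread(() =>
            {
                // The producer only enqueues and returns immediately
                for (int n = 0; n < ticksEach; n++)
                    pending.Add("feed" + id + " tick" + n);
            });
            feeds[p].Start();
        }

        foreach (Thread t in feeds)
            t.Join();
        pending.CompleteAdding();   // tell the writer no more items are coming
        writer.Join();
        return written;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 1000).Count + " ticks written");
    }
}
```

The producers never wait on the "database"; only the writer thread does.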

Ticks are reported as updates to BID, BID_SIZE, ASK, ASK_SIZE, LAST, LAST_SIZE, VOLUME, HIGH, LOW, and CLOSE.

Here is the plan of action:

  1. Create a new console application in C#.
  2. Create a Tick class for holding the tick and give it a method that returns the SQL INSERT statement to put that tick into our database.
  3. Create a TickQueue class that will process ticks that are inserted into its Queue object as fast as possible.
  4. Create a TickMain class that will request all the market data and insert the tick events into the TickQueue.
  5. Call the TickMain class from a separate thread in the console application and watch the magic happen.

First, our Tick class:

using System;

public class Tick
{
    public String type = "";
    public decimal value = 0;
    public String date = System.DateTime.Now.ToString("yyyy-MM-dd");
    //"ff" is hundredths of a second; "ss" would just repeat the seconds
    public String time = System.DateTime.Now.ToString("HH:mm:ss.ff");
    public String symbol = "";
    public Int32 index = 0;

    public Tick(String type, decimal value, String symbol, Int32 index)
    {
        this.type = type;
        this.value = value;
        this.symbol = symbol;
        this.index = index;
    }

    public Tick() { }

    public String toInsert()
    {
        String output = "insert into ticks (idticks,symbol,date,time,value,type) values (" +
                            index +
                            ",'" + symbol +
                            "', DATE('" + date +
                            "'),TIME('" + time + "')," +
                            value +
                            ",'" + type + "')";

        return output;
    }
}

This class is basic.  We simply hold the information about the tick and provide a method that builds our SQL INSERT statement.
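
A small aside on the format strings involved when building SQL text by hand.  In .NET custom date and time formats, ss always means whole seconds, while fractions of a second are written with f.  The sketch here (TimeFormatDemo and its method names are mine, purely for illustration) shows the difference on a fixed timestamp, and formats with the invariant culture so that the decimal separator is a period regardless of the machine's regional settings.  Whether the fractional part survives once it reaches MySQL depends on the server version and column definition, which I have not tested here.

```csharp
using System;
using System.Globalization;

public static class TimeFormatDemo
{
    // Formats a timestamp with a custom pattern, independent of regional settings
    public static string FormatTime(DateTime t, string pattern)
    {
        return t.ToString(pattern, CultureInfo.InvariantCulture);
    }

    // Formats a price so the decimal separator is always "."
    public static string FormatValue(decimal value)
    {
        return value.ToString(CultureInfo.InvariantCulture);
    }

    public static void Main()
    {
        // 09:30:05 and 250 milliseconds
        DateTime t = new DateTime(2012, 3, 3, 9, 30, 5, 250);

        Console.WriteLine(FormatTime(t, "H:mm:ss.ss"));   // 9:30:05.05  (seconds twice)
        Console.WriteLine(FormatTime(t, "HH:mm:ss.ff"));  // 09:30:05.25 (hundredths)
        Console.WriteLine(FormatValue(621.25m));          // 621.25
    }
}
```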

Next is the TickQueue.  This is more involved...

using System;
using System.Collections.Generic;
using MySql.Data.MySqlClient;

public class TickQueue
{
    // Internal queue of db inserts
    private Queue<Tick> queue = new Queue<Tick>();

    // Locking Object.  Necessary as multiple threads will be
    // accessing this object simultaneously
    private Object qLockObj = new object();

    //Connection object to MySQL
    public MySqlConnection conn = null;
    //Flag to stop processing; volatile because another thread sets it
    public volatile bool stop = false;

    //Method to enqueue a write
    public void Add(Tick item)
    {
        //Lock for single access only
        lock(qLockObj)
        {
            queue.Enqueue(item);
        }
    }

    //Method to write items as they are enqueued
    public void Run()
    {
        int n = 0;
        //Loop while the stop flag is false
        while (!stop)
        {
            //Lock and get a count of the object in the queue
            lock (qLockObj)
            {
                n = queue.Count;
            }

            //If there are objects in the queue, then process them
            if (n > 0)
            {
                process();
            }

            //Sleep for .1 seconds before looping again
            System.Threading.Thread.Sleep(100);
        }

        //When the shutdown flag is received, write any
        //values still in the queue and then stop
        Console.WriteLine("Shutting Down TickQueue; " + queue.Count + " items left");
        process();
    }

    //Method to process items in the queue
    private void process()
    {
        List<Tick> inserts = new List<Tick>();
        //Drain the queue into a list.  Loop until the queue is empty:
        //comparing a counter against queue.Count would stop halfway,
        //because Count shrinks with every Dequeue()
        lock (qLockObj)
        {
            while (queue.Count > 0)
                inserts.Add(queue.Dequeue());
        }

        Console.WriteLine("Processing " + inserts.Count + " items into database");

        //call insert for each item.
        foreach (Tick t in inserts)
            insert(t);
    }

    //Method to insert a tick
    private void insert(Tick t)
    {
        using (MySqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = t.toInsert();
            try
            {
                cmd.ExecuteNonQuery();
            }
            catch (Exception exp)
            {
                Console.WriteLine("OOPS " + exp.Message);
            }
        }
    }
}

You will notice that I am not creating any threads in this object.  The TickMain object will own the worker thread that calls the Run() method in TickQueue.  Because multiple threads will be accessing the object, I've surrounded all access points to the actual queue with a lock() statement.  That will ensure that only one thread at a time gets access.
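
One detail worth watching when you drain a Queue inside a lock: Count shrinks with every Dequeue(), so a for loop that compares a rising index against Count stops about halfway through.  The sketch here is a standalone illustration (DrainDemo and its helpers are hypothetical names, not part of the project) comparing that loop with a plain "while not empty" loop.

```csharp
using System;
using System.Collections.Generic;

public static class DrainDemo
{
    private static readonly object gate = new object();

    // Builds a queue holding the numbers 0..n-1
    public static Queue<int> Fill(int n)
    {
        Queue<int> q = new Queue<int>();
        for (int i = 0; i < n; i++)
            q.Enqueue(i);
        return q;
    }

    // Index-versus-Count loop: i goes up while Count comes down
    public static int DrainByIndex(Queue<int> q)
    {
        int taken = 0;
        lock (gate)
        {
            for (int i = 0; i < q.Count; i++)
            {
                q.Dequeue();
                taken++;
            }
        }
        return taken;
    }

    // Loop until the queue reports empty
    public static int DrainUntilEmpty(Queue<int> q)
    {
        int taken = 0;
        lock (gate)
        {
            while (q.Count > 0)
            {
                q.Dequeue();
                taken++;
            }
        }
        return taken;
    }

    public static void Main()
    {
        Console.WriteLine(DrainByIndex(Fill(10)));     // 5
        Console.WriteLine(DrainUntilEmpty(Fill(10)));  // 10
    }
}
```

With the index loop the leftover items are not lost, they just wait for the next pass, but the "items processed" count you print will be misleading.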

I've tried to comment enough to give an idea of what is happening in there.  If not, let me know and I will expand.  The same goes for TickMain below.

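Now the TickMain class:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using MySql.Data.MySqlClient;
using Krs.Ats.IBNet;            //namespace of the DinosaurTech port (assumed)
using Krs.Ats.IBNet.Contracts;  //Equity contract class (assumed)

public class TickMain
{
    //Private and public accessor for the IBClient object
    private IBClient _client = null;
    public IBClient client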
    {
        get { return _client; }
        set
        {
            _client = value;
            //If the client is connected, then set the queue.stop = false
            if (_client.Connected)
                queue.stop = false;
            else
                queue.stop = true;
        }

    }

    public List<String> stockList = new List<string>();
    public MySqlConnection conn = null;
    public bool doGet = true;

    private TickQueue queue = new TickQueue();
    private BackgroundWorker bg = new BackgroundWorker();
    private Dictionary<int, String> tickId = new Dictionary<int, string>();
    private int tickIndex = 0;
    private object lockObj = new object();

    //Constructors
    public TickMain()
    {
        initialize();
    }

    public TickMain(IBClient client, List<String> stockList, MySqlConnection conn)
    {
        this.client = client;
        this.stockList = stockList;
        this.conn = conn;
        initialize();
    }

    //Initialization method
    private void initialize()
    {
        //Setup the background worker to run the queue
        bg.DoWork += new DoWorkEventHandler(bg_DoWork);

        //Connect to MySQL if we haven't already
        if (conn.State != System.Data.ConnectionState.Open)
            conn.Open();
        //Don't process the queue if not connected to IB
        if (!client.Connected)
            queue.stop = true;

        //Set the MySQL connection for the queue
        queue.conn = conn;

        //Get the next value of the queue index
        using (MySqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = "select coalesce(max(idticks),0) from ticks";
            MySqlDataReader Reader;

            Reader = cmd.ExecuteReader();
            Reader.Read();
            tickIndex = Reader.GetInt32(0) + 1;
            Reader.Close();
        }
    }

    //Method for getting market prices
    public void Run()
    {
        if (client.Connected)
        {
            //Set up the event handlers for the ticks
            client.TickPrice += new EventHandler<TickPriceEventArgs>(client_TickPrice);
            client.TickSize += new EventHandler<TickSizeEventArgs>(client_TickSize);
               
            //Initialize a counter for stock symbols
            int i = 1;
               
            //Start the queue
            bg.RunWorkerAsync();

            //Request market data for each stock in the stockList
            foreach (String str in stockList)
            {
                tickId.Add(i, str);
                client.RequestMarketData(i, new Equity(str), null, false, false);
                i++;
            }

            //Hang out until told otherwise
            while (doGet)
            {
                System.Threading.Thread.Sleep(100);
            }

            //Remove event handlers
            Console.WriteLine("Shutting Down TickMain");
            client.TickPrice -= new EventHandler<TickPriceEventArgs>(client_TickPrice);
            client.TickSize -= new EventHandler<TickSizeEventArgs>(client_TickSize);
            queue.stop = true;
        }
    }

    //Event handler for TickSize events
    void client_TickSize(object sender, TickSizeEventArgs e)
    {
        //Get the symbol from the dictionary
        String symbol = tickId[e.TickerId];
        int i = 0;
           
        //As this is asynchronous, lock and get the current tick index
        lock (lockObj)
        {
            i = tickIndex;
            tickIndex++;
        }

        //Create a tick object and enqueue it
        Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
            e.Size, symbol, i);
        queue.Add(tick);
    }

    //Event Handler for TickPrice events
    void client_TickPrice(object sender, TickPriceEventArgs e)
    {
        //Get the symbol from the dictionary
        String symbol = tickId[e.TickerId];
        int i = 0;

        //As this is asynchronous, lock and get the current tick index
        lock (lockObj)
        {
            i = tickIndex;
            tickIndex++;
        }

        //Create a tick object and enqueue it
        Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
            e.Price, symbol, i);
        queue.Add(tick);
    }

    //BackgroundWorker delegate to run the queue.
    private void bg_DoWork(object sender, DoWorkEventArgs e)
    {
        queue.Run();
    }

}

Stock requests are given a unique ID.  The tick events carry this ID, not the symbol, so we keep track of the ID and symbol pairs in a Dictionary.  You will note that we have a BackgroundWorker in here that runs the database write queue.  The tick event handlers process each tick and assign it a unique index ID (another source of possible thread contention, hence the lock() around the index creation).
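
If all you need from the lock is a unique, increasing number, Interlocked.Increment is a lighter-weight option.  This is only a sketch of that alternative, not what the class in this post does; TickIndex and TickIndexDemo are hypothetical names for illustration.  Several threads draw IDs at once and we check that no ID was handed out twice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class TickIndex
{
    private int last;

    // firstId is the first value Next() will hand out
    public TickIndex(int firstId)
    {
        last = firstId - 1;
    }

    // Atomically bumps the counter and returns the new value
    public int Next()
    {
        return Interlocked.Increment(ref last);
    }
}

public static class TickIndexDemo
{
    // Has several threads draw IDs at once; returns how many distinct IDs came back
    public static int CountDistinct(int threads, int idsPerThread)
    {
        TickIndex index = new TickIndex(1);
        int[][] drawn = new int[threads][];
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++)
        {
            int slot = t;  // copy for the closure
            drawn[slot] = new int[idsPerThread];
            workers[slot] = new Thread(() =>
            {
                // Each thread writes only to its own array
                for (int n = 0; n < idsPerThread; n++)
                    drawn[slot][n] = index.Next();
            });
            workers[slot].Start();
        }

        foreach (Thread w in workers)
            w.Join();

        HashSet<int> distinct = new HashSet<int>();
        foreach (int[] ids in drawn)
            foreach (int id in ids)
                distinct.Add(id);
        return distinct.Count;
    }

    public static void Main()
    {
        Console.WriteLine(CountDistinct(4, 1000));  // 4000
    }
}
```

The lock() version in the handlers is perfectly fine at this volume; the point is only that the critical section is a single increment.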

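Finally, the Program class in the console application:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using MySql.Data.MySqlClient;
using Krs.Ats.IBNet;   //namespace of the DinosaurTech port (assumed)

class Program
{
    static MySqlConnection conn =
        new MySqlConnection("server=LOCALHOST;DATABASE=tick_db;USER=dom;PASSWORD=dom123");
    static TickMain main = null;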

    static void Main(string[] args)
    {
        //Open the connections.
        conn.Open();
        IBClient client = new IBClient();
        client.Connect("localhost", 7496, 2);

        //List of stock ticks to get
        List<String> stockList = new List<string>();
        stockList.Add("GOOG");
        stockList.Add("SPY");
        stockList.Add("SH");
        stockList.Add("DIA");

        //Initialize the TickMain object
        main = new TickMain(client, stockList, conn);
        main.doGet = true;

        //Setup a worker to call main.Run() asynchronously.
        BackgroundWorker bg = new BackgroundWorker();
        bg.DoWork += new DoWorkEventHandler(bg_DoWork);
        bg.RunWorkerAsync();
           
        //Chill until the user hits enter then stop the TickMain object
        Console.ReadLine();
        main.doGet = false;

        //disconnect
        client.Disconnect();
           
        Console.WriteLine("Hit Enter to Continue");
        Console.ReadLine();
    }

    //Delegate for main.Run()
    static void bg_DoWork(object sender, DoWorkEventArgs e)
    {
        main.Run();
    }
  
}
The application will run, scrolling updates on how many records are being written to the database until you hit the enter key.  After that, the system shuts down and you are prompted to hit enter once more to exit the application.

The entire VS2010 project and SQL for the table can be found here.

My plan is to set this up to run all next week with live data, pull it into R next weekend, and see what we can see.
