1. Plan Center: The Plan Center is responsible for building trading strategy models.
In the Plan Center, users can build trading strategy models with the SVM or LR algorithm.
2. R Academy: This service offers an R program development platform. After creating R models, users can deploy their strategies to the Trade Center to test them in real time.
VI. Data Access Tier (DAT)
The DAT is used to access all data from the database or external data sources. All processed data are sent to the DAT, which stores the big data persistently with low latency.
The DAT also offers an order interface that connects to external securities dealers to transmit user orders. In addition, the DAT aggregates the market quotes of every tick into one K-bar every second, and it also publishes K-bars at several other time granularities.
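The tick-to-K-bar aggregation described above can be sketched as follows. This is a minimal illustration, not the DAT implementation: it assumes a K-bar holds the open, high, low and close prices plus the total volume of a time bucket, and the names `KBar` and `aggregate_ticks` as well as the sample ticks are hypothetical.

```python
from collections import OrderedDict
from dataclasses import dataclass


@dataclass
class KBar:
    """One bar: open/high/low/close price and total volume of a time bucket."""
    start: int      # bucket start time, in seconds
    open: float
    high: float
    low: float
    close: float
    volume: int


def aggregate_ticks(ticks, granularity=1):
    """Fold (timestamp_seconds, price, volume) ticks into K-bars.

    `ticks` must be ordered by time; `granularity` is the bar width in seconds.
    """
    bars = OrderedDict()
    for ts, price, volume in ticks:
        start = int(ts // granularity) * granularity
        bar = bars.get(start)
        if bar is None:
            # First tick of the bucket sets all four prices.
            bars[start] = KBar(start, price, price, price, price, volume)
        else:
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
            bar.close = price
            bar.volume += volume
    return list(bars.values())


# Hypothetical ticks: (time in seconds, price, volume)
ticks = [(0.10, 100.0, 2), (0.55, 101.5, 1), (0.90, 99.5, 3), (1.20, 100.5, 4)]
one_second_bars = aggregate_ticks(ticks, granularity=1)
two_second_bars = aggregate_ticks(ticks, granularity=2)
```

Calling the same function with a larger `granularity` yields the coarser K-bars, which is one simple way to serve several time granularities from a single tick stream.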
The DAT has three databases to store different types of data:
1. MongoDB: MongoDB stores user-related data, such as user account data, the trading strategies created by users and dynamic website content. Most actions of online users get a quick response because the access time of MongoDB is very low.
2. HBase: The State Center saves the market state data in HBase, which lets us write large amounts of data in a very short period of time. If the HFT system supports calculation for 1,500 futures, the DAT needs to store 1,977,000 market states per second (1,500 futures * 1,318 market states per one-second K-bar) and 35,586,000,000 market states per day (1,977,000 market states per second * 18,000 trading seconds per day). That is big data.
3. HDFS: Before the Plan Center builds a model, it reads a large volume of market state data from HBase and writes it into HDFS. The Plan Center then uses the data through Spark RDDs to build strategy models.
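The storage volume quoted for HBase follows directly from the figures in the text. The short calculation below reproduces it; the constants come from the paper, while the function names are only illustrative.

```python
STATES_PER_FUTURE_PER_SECOND = 1318   # market states per one-second K-bar (from the text)
TRADING_SECONDS_PER_DAY = 18_000      # seconds per trading day (from the text)


def market_states_per_second(num_futures):
    """Market states produced every second for the given number of futures."""
    return num_futures * STATES_PER_FUTURE_PER_SECOND


def market_states_per_day(num_futures):
    """Market states produced over one trading day."""
    return market_states_per_second(num_futures) * TRADING_SECONDS_PER_DAY


per_second = market_states_per_second(1500)
per_day = market_states_per_day(1500)
print(f"{per_second:,} market states per second")   # 1,977,000
print(f"{per_day:,} market states per day")         # 35,586,000,000
```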
In addition to the four tiers above, the HFT system includes a unified messaging middleware implemented with Kafka. The messaging system is critical because messages are transmitted at high frequency. The architecture is complicated and delicate. It relies on many cloud computing technologies, which we describe in the following sections.
Lightning Calculation for Market States and Low Latency Storage──State Center
The State Center is in charge of computing market states from real-time quotes and saving the resulting market states into the database. To leave a time span (at least 500 milliseconds) for the subsequent execution of the machine learning models, the calculation must finish within tens of milliseconds (excluding transmission overhead).
Originally, we implemented the State Center with Storm as a single topology in order to calculate market states quickly. Figure 5 shows the initial State Center topology.
In the topology, the KafkaSpout receives real-time quotes from the external RealtimeDataPublisher service, which subscribes to real-time quotes from the real market and continually sends the tick prices through the distributed messaging system, Kafka.
The KafkaSpout then passes the prices to the following 18 ComputeStateBolts. Each ComputeStateBolt carries a different ComputeStateLogic and uses it to calculate the market states defined by a specific TA (technical analysis) logic. The 18 ComputeStateBolts then send the resulting market states to the WriteDataBolt of the corresponding TA, and every WriteDataBolt writes its TA data into HBase. For example, the MA (Moving Average, a technical analysis tool) ComputeStateBolt sends market states to the MAWriteDataBolt, which exclusively stores MA market states. Outside the topology, all black lines are transmitted by the Kafka distributed messaging system, while Netty provides the messaging channels inside the Storm topology.
Figure 5: State Center – A Topology
After improving the State Center, we combined all ComputeStateBolts into a single SymbolComputeStateBolt, which is responsible for all TA calculations of 1 or N futures. We call this design “State Center – B”. Figure 6 shows the new topology.
Figure 6: State Center – B Topology
The performance of State Center – B is better than that of the previous State Center – A.
We improved and simplified the message transmission between the KafkaSpout and the SymbolComputeStateBolt, which saves a lot of data transmission time. The Experiments chapter shows the performance results of State Center – A and State Center – B.
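The difference between the two designs can be illustrated with a small simulation. It is only a sketch under stated assumptions: it assumes that in State Center – A every TA bolt receives its own copy of each future's prices, whereas in State Center – B each future's prices are delivered once to a bolt that runs every TA logic. The two TA functions, the symbols `F1` and `F2` and the transfer counting are hypothetical stand-ins, not the Storm implementation.

```python
def moving_average(prices):
    return sum(prices) / len(prices)


def price_range(prices):
    return max(prices) - min(prices)


# Hypothetical stand-ins for the ComputeStateLogic implementations
# (the real system has 18 of them).
TA_LOGICS = {"MA": moving_average, "RANGE": price_range}


def topology_a(quotes_by_symbol):
    """State Center A style: one bolt per TA logic, each fed every symbol's prices."""
    transfers, states = 0, {}
    for name, logic in TA_LOGICS.items():
        for symbol, prices in quotes_by_symbol.items():
            transfers += 1                      # one spout-to-bolt transfer
            states[(symbol, name)] = logic(prices)
    return states, transfers


def topology_b(quotes_by_symbol):
    """State Center B style: one transfer per symbol, all TA logic in one bolt."""
    transfers, states = 0, {}
    for symbol, prices in quotes_by_symbol.items():
        transfers += 1                          # one spout-to-bolt transfer
        for name, logic in TA_LOGICS.items():
            states[(symbol, name)] = logic(prices)
    return states, transfers


quotes = {"F1": [100.0, 102.0, 101.0], "F2": [50.0, 51.0]}
states_a, transfers_a = topology_a(quotes)
states_b, transfers_b = topology_b(quotes)
```

Both functions produce the same market states, but under these assumptions the number of transfers in the first design grows with the number of TA logics, while in the second it depends only on the number of futures.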
Build Model Fast Over Big Data Sets──Plan Center
Investment strategies, also called investment models, are critical for an HFT system. The Plan Center is in charge of creating investment strategies: it applies machine learning algorithms to historical market data to build them, and it exposes many customizable algorithm parameters to users. In an HFT system, the creation of strategies must satisfy two requirements:
1. Fast: We need to create the models before the trend changes, because trends in high-frequency trading are transient and easy to miss.
2. Big data loading in a short time: When the Plan Center runs a machine learning algorithm to create investment strategies, it needs to load large historical data sets from HBase and analyze them in a short time.
To meet the speed and large data set requirements, we implemented the Plan Center with the Apache Spark framework. With Spark RDDs, the Plan Center can load hundreds of gigabytes of market state data into memory and analyze them across many nodes in the cluster [6] [7].
The Plan Center offers several machine learning algorithms for creating investment strategies, such as Support Vector Machine (SVM), Logistic Regression (LR) and Classification.
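To make the model-building step concrete, the sketch below trains a logistic regression classifier with plain batch gradient descent on a tiny in-memory data set. It is a single-machine stand-in written in pure Python, not the Spark-based Plan Center: the feature names, the sample rows, the labels and the hyperparameters are all hypothetical.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def train_logistic_regression(rows, labels, learning_rate=0.5, epochs=2000):
    """Batch gradient descent on the logistic loss; returns (weights, bias)."""
    n_features = len(rows[0])
    weights, bias = [0.0] * n_features, 0.0
    for _ in range(epochs):
        grad_w, grad_b = [0.0] * n_features, 0.0
        for x, y in zip(rows, labels):
            error = sigmoid(sum(w * xi for w, xi in zip(weights, x)) + bias) - y
            for j, xi in enumerate(x):
                grad_w[j] += error * xi
            grad_b += error
        weights = [w - learning_rate * g / len(rows) for w, g in zip(weights, grad_w)]
        bias -= learning_rate * grad_b / len(rows)
    return weights, bias


def predict(weights, bias, x):
    """Return 1 (price expected to rise) or 0 (price expected to fall)."""
    score = sum(w * xi for w, xi in zip(weights, x)) + bias
    return 1 if sigmoid(score) >= 0.5 else 0


# Hypothetical market states: (short-term return, order imbalance) -> next move
rows = [(0.8, 0.6), (0.5, 0.9), (0.7, 0.4), (-0.6, -0.5), (-0.9, -0.2), (-0.4, -0.8)]
labels = [1, 1, 1, 0, 0, 0]
weights, bias = train_logistic_regression(rows, labels)
```

In the system described here, the same kind of model would be trained over market states loaded through Spark RDDs rather than over a Python list.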
Forecast Market Trends Rapidly and Accurately──Trade Center
After the investment strategies are produced, users can choose the strategies they want to use on the web pages. Figure 7 shows the original architecture of the Trade Center.
Figure 7: The Original Design for Trade Center
The Integration of State Center and Trade Center
Pulling data from Kafka queues takes very little time, commonly below tens of milliseconds. However, the Netty messaging system inside Storm is much faster than Kafka, usually taking only several milliseconds from one vertex to another. To reduce the overhead of market state transmission, we combined the State Center topology and the Trade Center topology into one large topology, named the HFT system.
The HFT system is therefore one large topology that pulls data from Kafka queues and writes data into HBase and MongoDB. Figure 8 shows the architecture of the whole HFT topology. By combining the State Center and the Trade Center, the message transmission time is reduced to several milliseconds. The complete experimental data are described in detail in the Experiments chapter.
Figure 8: The architecture of the HFT topology
Cluster Resources Management
Because of the complexity of the HFT architecture, we use YARN to launch most services and manage all resources on the cluster. Also, we monitor the health of each node’s services, monitor the status of our services on web pages and operate the cluster services easily [8].
EXPERIMENTS
In the previous section, we introduced the requirements of high-frequency trading and the details of the n-tier architecture used for it. In the HFT system, the State Center computes market states and the Trade Center then uses them to forecast market trends. The whole process must finish within 1 second because the HFT system forecasts the trend 1 second ahead. As a result, the State Center must complete all its calculations within 500 milliseconds, and so must the Trade Center. Because the State Center needs to calculate a large number of market states, we mainly examine its performance. In this section, we present experimental results showing that the architecture’s performance is good enough for high-frequency trading.
Experimental Environment
To evaluate the architecture’s performance, we designed several experiments. Because of the high-frequency and real-time requirements, the system must be able to handle a considerable number of requests in a very short time. In particular, the State Center needs to calculate millions of market states within one second, so the experiments focus on the State Center. We compare the results for different numbers of futures to find out how much load the architecture can handle. For the experiments, we prepared 8 computers as the cluster, 6 of which act as supervisors that run the Storm topology. More information about the execution environment is shown in Table 1.
Table 1: The detailed information of the cluster
To test the limits of this architecture and find the most appropriate configuration for the cluster, we compare the average computing time of all market states in each experiment. We add a bolt to the architecture to collect metric data about the market state computation.
Implementation of the Experiments
To carry out this analysis, we add a new bolt, named ExpStateReceiverBolt, to our original topology to collect all computing metrics of the market states. Once all market states of one K-bar have arrived, this bolt sums the accumulated cost time of the K-bars and calculates the average value. Figure 9 shows the performance of State Center – A, Figure 10 shows the performance of State Center – B and Figure 11 shows the number of market states for N futures. Table 2 compares the experimental results of State Center – A and State Center – B.
Host | CPU | RAM | HDD | OS | Storm | |
nccu-n01 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | - | -
nccu-n02 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | - | -
nccu-n03 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | - | -
nccu-n04 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | Follower | Broker
nccu-n05 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | Leader | Broker
nccu-n06 | Intel(R) Core(TM)2 Quad CPU Q8400 @ 2.66GHz | 3.8 GB | 459.1 GB | Ubuntu 12.04 | Storm Supervisor | Follower | Broker
Figure 9: The average cost time of market states computation for different numbers of futures – State Center A
Figure 10: The average cost time of market states computation for different numbers of futures – State Center B
Figure 11: The number of market states computed per second for N futures
Table 2: The detailed experimental results of the State Center – A and the State Center – B
If the time granularity is one second, the bolt receives 1,318 market states per future per second. With 50 futures, the State Center needs to calculate 65,900 market states every second.
The ExpStateReceiverBolt computes the average cost time of the market state computation. The cost time in the HFT system must be under 1,000 milliseconds and should preferably be under 500 milliseconds, because the HFT system still has to send orders to the exchange after the market states are computed.
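The measurement logic of such a receiver can be sketched in plain Python as follows. This is not the Storm bolt itself: the class and method names are hypothetical, and the sketch assumes that the cost time of a K-bar is the time from the start of the K-bar to the arrival of its last market state, which is one possible reading of the description above.

```python
STATES_PER_FUTURE = 1318  # market states per future for a one-second K-bar (from the text)


class ExpStateReceiver:
    """Collects market states per K-bar and averages the cost times of completed K-bars."""

    def __init__(self, num_futures):
        self.expected = num_futures * STATES_PER_FUTURE
        self.pending = {}        # kbar_id -> [states received, latest finish time in ms]
        self.cost_times = []     # cost time in ms of each completed K-bar

    def receive(self, kbar_id, kbar_start_ms, finished_ms):
        """Record one market state; return the K-bar cost time once it is complete."""
        entry = self.pending.setdefault(kbar_id, [0, kbar_start_ms])
        entry[0] += 1
        entry[1] = max(entry[1], finished_ms)
        if entry[0] < self.expected:
            return None
        del self.pending[kbar_id]
        cost = entry[1] - kbar_start_ms
        self.cost_times.append(cost)
        return cost

    def average_cost_ms(self):
        return sum(self.cost_times) / len(self.cost_times)


# Hypothetical run: one future, two K-bars whose last states finish 120 ms and 180 ms in.
receiver = ExpStateReceiver(num_futures=1)
for kbar_id, start_ms, last_ms in [(1, 0, 120), (2, 1000, 1180)]:
    for i in range(STATES_PER_FUTURE):
        finished = start_ms + (last_ms - start_ms) * (i + 1) // STATES_PER_FUTURE
        receiver.receive(kbar_id, start_ms, finished)

average = receiver.average_cost_ms()
within_budget = average < 500   # the preferred 500 ms budget from the text
```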
Low Latency for Computation
Figure 9 shows the average cost time of State Center – A for computing the market states of each K-bar. When the number of futures is 20, which means the HFT system processes the quotes of 20 futures at the same time, the average cost time of market state computation is 175.0 milliseconds in State Center – A. With 40 futures, the average cost time is 374.73 milliseconds. The relationship between the average cost time and the number of futures is approximately linear. With 80 futures, the average cost time is 998.89 milliseconds. As a result, with 6 supervisors (in Storm, supervisors are responsible for executing the logic), the HFT system can handle the market state computation of 80 futures with a latency under 1 second (1,000 milliseconds). Theoretically, if we want to support 1,000 futures with a delay of no more than 1 second, we need 75 supervisors (1,000 / 80 * 6) to compute the market states. If we want to decrease the latency to 500 milliseconds, we need 150 supervisors.
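The supervisor estimates follow from a simple linear extrapolation of the State Center – A baseline (6 supervisors, 80 futures, about 1,000 milliseconds). The calculation below reproduces them under that linearity assumption; the function name is illustrative only.

```python
BASELINE_SUPERVISORS = 6      # supervisors used in the experiment
BASELINE_FUTURES = 80         # futures handled in about 1,000 ms by State Center A
BASELINE_LATENCY_MS = 1000


def supervisors_needed(num_futures, target_latency_ms):
    """Linear extrapolation: capacity scales with supervisors and with the latency budget."""
    numerator = num_futures * BASELINE_LATENCY_MS * BASELINE_SUPERVISORS
    denominator = BASELINE_FUTURES * target_latency_ms
    return -(-numerator // denominator)   # ceiling division on integers


print(supervisors_needed(1000, 1000))  # 75
print(supervisors_needed(1000, 500))   # 150
```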
In contrast, Figure 10 shows that when the HFT system processes the quotes of 20 futures at the same time, the average cost time of market state computation in State Center – B is 59.85 milliseconds, which is about one third of that of State Center – A. With 40 futures, the average cost time is 95.45 milliseconds.
State Center – B performs better than State Center – A, and it can handle more futures with the same number of nodes (supervisors). After many experiments, we decided to adopt the State Center – B solution.
One of the critical factors in the low-latency computation of the State Center is the Storm framework. The State Center is built on top of Storm, which benefits its processing of streaming quotes. Also, with the Netty messaging system built into Storm, the Compute State Bolts emit 237,240 market states per second with a latency of 514.183 milliseconds. In other words, with 6 supervisors and 180 futures, computing all market states takes about 514.2 milliseconds, so the latency of our architecture is low.
CONCLUSION
In this paper, we introduce a distributed, high-frequency, low-latency and loosely coupled architecture based on several distributed computing technologies, such as Storm, Spark, Kafka and YARN. With this architecture, we implement a high-frequency trading (HFT) system that contains several tiers, each responsible for a different function:
1. The presentation tier displays information, receives users’ requests and passes them to the back-end cluster.
2. The front-end switching tier receives all requests from the front end and passes them to the back-end switching tier.
3. The back-end switching tier fetches front-end requests and dispatches them to the right services in the business logic tier.
4. The most important tier, the business logic tier, calculates market states, builds trading strategies and handles user requests.
5. If services need to access the database or external data sources, they get the data through the data access tier.
Through the high-performance messaging middleware, Apache Kafka, the services of the HFT system can exchange a considerable number of messages.
This architecture gives a general solution for real-time, high-frequency distributed computing systems. In the business logic tier, the State Center and the Trade Center are critical because they need to process high-frequency quotes and trading orders in a very short time (under hundreds of milliseconds). Built on the Apache Storm framework, they can handle millions of market state calculations and order submissions within tens to hundreds of milliseconds.
Moreover, the n-tier architecture of the HFT system is highly scalable. For example, to enhance the front-end server’s performance, we just add Node.js servers to the front-end tier. To support the market state calculation of many more futures, we simply add machines to the Storm cluster in the business logic tier. With this scalability, we can calculate the market states in tens of milliseconds as long as we have enough machines. The experiments above also show that the performance of the HFT system is excellent: under normal circumstances, the system can support trend forecasts for 180 futures with just 6 machines and a delay of less than 1 second. The performance of this HFT system based on the n-tier architecture is good enough to forecast short-term trends in the high-frequency trading market.
While designing the architecture and developing this system, we faced two main obstacles, which may serve as lessons for others:
1. It is difficult to debug and find issues in a distributed computing architecture. A centralized log management subsystem could be added to the system to collect the logs of every service so that we can view them easily in one place.
2. The technologies in the architecture are complicated to learn, deploy and maintain.
The n-tier distributed architecture is suitable for many use cases, such as high-frequency data processing, computation over considerable volumes of data and model building on large data sets. We proposed a two-tier messaging system to handle the message exchanges of the real-time and non-real-time data processing subsystems at the same time. This paper demonstrated the performance of real-time streaming data processing and high-frequency quote computation in the n-tier architecture. In the future, we will add more factors and keep testing more detailed cluster configurations to improve the performance of the HFT system, and we aim to accomplish the following goals:
1. Build a distributed system that can support the market state computation of all futures in Taiwan and reduce the latency to between several and tens of milliseconds.
2. Shorten all messages in the whole system by encoding them to decrease bandwidth usage.
NOTES
Partially supported by the “The Cloud for the Strategy Trading” project with Chunghwa Telecom.
REFERENCES
[1] Toshniwal, A., Taneja, S., Shukla, A., Ramasamy, K., Patel, J. M., Kulkarni, S., ... & Ryaboy, D. (2014, June). Storm@twitter. In Proceedings of the 2014 ACM SIGMOD International Conference on Management of Data (pp. 147-156). ACM.
[2] Apache Storm. https://storm.apache.org/
[3] Jones, M. T. (2013). Process real-time big data with Twitter Storm. IBM Technical Library.
[4] Aarsten, A., Brugali, D., & Menga, G. (1996). Patterns for three-tier client/server applications. Proceedings of Pattern Languages of Programs (PLoP’96), 4-6.
[5] Hirschfeld, R. (1996). Three-tier distribution architecture. Pattern Languages of Programs (PloP).
[6] Zaharia, M., Chowdhury, M., Das, T., Dave, A., Ma, J., McCauley, M., ... & Stoica, I. (2012, April). Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation (pp. 2-2). USENIX Association.
[7] Zaharia, M., Chowdhury, M., Franklin, M. J., Shenker, S., & Stoica, I. (2010, June). Spark: cluster computing with working sets. In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing (pp. 10-10).
[8] Hansen, C. A. (2012). Optimizing Hadoop for the cluster. Institute for Computer Science, University of Tromsø, Norway, http://oss.csie.fju.edu.tw/~tzu98/Optimizing%20Hadoop%20for%20the%20cluster.pdf, Retrieved online October.
[9] Apache Kafka. http://kafka.apache.org/
[10] Manuel, P. D., & AlGhamdi, J. (2003). A data-centric design for n-tier architecture. Information Sciences, 150(3), 195-206.
[11] Ding, Y. S., Hu, Z. H., & Sun, H. B. (2008). An antibody network inspired evolutionary framework for distributed object computing. Information Sciences, 178(24), 4619-4631.
[12] Sumbaly, R., Kreps, J., & Shah, S. (2013, June). The big data ecosystem at linkedin. In Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data (pp. 1125-1134). ACM.
[13] Kreps, J., Narkhede, N., & Rao, J. (2011, June). Kafka: A distributed messaging system for log processing. In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece.
[14] Joshi, R. (2007). Data-Oriented Architecture: A Loosely-Coupled Real-Time SOA. Real-Time Innovations, Inc, CA, Tech. Rep.
[15] Netty. http://netty.io/index.html
[16] TIBCO. http://www.tibco.com/
[17] Zaharia, M., Chowdhury, M., Franklin, M. J., Shenker, S., & Stoica, I. (2010, June). Spark: cluster computing with working sets. In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing (pp. 10-10).
[18] Zaharia, M., Chowdhury, M., Das, T., Dave, A., Ma, J., McCauley, M., ... & Stoica, I. (2012). Fast and interactive analytics over Hadoop data with Spark. USENIX ;login:, 37(4), 45-51.
[19] Zaharia, T. H. T. D. M., Bayen, A., Abbeel, P., & Hunter, T. Large-Scale Online Expectation Maximization with Spark Streaming. eecs. berkeley. edu, 1-5.
[20] Buyya, R., Broberg, J., & Goscinski, A. M. (Eds.). (2010). Cloud computing: Principles and paradigms (Vol. 87). John Wiley & Sons.
[21] Cloudera. http://www.cloudera.com/content/cloudera/en/home.html
[22] Vavilapalli, V. K., Murthy, A. C., Douglas, C., Agarwal, S., Konar, M., Evans, R., ... & Baldeschwieler, E. (2013, October). Apache hadoop yarn: Yet another resource negotiator. In Proceedings of the 4th annual Symposium on Cloud Computing (p. 5). ACM.
[23] Node.js. https://nodejs.org