Stream Live Forex OHLC Data via Python WebSocket

Rahul Khanna
Published in Nerd For Tech
4 min read · Apr 29, 2024


In our previous tutorial, we discussed how to stream forex data in 8 lines of code. As a natural progression, we’ll now show the subsequent stage of processing this data.

I would advise reading that tutorial on streaming forex data before continuing.

We’ve often received requests from developers and business analysts to provide minute OHLC data via WebSocket, so we believe it’s useful to demonstrate how to calculate it on the fly from the tick stream. This addresses users’ immediate needs and should also help beginner to intermediate developers who are new to forex and market data.

With the need covered, let’s look at what you need as a starting point to calculate your Forex OHLC data.

Prerequisites: Before delving into the tutorial, ensure you have:

  1. Python installed
  2. Basic understanding of Python
  3. Forex WebSocket API Key

For the 3rd requirement, you can sign up for a trial to access forex data for 14 days. Obtain your API key from your dashboard. Comprehensive documentation with example codes is available to assist you.

Let’s Code

The code leverages the TraderMade module to stream real-time forex data. Its primary objective is to process this streaming data, calculating and storing OHLC prices for different currency pairs at specified intervals. Processed data is printed to the console and saved to a text file.

Here’s a breakdown of each section:

Step 1: Installing the TraderMade package

This line installs the TraderMade package version 0.8.0. It grants access to TraderMade’s Forex and Cryptocurrency data via their API.

pip install tradermade==0.8.0

Step 2: Importing necessary modules

We will use four imports. TraderMade’s stream module does most of the heavy lifting, including connecting to the WebSocket and requesting the data.

from tradermade import stream
import json
from datetime import datetime
from collections import defaultdict

Step 3: Initializing variables

We will now declare the objects and variables used to aggregate our OHLC bars in memory. The interval variable is the bar length in minutes. There is no need to understand every object and variable yet; their roles will become clear shortly.

ohlc_data = defaultdict(lambda: defaultdict(lambda: {'open': 0, 'high': 0, 'low': 0, 'close': 0}))
previous_interval = None
count = 0
interval = 2
format = '%Y-%m-%d %H:%M'
output_file = 'ohlc_output.txt'
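
To see why a nested defaultdict suits this job, here is a minimal sketch of how looking up a new interval and currency creates an empty bar on the fly. The interval key and price are made-up sample values, and the name bars is only used for this illustration.

```python
from collections import defaultdict

# Same shape as ohlc_data above: interval -> currency -> OHLC bar.
bars = defaultdict(lambda: defaultdict(lambda: {'open': 0, 'high': 0, 'low': 0, 'close': 0}))

# Accessing a missing interval and currency creates a zeroed bar automatically.
bar = bars['2024-04-29 10:36']['USDJPY']
bar['open'] = bar['high'] = bar['low'] = bar['close'] = 156.5  # sample price

# A membership test does not create an entry, so unseen currencies stay absent.
has_eurgbp = 'EURGBP' in bars['2024-04-29 10:36']

print(dict(bars['2024-04-29 10:36']))
print(has_eurgbp)
```

Note that indexing creates entries while the in operator does not, which is why the later functions test membership before writing a bar.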

Step 4: Defining utility functions

The round_timestamp function rounds a timestamp down to the start of its interval, dropping seconds and microseconds. This way ohlc_data has one timestamp per period as its key and a dictionary of bars per currency as its value.

It is generally not advised to do longer timeframes beyond a few minutes without a system to deal with reconnection. A drop in connection will create issues with the aggregates while the connection is down.

def round_timestamp(timestamp, interval_minutes):
    rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes
    rounded_time = timestamp.replace(second=0, microsecond=0, minute=rounded_minutes)
    return rounded_time
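
As a quick check of the rounding behaviour, the sketch below applies the same function to a sample timestamp with two different interval lengths. The timestamp is an arbitrary example value.

```python
from datetime import datetime

def round_timestamp(timestamp, interval_minutes):
    # Floor the minute to the start of its interval and drop seconds.
    rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes
    return timestamp.replace(second=0, microsecond=0, minute=rounded_minutes)

tick_time = datetime(2024, 4, 29, 10, 37, 45)  # sample value

two_min = round_timestamp(tick_time, 2)
five_min = round_timestamp(tick_time, 5)

print(two_min.strftime('%Y-%m-%d %H:%M'))   # 2024-04-29 10:36
print(five_min.strftime('%Y-%m-%d %H:%M'))  # 2024-04-29 10:35
```

Because the function only looks at the minute within the hour, intervals that divide 60 evenly (1, 2, 5, 15 and so on) give bars of equal length; other values produce a shorter bar at the end of each hour.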

The save_to_file function is straightforward: it appends one line of text to the output file.

def save_to_file(data):
    with open(output_file, 'a') as f:
        f.write(data + '\n')

The process_data function takes in the raw message and parses it using the variables we defined in step 3. It derives local variables such as currency and mid and passes them to the update_ohlc_data and process_interval functions. Finally, once the previous bar has been written to the file, it sets previous_interval to current_interval so a new OHLC bar can start. This way we only use the small amount of memory needed to calculate our Forex OHLC data.

def process_data(data):
    global previous_interval, count
    if data != "Connected":
        try:
            data = json.loads(data)
            # ts is in milliseconds, so convert to seconds first
            timestamp = datetime.fromtimestamp(int(data['ts']) / 1000)
            current_interval = round_timestamp(timestamp, interval).strftime(format)
            currency = data['symbol']
            # Convert bid and ask separately before averaging them
            mid = round((float(data['bid']) + float(data['ask'])) / 2, 5)
            update_ohlc_data(current_interval, currency, mid)
            process_interval(current_interval)
            if count == 0:
                previous_interval = current_interval
                count = 1
        except Exception as e:
            print(f"Error processing data: {e}")
    else:
        print("Connected to WebSocket")
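
To illustrate the parsing step, the sketch below runs the same logic on a hand-written message. The sample payload is an assumption modelled on the fields the function reads (symbol, ts, bid, ask); real messages may carry additional fields. Unlike process_data, it converts the timestamp in UTC so the result does not depend on the machine's local time zone.

```python
import json
from datetime import datetime, timezone

# Hypothetical sample message; only the fields read by process_data are included.
raw = '{"symbol": "USDJPY", "ts": "1714386465000", "bid": 156.501, "ask": 156.503}'

tick = json.loads(raw)

# ts is treated as milliseconds since the epoch, as in process_data.
timestamp = datetime.fromtimestamp(int(tick['ts']) / 1000, tz=timezone.utc)

# The mid price is the average of bid and ask, rounded to 5 decimals.
mid = round((float(tick['bid']) + float(tick['ask'])) / 2, 5)

print(tick['symbol'], timestamp.strftime('%Y-%m-%d %H:%M:%S'), mid)
```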

The update_ohlc_data function takes in three variables and updates the global ohlc_data object as new data comes in. The global ohlc_data object aggregates OHLC data for the respective timestamp and currency.

def update_ohlc_data(current_interval, currency, mid):
    if current_interval not in ohlc_data:
        ohlc_data[current_interval] = defaultdict(lambda: {'open': mid, 'high': mid, 'low': mid, 'close': mid})
    # Use "if" rather than "elif" so the first tick of a new interval is not dropped
    if currency not in ohlc_data[current_interval]:
        ohlc_data[current_interval][currency] = {'open': mid, 'high': mid, 'low': mid, 'close': mid}
    else:
        ohlc_data[current_interval][currency]['high'] = max(ohlc_data[current_interval][currency]['high'], mid)
        ohlc_data[current_interval][currency]['low'] = min(ohlc_data[current_interval][currency]['low'], mid)
        ohlc_data[current_interval][currency]['close'] = mid
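
The aggregation rule itself can be shown in isolation. The following is a minimal sketch, not part of the tutorial code: build_bar is a hypothetical helper and the prices are made-up sample values. The first price sets all four fields, and every later price can only raise the high, lower the low, and replace the close.

```python
def build_bar(prices):
    """Fold a sequence of mid prices into a single OHLC bar (hypothetical helper)."""
    bar = None
    for mid in prices:
        if bar is None:
            # First tick of the interval seeds open, high, low and close.
            bar = {'open': mid, 'high': mid, 'low': mid, 'close': mid}
        else:
            bar['high'] = max(bar['high'], mid)
            bar['low'] = min(bar['low'], mid)
            bar['close'] = mid
    return bar

sample_mids = [156.50, 156.53, 156.48, 156.51]  # sample values
bar = build_bar(sample_mids)
print(bar)
```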

Once time moves into a new interval, the process_interval function writes the previous period's OHLC bar for each currency to the file and sets count to 0. It detects the change from the length of the ohlc_data object: if it holds more than one timestamp, the previous timestamp's bars are written out and then deleted.

def process_interval(current_interval):
    global previous_interval, count
    if len(ohlc_data) > 1:
        # Write one line per currency for the interval that has just closed
        for currency, ohlc in ohlc_data[previous_interval].items():
            output = f"{previous_interval:<10} {currency:<10} {ohlc['open']:<10.5f} {ohlc['high']:<10.5f} {ohlc['low']:<10.5f} {ohlc['close']:<10.5f}"
            print(output)
            save_to_file(output)
        count = 0
        del ohlc_data[previous_interval]

Step 5: Setting up and connecting to the WebSocket

Once we define our functions we set the API key and symbols we want to request using the stream module we imported. Finally, we pass our initial function process_data into the stream.stream_data function and start the connection using connect.

stream.set_ws_key("API_KEY")  # Replace with your actual API key
stream.set_symbols("USDJPY,EURGBP")
stream.stream_data(process_data)
stream.connect()

Voila, we now have our OHLC bar getting populated in the output file using little memory.
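
If you later want to load the saved bars back into Python, a line of the output file can be split on whitespace. The sketch below is one possible approach rather than part of the SDK: parse_ohlc_line is a hypothetical helper, and the sample line is built with the same format string used in process_interval and made-up prices.

```python
def parse_ohlc_line(line):
    """Split one output line back into its fields (hypothetical helper)."""
    # The interval key contains a space, so it arrives as two tokens.
    date, time, currency, o, h, l, c = line.split()
    return {
        'interval': f"{date} {time}",
        'currency': currency,
        'open': float(o), 'high': float(h), 'low': float(l), 'close': float(c),
    }

# Build a sample line with the format string from process_interval.
interval_key, currency = '2024-04-29 10:36', 'USDJPY'
ohlc = {'open': 156.5, 'high': 156.53, 'low': 156.48, 'close': 156.51}
line = f"{interval_key:<10} {currency:<10} {ohlc['open']:<10.5f} {ohlc['high']:<10.5f} {ohlc['low']:<10.5f} {ohlc['close']:<10.5f}"

record = parse_ohlc_line(line)
print(record)
```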

Please visit our Python SDK GitHub page for the full code.

Final Thoughts

Feel free to modify the functions for custom output parsing and timestamp formatting. Additionally, consider handling reconnections and missed data during service downtime, such as saving data in a file for later retrieval.
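
One simple way to notice missed data is to compare the interval of the last bar written with the interval of the next one. The sketch below assumes the same timestamp format as above; missing_intervals is a hypothetical helper, not part of the TraderMade package, and the timestamps are sample values.

```python
from datetime import datetime, timedelta

def missing_intervals(last_written, current, interval_minutes, fmt='%Y-%m-%d %H:%M'):
    """Return interval keys that fall strictly between two bar timestamps."""
    start = datetime.strptime(last_written, fmt)
    end = datetime.strptime(current, fmt)
    step = timedelta(minutes=interval_minutes)
    gaps = []
    cursor = start + step
    while cursor < end:
        gaps.append(cursor.strftime(fmt))
        cursor += step
    return gaps

# With 2-minute bars, jumping from 10:36 to 10:44 skips three intervals.
gaps = missing_intervals('2024-04-29 10:36', '2024-04-29 10:44', 2)
print(gaps)
```

A gap does not always mean the connection dropped, since a quiet market can also produce intervals with no ticks, so treat the result as a prompt to check rather than proof of lost data.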

Please subscribe and clap for our work, as it goes a long way in helping us create more content. Thanks for reading.
