Fetch and ingest intraday stock data

In this step:

  • create a configuration file (optional)
  • fetch stock data
  • ingest the data into TimescaleDB

Create a configuration file

This step is optional, but it is highly recommended that you do not store your password or other sensitive information directly in your code. Instead, create a configuration file, for example config.py, and include your database connection details and Alpha Vantage API key in it:

  # example content of config.py:
  DB_USER = 'tsdbadmin'
  DB_PASS = 'passwd'
  DB_HOST = 'xxxxxxx.xxxxxxx.tsdb.cloud.timescale.com'
  DB_PORT = '66666'
  DB_NAME = 'tsdb'
  APIKEY = 'alpha_vantage_apikey'

Later, whenever you need any of the information from this configuration file, import it:

  import config
  apikey = config.APIKEY
  ...

Collect ticker symbols

To fetch intraday stock data, you need to know which ticker symbols you want to analyze. First, collect a list of symbols so that you can fetch their data later. In general, you have a few options to gather a list of ticker symbols dynamically:

  • Scrape it from a public website (example code here)
  • Use an API that has this functionality
  • Download it from an open repository

To make things easier, download this CSV file to get started:

Read symbols from CSV file

After downloading the CSV file into your project folder, create a new Python file called ingest_stock_data.py in the same folder as the symbols.csv file. Add the following code, which reads the symbols.csv file into a list:

  # ingest_stock_data.py:
  import csv

  with open('symbols.csv') as f:
      reader = csv.reader(f)
      symbols = [row[0] for row in reader]

  print(symbols)

Run this code:

  python ingest_stock_data.py

You should see a list of symbols printed out:

  ['AAPL', 'MSFT', 'AMZN', 'GOOG', 'FB']

Now you have a list of ticker symbols that you can use later to make requests to the Alpha Vantage API.

Fetch intraday stock data

About the API

The Alpha Vantage API provides two years of historical intraday stock data in 1, 5, 15, or 30-minute intervals. The API returns a lot of data in CSV format (around 2,200 rows per symbol per day at a 1-minute interval), so it slices the dataset into one-month buckets. This means that a single request for a single symbol returns at most one month of data, and the maximum amount of historical intraday data is 24 months. To fetch the maximum amount, you need to slice up your requests by month: year1month1, year1month2, and so on, up to year2month12. Keep in mind that each request can only fetch data for one symbol at a time.

Here’s an example API endpoint:

  https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol=IBM&interval=1min&slice=year1month1&apikey=your_apikey

Check out the Alpha Vantage API docs for more information.
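To make the month-to-slice mapping concrete, here's a small helper (an illustration only, not part of the tutorial script) that converts a month number between 1 and 24 into the slice name the API expects:

```python
def month_to_slice(month):
    """Map a month number (1-24, 1 = most recent) to an Alpha Vantage slice name."""
    if month <= 12:
        # months 1-12 fall into the first year of history
        return 'year1month' + str(month)
    # months 13-24 fall into the second year of history
    return 'year2month' + str(month - 12)

print(month_to_slice(1))   # year1month1
print(month_to_slice(13))  # year2month1
```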

Create the function

Start by creating a function in the ingest_stock_data.py script that you created in an earlier step. This function fetches data for one symbol and one month. The function takes these two values as parameters:

  • symbol: the ticker symbol you want to fetch data for (for example, “AMZN” for Amazon).
  • month: an integer between 1 and 24 indicating which month of history you want to fetch data from (1 being the most recent).

Add the following piece of code to the ingest_stock_data.py file:

  import config
  import pandas as pd

  def fetch_stock_data(symbol, month):
      """Fetches historical intraday data for one ticker symbol (1-min interval)

      Args:
          symbol (string): ticker symbol
          month (int): month of data to fetch (1-24, 1 being the most recent)

      Returns:
          candlestick data (list of tuples)
      """
      interval = '1min'
      # the API requires you to slice up your requests (per month)
      # like "year1month1", "year1month2", ..., "year2month1" etc...
      slice = "year1month" + str(month) if month <= 12 else "year2month" + str(month - 12)
      apikey = config.APIKEY
      # formulate the correct API endpoint with symbol, slice, interval and apikey
      CSV_URL = 'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&' \
                'symbol={symbol}&interval={interval}&slice={slice}&apikey={apikey}' \
                .format(symbol=symbol, slice=slice, interval=interval, apikey=apikey)
      # read the CSV file directly into a pandas dataframe
      df = pd.read_csv(CSV_URL)
      # add a new symbol column to the dataframe
      # (this is needed as the API doesn't return the symbol value)
      df['symbol'] = symbol
      # convert the time column to datetime objects
      # so the data can be seamlessly inserted into the database later
      df['time'] = pd.to_datetime(df['time'], format='%Y-%m-%d %H:%M:%S')
      # rename and reorder columns to match the database schema
      df = df.rename(columns={'open': 'price_open',
                              'high': 'price_high',
                              'low': 'price_low',
                              'close': 'price_close',
                              'volume': 'trading_volume'})
      df = df[['time', 'symbol', 'price_open', 'price_close', 'price_low', 'price_high', 'trading_volume']]
      # convert the dataframe into a list of tuples ready to be ingested
      return [row for row in df.itertuples(index=False, name=None)]

This function downloads data from the Alpha Vantage API and prepares it for database ingestion. To test that it works, add this code after the function definition:

  def test_stock_download():
      test_stock_data = fetch_stock_data("MSFT", 1)
      print(test_stock_data)

  test_stock_download()

Run the script:

  python ingest_stock_data.py

You should see a huge list of tuples printed out, each containing the timestamp value and the price data (candlestick):

  [
      (Timestamp('2022-05-23 04:04:00'), 'MSFT', 255.8, 255.8, 255.8, 255.95, 771),
      (Timestamp('2022-05-23 04:01:00'), 'MSFT', 255.01, 256.5, 255.01, 256.5, 1235),
      ...
  ]

Remove the call to test_stock_download() so it isn't invoked unnecessarily when you run the script in the future.
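If you want to check the rename, reorder, and tuple-conversion steps without calling the API, you can run the same transformations on a tiny hand-made dataframe. The candlestick values below are made up for illustration:

```python
import pandas as pd

# one fabricated 1-minute candlestick in the API's raw column layout
df = pd.DataFrame({
    'time': ['2022-05-23 04:04:00'],
    'open': [255.8], 'high': [255.95], 'low': [255.8],
    'close': [255.8], 'volume': [771],
})
df['symbol'] = 'MSFT'
df['time'] = pd.to_datetime(df['time'], format='%Y-%m-%d %H:%M:%S')
# same rename/reorder as in fetch_stock_data
df = df.rename(columns={'open': 'price_open', 'high': 'price_high',
                        'low': 'price_low', 'close': 'price_close',
                        'volume': 'trading_volume'})
df = df[['time', 'symbol', 'price_open', 'price_close',
         'price_low', 'price_high', 'trading_volume']]
rows = [row for row in df.itertuples(index=False, name=None)]
print(rows)
```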

Ingest data into TimescaleDB

When you have the fetch_stock_data function working and you can fetch candlestick data from the API, you can insert it into the database.

To make the ingestion faster, use pgcopy instead of ingesting data row by row. TimescaleDB is packaged as an extension to PostgreSQL, meaning all the PostgreSQL tools you know and love already work with TimescaleDB.

Ingest data fast with pgcopy

Install psycopg2 and pgcopy so you can connect to the database and ingest data.

Install psycopg2

  pip install psycopg2-binary

Install pgcopy

  pip install pgcopy

Add the following code at the bottom of the ingest_stock_data.py script:

Ingest with pgcopy

  from pgcopy import CopyManager
  import config, psycopg2

  # establish database connection
  conn = psycopg2.connect(database=config.DB_NAME,
                          host=config.DB_HOST,
                          user=config.DB_USER,
                          password=config.DB_PASS,
                          port=config.DB_PORT)

  # column names in the database (pgcopy needs them as a parameter)
  COLUMNS = ('time', 'symbol', 'price_open', 'price_close', 'price_low', 'price_high', 'trading_volume')

  # iterate over the symbols list
  for symbol in symbols:
      # specify a time range (max 24 months)
      time_range = range(1, 2)  # (last 1 month)
      # iterate over the specified time range
      for month in time_range:
          # fetch stock data for the given symbol and month
          # using the function you created before
          stock_data = fetch_stock_data(symbol, month)
          # create a copy manager instance
          mgr = CopyManager(conn, 'stocks_intraday', COLUMNS)
          # insert data and commit transaction
          mgr.copy(stock_data)
          conn.commit()

This starts ingesting data for each symbol, one month at a time. You can modify time_range if you want to download more data, for example range(1, 25) for the full 24 months of history. When the ingestion finishes, query the stocks_intraday table to check the results:

  time               |symbol|price_open|price_close|price_low|price_high|trading_volume|
  -------------------+------+----------+-----------+---------+----------+--------------+
  2022-06-21 22:00:00|AAPL  |    135.66|      135.6|   135.55|    135.69|         14871|
  2022-06-21 21:59:00|AAPL  |    135.64|     135.64|   135.64|    135.64|           567|
  2022-06-21 21:58:00|AAPL  |    135.67|     135.67|   135.67|    135.67|          1611|
  2022-06-21 21:57:00|AAPL  |    135.66|      135.7|   135.66|     135.7|           972|
  2022-06-21 21:56:00|AAPL  |  135.6401|   135.6401| 135.6401|  135.6401|           441|
  2022-06-21 21:54:00|AAPL  |    135.66|     135.66|   135.66|    135.66|           550|
  2022-06-21 21:53:00|AAPL  |    135.66|     135.66|   135.66|    135.66|           269|
  2022-06-21 21:52:00|AAPL  |    135.67|     135.67|   135.67|    135.67|          2298|
  2022-06-21 21:50:00|AAPL  |    135.67|     135.63|   135.63|    135.67|          1086|
  2022-06-21 21:49:00|AAPL  |    135.65|     135.65|   135.65|    135.65|           458|
  2022-06-21 21:48:00|AAPL  |    135.64|     135.64|   135.64|    135.64|          1006|
  2022-06-21 21:47:00|AAPL  |    135.63|     135.63|   135.63|    135.63|           512|
  2022-06-21 21:45:00|AAPL  |    135.61|     135.61|   135.61|    135.61|           441|
  2022-06-21 21:44:00|AAPL  |    135.62|     135.62|   135.62|    135.62|           526|
tip

Fetching and ingesting intraday data can take a while, so if you want to see results quickly, reduce the number of months, or limit the number of symbols.

This is what the final version of ingest_stock_data.py looks like:

  # ingest_stock_data.py:
  import csv
  import config
  import pandas as pd
  from pgcopy import CopyManager
  import psycopg2

  with open('symbols.csv') as f:
      reader = csv.reader(f)
      symbols = [row[0] for row in reader]

  print(symbols)

  def fetch_stock_data(symbol, month):
      """Fetches historical intraday data for one ticker symbol (1-min interval)

      Args:
          symbol (string): ticker symbol
          month (int): month of data to fetch (1-24, 1 being the most recent)

      Returns:
          candlestick data (list of tuples)
      """
      interval = '1min'
      # the API requires you to slice up your requests (per month)
      # like "year1month1", "year1month2", ..., "year2month1" etc...
      slice = "year1month" + str(month) if month <= 12 else "year2month" + str(month - 12)
      apikey = config.APIKEY
      # formulate the correct API endpoint with symbol, slice, interval and apikey
      CSV_URL = 'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&' \
                'symbol={symbol}&interval={interval}&slice={slice}&apikey={apikey}' \
                .format(symbol=symbol, slice=slice, interval=interval, apikey=apikey)
      # read the CSV file directly into a pandas dataframe
      df = pd.read_csv(CSV_URL)
      # add a new symbol column to the dataframe
      # (this is needed as the API doesn't return the symbol value)
      df['symbol'] = symbol
      # convert the time column to datetime objects
      # so the data can be seamlessly inserted into the database later
      df['time'] = pd.to_datetime(df['time'], format='%Y-%m-%d %H:%M:%S')
      # rename and reorder columns to match the database schema
      df = df.rename(columns={'open': 'price_open',
                              'high': 'price_high',
                              'low': 'price_low',
                              'close': 'price_close',
                              'volume': 'trading_volume'})
      df = df[['time', 'symbol', 'price_open', 'price_close', 'price_low', 'price_high', 'trading_volume']]
      # convert the dataframe into a list of tuples ready to be ingested
      return [row for row in df.itertuples(index=False, name=None)]

  # establish database connection
  conn = psycopg2.connect(database=config.DB_NAME,
                          host=config.DB_HOST,
                          user=config.DB_USER,
                          password=config.DB_PASS,
                          port=config.DB_PORT)

  # column names in the database (pgcopy needs them as a parameter)
  COLUMNS = ('time', 'symbol', 'price_open', 'price_close', 'price_low', 'price_high', 'trading_volume')

  # iterate over the symbols list
  for symbol in symbols:
      # specify a time range (max 24 months)
      time_range = range(1, 2)  # (last 1 month)
      # iterate over the specified time range
      for month in time_range:
          # fetch stock data for the given symbol and month
          # using the function you created before
          stock_data = fetch_stock_data(symbol, month)
          print(stock_data)
          # create a copy manager instance
          mgr = CopyManager(conn, 'stocks_intraday', COLUMNS)
          # insert data and commit transaction
          mgr.copy(stock_data)
          conn.commit()