![]() |
Backtesting a folder with csv files, problem - Printable Version +- Python Forum (https://python-forum.io) +-- Forum: Python Coding (https://python-forum.io/forum-7.html) +--- Forum: Data Science (https://python-forum.io/forum-44.html) +--- Thread: Backtesting a folder with csv files, problem (/thread-8916.html) |
Backtesting a folder with csv files, problem - fiddelush - Mar-13-2018 Hello, I am doing a school assignment and my goal is to test 60 stocks listed on the S&P 500. I've downloaded 60 CSV files from Yahoo and stored them in the folder "stocks". The code was not written by me but by our teacher, and since I am a beginner in programming it is very hard to start editing this code. I want to test them all and then get an aggregated P&L. Can I do it this way, or do I have to merge all the CSV files into one and then test it? At the moment the problem is that I get the error: "FileNotFoundError: File b'/Users/fiddelush/Desktop/Python/stocks/AAL.csv' does not exist" even though it does exist in that folder. Thanks for your support!! from __future__ import (absolute_import, division, print_function, unicode_literals) import numpy as np import pandas as pd import matplotlib.pyplot as plt datadir = '/Users/fiddelush/Desktop/Python/stocks/' # global constant def read_data(ticker): #datadir = 'stocks/' fn = datadir + ticker + '.csv' data = pd.read_csv(fn) data.index = pd.to_datetime(data.Date, format = '%Y-%m-%d') return data def moving_average(x, n, type='simple'): # compute an n period moving average. 
# type is 'simple' | 'exponential' x = np.asarray(x) if type == 'simple': weights = np.ones(n) else: weights = np.exp(np.linspace(-1., 0., n)) weights /= weights.sum() a = np.convolve(x, weights, mode='full')[:len(x)] a[:n] = a[n] return a def stochastic_oscillator(s, n): ns = len(s) a = np.zeros(ns, dtype=float) for i in range(n-1, ns): hh = s['High'][i-n+1:i+1].max() ll = s['Low'][i-n+1:i+1].min() a[i] = 100*(s['Close'][i]-ll)/(hh-ll) return a, ns ######################################################################################### # Filter the signals to conform to long-first, and no-open-position before year end ######################################################################################### def filter_signals(Stock_signals): nss = len(Stock_signals) # Assign a value to trade_direction flag = 0 trade_direction = np.zeros(nss, dtype=int) for i in range(nss): if Stock_signals.Signal[i] == 'Buy': trade_direction[i] = 1 else: trade_direction[i] = -1 # Find the first 'Buy' in Stock_signals for i in range(nss): if trade_direction[i] == 1: flag = 1 j = i+1 while j < nss: if trade_direction[j] == 1: trade_direction[j] = 0 else: flag = 0 break j += 1 # Set the first sell signal to 0 for i in range(nss): if trade_direction[i] == -1: trade_direction[i] = 0 else: break # Filter out consecutive sell for i in range(nss): if trade_direction[i] == -1: j = i+1 while j < nss: if trade_direction[j] == 1: break else: trade_direction[j] = 0 j += 1 print(trade_direction) # Carry out the filtering decisions. 
for i in range(nss): if trade_direction[i] == 0: Stock_signals.loc[Stock_signals.index[i] ,'Signal'] = Stock_signals.Signal[i][0] return Stock_signals ######################################################################################### # Compute the P&L ######################################################################################### def cal_PandL (Stock_signals): Long_First = pd.concat([ pd.DataFrame({"Price": Stock_signals.loc[Stock_signals["Signal"] == 'Buy', "Price"], "Signal": "Buy"}), pd.DataFrame({"Price": Stock_signals.loc[Stock_signals["Signal"] == 'Sell', "Price"], "Signal": "Sell"}), ]) Long_First.sort_index(inplace = True) # Compute the profitability of long trades Stock_long_profits = pd.DataFrame({ "Price": Long_First.loc[(Long_First["Signal"] == "Buy"), "Price"], "Profit": pd.Series(Long_First["Price"] - Long_First["Price"].shift(1)).loc[ Long_First.loc[(Long_First["Signal"].shift(1) == "Buy") ].index ].tolist(), "End Date": Long_First["Price"].loc[ Long_First.loc[(Long_First["Signal"].shift(1) == "Buy") ].index ].index }) return Stock_long_profits ######################################################################################### def trading_signals(ticker, n, m, sdate, edate): Stock = read_data(ticker) Stock = Stock.loc[sdate:edate, :] Stock['%K'], ns = stochastic_oscillator(Stock, n) Stock['%D'] = moving_average(Stock['%K'], m, type='simple') lastdate = str(Stock.tail(1).Date[0]) Stock['Signal'] = np.zeros(len(Stock), dtype = float) c = n + m -1 while Stock.Date[c] < Stock.Date.loc[lastdate]: """ Though the signal is for day c, for the purpose of calculating the P&L later, the signal is ascribed to day c+1 """ if Stock['%K'][c] < 10 and Stock['%K'][c] < Stock['%D'][c] and c+1 < ns: Stock.loc[Stock.Date[c+1] ,'Signal'] = 1 if Stock['%K'][c] > 95 and Stock['%D'][c] > 95 \ and Stock['%K'][c] > Stock['%D'][c] and c+1 < ns: Stock.loc[Stock.Date[c+1] ,'Signal'] = -1 c += 1 # Create a DataFrame with trades, including the price at the 
trade Stock_signals = pd.concat([ pd.DataFrame({"Price": Stock.loc[Stock["Signal"] == 1, "Close"], "Signal": "Buy"}), pd.DataFrame({"Price": Stock.loc[Stock["Signal"] == -1, "Close"], "Signal": "Sell"}), ]) Stock_signals.sort_index(inplace = True) # Implement the risk management strategy # to close out an open position by the end of period. last_day = Stock.index[-1] last_price = Stock.Close[-1] if Stock_signals.index[-1] < last_day: df = pd.DataFrame(data = {'Price': [last_price], 'Signal':['Sell']}, \ index = [last_day]) Stock_signals = Stock_signals.append(df) # It could be the signal of the last sample day is a buy if Stock_signals.Signal[-1] == 'Buy': Stock_signals.loc[Stock_signals.index[-1] ,'Signal'] = 'Sell' Stock_signals = filter_signals(Stock_signals) return Stock_signals ########################################################################################## sdate, edate = '2011-01-01', '2011-12-31' n, m = 14, 2 ticker = "AAL" Stock_signals = trading_signals(ticker, n, m, sdate, edate) print(Stock_signals) result = cal_PandL (Stock_signals) print('\n') print(result) print('\n') total = result.Profit.sum() print('Total Profit from %s through %s = %0.2f' % (sdate, edate, total)) RE: Backtesting a folder with csv files, problem - buran - Mar-13-2018 I guess you want to add the drive letter, e.g. C:, to the path; otherwise the path is relative to the folder in which the script is, and the script cannot find the file. RE: Backtesting a folder with csv files, problem - fiddelush - Mar-13-2018 (Mar-13-2018, 09:54 AM)buran Wrote: I guess you want to add drive letter, e.g. 
Oh, I forgot to mention that I am using a MacBook, so there is no "C:" drive. RE: Backtesting a folder with csv files, problem - buran - Mar-13-2018 In any case, the problem is with the path - it cannot find the file at the path specified. RE: Backtesting a folder with csv files, problem - fiddelush - Mar-13-2018 (Mar-13-2018, 10:33 AM)buran Wrote: in any case, the problem is with path - it cannot find the file at the path specified Thanks for the answer. I'll take a closer look at it. How about testing multiple CSV files at once? Is that possible with this code, or is it meant for a merged file to be created and then tested? BR Fred RE: Backtesting a folder with csv files, problem - buran - Mar-13-2018 You will need to amend it. As you can see, the ticker is hardcoded. You will need to iterate over all tickers and aggregate the results. Eventually you may decide to iterate over all files in a given folder (i.e. you don't know the tickers), but then you still need to amend the code, just in a different way. RE: Backtesting a folder with csv files, problem - fiddelush - Mar-13-2018 I created a merged file, but it seems it wasn't that easy. The file is found now, but I got this error: .format(self.__class__.__name__)) ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). 
RE: Backtesting a folder with csv files, problem - buran - Mar-13-2018 post your code in python tags as well as full traceback in error tags RE: Backtesting a folder with csv files, problem - fiddelush - Mar-13-2018 (Mar-13-2018, 11:42 AM)buran Wrote: post your code in python tags as well as full traceback in error tags """ Date: 2018-03-06 Author: Christopher Constraint: Must take a long position first """ from __future__ import (absolute_import, division, print_function, unicode_literals) import numpy as np import pandas as pd import matplotlib.pyplot as plt datadir = '/Users/fiddelush/Desktop/Python/etfs/' # global constant def read_data(ticker): #datadir = 'stocks/' fn = datadir + ticker + '.csv' data = pd.read_csv(fn) data.index = pd.to_datetime(data.Date, format = '%Y-%m-%d') return data def moving_average(x, n, type='simple'): # compute an n period moving average. # type is 'simple' | 'exponential' x = np.asarray(x) if type == 'simple': weights = np.ones(n) else: weights = np.exp(np.linspace(-1., 0., n)) weights /= weights.sum() a = np.convolve(x, weights, mode='full')[:len(x)] a[:n] = a[n] return a def stochastic_oscillator(s, n): ns = len(s) a = np.zeros(ns, dtype=float) for i in range(n-1, ns): hh = s['High'][i-n+1:i+1].max() ll = s['Low'][i-n+1:i+1].min() a[i] = 100*(s['Close'][i]-ll)/(hh-ll) return a, ns ######################################################################################### # Filter the signals to conform to long-first, and no-open-position before year end ######################################################################################### def filter_signals(Stock_signals): nss = len(Stock_signals) # Assign a value to trade_direction flag = 0 trade_direction = np.zeros(nss, dtype=int) for i in range(nss): if Stock_signals.Signal[i] == 'Buy': trade_direction[i] = 1 else: trade_direction[i] = -1 # Find the first 'Buy' in Stock_signals for i in range(nss): if trade_direction[i] == 1: flag = 1 j = i+1 while j < nss: if 
trade_direction[j] == 1: trade_direction[j] = 0 else: flag = 0 break j += 1 # Set the first sell signal to 0 for i in range(nss): if trade_direction[i] == -1: trade_direction[i] = 0 else: break # Filter out consecutive sell for i in range(nss): if trade_direction[i] == -1: j = i+1 while j < nss: if trade_direction[j] == 1: break else: trade_direction[j] = 0 j += 1 print(trade_direction) # Carry out the filtering decisions. for i in range(nss): if trade_direction[i] == 0: Stock_signals.loc[Stock_signals.index[i] ,'Signal'] = Stock_signals.Signal[i][0] return Stock_signals ######################################################################################### # Compute the P&L ######################################################################################### def cal_PandL (Stock_signals): Long_First = pd.concat([ pd.DataFrame({"Price": Stock_signals.loc[Stock_signals["Signal"] == 'Buy', "Price"], "Signal": "Buy"}), pd.DataFrame({"Price": Stock_signals.loc[Stock_signals["Signal"] == 'Sell', "Price"], "Signal": "Sell"}), ]) Long_First.sort_index(inplace = True) # Compute the profitability of long trades Stock_long_profits = pd.DataFrame({ "Price": Long_First.loc[(Long_First["Signal"] == "Buy"), "Price"], "Profit": pd.Series(Long_First["Price"] - Long_First["Price"].shift(1)).loc[ Long_First.loc[(Long_First["Signal"].shift(1) == "Buy") ].index ].tolist(), "End Date": Long_First["Price"].loc[ Long_First.loc[(Long_First["Signal"].shift(1) == "Buy") ].index ].index }) return Stock_long_profits ######################################################################################### def trading_signals(ticker, n, m, sdate, edate): Stock = read_data(ticker) Stock = Stock.loc[sdate:edate, :] Stock['%K'], ns = stochastic_oscillator(Stock, n) Stock['%D'] = moving_average(Stock['%K'], m, type='simple') lastdate = str(Stock.tail(1).Date[0]) Stock['Signal'] = np.zeros(len(Stock), dtype = float) c = n + m -1 while Stock.Date[c] < Stock.Date.loc[lastdate]: """ Though the 
signal is for day c, for the purpose of calculating the P&L later, the signal is ascribed to day c+1 """ if Stock['%K'][c] < 10 and Stock['%K'][c] < Stock['%D'][c] and c+1 < ns: Stock.loc[Stock.Date[c+1] ,'Signal'] = 1 if Stock['%K'][c] > 95 and Stock['%D'][c] > 95 \ and Stock['%K'][c] > Stock['%D'][c] and c+1 < ns: Stock.loc[Stock.Date[c+1] ,'Signal'] = -1 c += 1 # Create a DataFrame with trades, including the price at the trade Stock_signals = pd.concat([ pd.DataFrame({"Price": Stock.loc[Stock["Signal"] == 1, "Close"], "Signal": "Buy"}), pd.DataFrame({"Price": Stock.loc[Stock["Signal"] == -1, "Close"], "Signal": "Sell"}), ]) Stock_signals.sort_index(inplace = True) # Implement the risk management strategy # to close out an open position by the end of period. last_day = Stock.index[-1] last_price = Stock.Close[-1] if Stock_signals.index[-1] < last_day: df = pd.DataFrame(data = {'Price': [last_price], 'Signal':['Sell']}, \ index = [last_day]) Stock_signals = Stock_signals.append(df) # It could be the signal of the last sample day is a buy if Stock_signals.Signal[-1] == 'Buy': Stock_signals.loc[Stock_signals.index[-1] ,'Signal'] = 'Sell' Stock_signals = filter_signals(Stock_signals) return Stock_signals ########################################################################################## sdate, edate = '2011-01-01', '2011-12-31' n, m = 14, 2 ticker = "merge" Stock_signals = trading_signals(ticker, n, m, sdate, edate) print(Stock_signals) result = cal_PandL (Stock_signals) print('\n') print(result) print('\n') total = result.Profit.sum() print('Total Profit from %s through %s = %0.2f' % (sdate, edate, total))
I merged the CSV files so that the result looks like a single file did before, but the stocks follow one another. So when the first stock's data finishes, the data from the second stock begins, like this: 2012-12-27,111.050003,111.25,110.989998,111.110001,98.365829,722100,0.0,1.0 2012-12-28,111.290001,111.300003,111.150002,111.279999,98.516319,1295800,0.0,1.0 2011-03-03,25.09,25.18,25.07,25.129998999999998,18.752764000000003,1509700,0.0,1.0 2011-03-04,25.129998999999998,25.129998999999998,25.01,25.08,18.71545,189100,0.0,1.0 |