If you want to do something good, do it yourself.
Ferdinand Porsche
df = df.set_index(['dt', 'symbol'], drop=False).sort_index() df.tail(len(df.index.levels[1]) * 2)
# agg_rules = { 'dt': 'last', 'symbol': 'last', 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'adj': 'last' } level_values = df.index.get_level_values # fig = plt.figure(figsize=(15, 3), facecolor='white') df.groupby([pd.Grouper(freq='W', level=0)] + [level_values(i) for i in [1]]).agg( agg_rules).set_index(['dt', 'symbol'], drop=False ).close.unstack(1).plot(ax=fig.add_subplot(131), title="Weekly") df.groupby([pd.Grouper(freq='M', level=0)] + [level_values(i) for i in [1]]).agg( agg_rules).set_index(['dt', 'symbol'], drop=False ).close.unstack(1).plot(ax=fig.add_subplot(132), title="Monthly") df.groupby([pd.Grouper(freq='Y', level=0)] + [level_values(i) for i in [1]]).agg( agg_rules).set_index(['dt', 'symbol'], drop=False ).close.unstack(1).plot(ax=fig.add_subplot(133), title="Yearly") plt.show()
monthly = df.groupby([pd.Grouper(freq='M', level=0), level_values(1)]).agg(agg_rules) \ .set_index(['dt', 'symbol'], drop=False)
monthly.loc[(slice(None), ['QQQ']), :] #
monthly = df.groupby([pd.Grouper(freq='M', level=0), level_values(1)]).agg( agg_rules).set_index(['dt', 'symbol'], drop=False) # . . monthly['pct_close'] = monthly.groupby(level=1)['close'].pct_change().fillna(0) # ax = monthly.pct_close.unstack(1).plot(title="Monthly", figsize=(15, 4)) ax.axhline(0, color='k', linestyle='--', lw=0.5) plt.show()
rolling_prod = lambda x: x.rolling(len(x), min_periods=1).apply(np.prod) # monthly = df.groupby([pd.Grouper(freq='M', level=0), level_values(1)]).agg( agg_rules).set_index(['dt', 'symbol'], drop=False) # . . 1. monthly['pct_close'] = monthly.groupby(level=1)['close'].pct_change().fillna(0) + 1 # DataFrame 2007 fltr = monthly.dt >= '2007-01-01' test = monthly[fltr].copy().set_index(['dt', 'symbol'], drop=False) # dataframe test.loc[test.index.levels[0][0], 'pct_close'] = 1 # 1 # test['performance'] = test.groupby(level=1)['pct_close'].transform(rolling_prod) - 1 # ax = test.performance.unstack(1).plot(title="Performance (Monthly) from 2007-01-01", figsize=(15, 4)) ax.axhline(0, color='k', linestyle='--', lw=0.5) plt.show() # test.tail(len(test.index.levels[1])).sort_values('performance', ascending=False)
def rebalance_simple(x): # data = x.unstack(1) return (data.pct_close * data['size']).sum() / data['size'].sum() def rebalance_sma(x): # , SMA50 > SMA200 data = x.unstack(1) fltr = data['sma50'] > data['sma200'] if not data[fltr]['size'].sum(): return 1 # , return (data[fltr].pct_close * data[fltr]['size']).sum() / data[fltr]['size'].sum() def rebalance_rsi(x): # , RSI100 > 50 data = x.unstack(1) fltr = data['rsi100'] > 50 if not data[fltr]['size'].sum(): return 1 # , return (data[fltr].pct_close * data[fltr]['size']).sum() / data[fltr]['size'].sum() def rebalance_custom(x, df=None): # data = x.unstack(1) for s in data.index: if data['dt'][s]: fltr_dt = df['dt'] < data['rebalance_dt'][s] # values = df[fltr_dt].loc[(slice(None), [s]), 'close'].values data.loc[s, 'custom'] = 0 # if len(values) > len(values[np.isnan(values)]): # RSI 100 data.loc[s, 'custom'] = talib.RSI(values, timeperiod=100)[-1] fltr = data['custom'] > 50 if not data[fltr]['size'].sum(): return 1 # , return (data[fltr].pct_close * data[fltr]['size']).sum() / data[fltr]['size'].sum() def drawdown(chg, is_max=False): # total = len(chg.index) rolling_max = chg.rolling(total, min_periods=1).max() daily_drawdown = chg/rolling_max - 1.0 if is_max: return daily_drawdown.rolling(total, min_periods=1).min() return daily_drawdown
# 1 , df['sma50'] = df.groupby(level=1)['close'].transform(lambda x: talib.SMA(x.values, timeperiod=50)).shift(1) df['sma200'] = df.groupby(level=1)['close'].transform(lambda x: talib.SMA(x.values, timeperiod=200)).shift(1) df['rsi100'] = df.groupby(level=1)['close'].transform(lambda x: talib.RSI(x.values, timeperiod=100)).shift(1)
# : , , portfolios = [ {'symbols': [('SPY', 0.8), ('AGG', 0.2)], 'func': rebalance_sma, 'name': 'Portfolio 80/20 SMA50x200'}, {'symbols': [('SPY', 0.8), ('AGG', 0.2)], 'func': rebalance_rsi, 'name': 'Portfolio 80/20 RSI100>50'}, {'symbols': [('SPY', 0.8), ('AGG', 0.2)], 'func': partial(rebalance_custom, df=df), 'name': 'Portfolio 80/20 Custom'}, {'symbols': [('SPY', 0.8), ('AGG', 0.2)], 'func': rebalance_simple, 'name': 'Portfolio 80/20'}, {'symbols': [('SPY', 0.4), ('AGG', 0.6)], 'func': rebalance_simple, 'name': 'Portfolio 40/60'}, {'symbols': [('SPY', 0.2), ('AGG', 0.8)], 'func': rebalance_simple, 'name': 'Portfolio 20/80'}, {'symbols': [('DIA', 0.2), ('QQQ', 0.3), ('SPY', 0.2), ('IWM', 0.2), ('AGG', 0.1)], 'func': rebalance_simple, 'name': 'Portfolio DIA & QQQ & SPY & IWM & AGG'}, ] for p in portfolios: # rebalance['size'] = 0. for s, pct in p['symbols']: # rebalance.loc[(slice(None), [s]), 'size'] = pct # rebalance_perf = rebalance.stack().unstack([1, 2]).apply(p['func'], axis=1) # p['performance'] = (rebalance_perf.rolling(len(rebalance_perf), min_periods=1).apply(np.prod) - 1) # p['drawdown'] = drawdown(p['performance'] + 1, is_max=True)
fig = plt.figure(figsize=(15, 4), facecolor='white') ax_perf = fig.add_subplot(121) ax_dd = fig.add_subplot(122) for p in portfolios: p['performance'].rename(p['name']).plot(ax=ax_perf, legend=True, title='Performance') p['drawdown'].rename(p['name']).plot(ax=ax_dd, legend=True, title='Max drawdown') # print(f"{p['name']}: {p['performance'][-1]*100:.2f}% / {p['drawdown'][-1]*100:.2f}%") # SPY, rebalance.loc[(slice(None), ['SPY']), :].set_index('dt', drop=False).performance. \ rename('SPY').plot(ax=ax_perf, legend=True) drawdown(rebalance.loc[(slice(None), ['SPY']), :].set_index('dt', drop=False).performance + 1, is_max=True).rename('SPY').plot(ax=ax_dd, legend=True) ax_perf.axhline(0, color='k', linestyle='--', lw=0.5) ax_dd.axhline(0, color='k', linestyle='--', lw=0.5) plt.show()
Source: https://habr.com/ru/post/419979/
All Articles