今天的文章是我的是一位朋友理明(笔名)先生的投稿,研究的是如何借助Excel+Python来分析大量的股票样本数据,从而避免投资者因小数定律影响而造成的决策错误。在此特别感谢理明先生!在股票投资中,投资者常因观察到特定现象(如MACD金叉)而预期股价上涨,但实际效果往往差强人意,这体现了小数定律的影响:少量数据或个别事件不能准确反映整体规律或概率,易被误解为普遍现象。在投资决策中尤其需警惕,避免基于有限信息做出判断。类似地,分析其他特殊现象(如股价倍量突破60日移动平均线)时,也需谨慎考虑其统计意义和普遍性。例如,上图案例股票上升前,股价在倍量突破60日均线后表现强劲,且具有更好的成本优势,但这单一案例能否作为普遍买进信号?为验证这类现象的可靠性,需要基于大样本量进行统计分析。比如以沪深300成分股,分析过去200个交易日的数据,这个工作量对于个人来说简直是噩梦级别的,因此需借助计算机辅助。接下来我们就以倍量上穿MA60作为现象梳理分析思路。特别声明:本文旨在探讨分析思路,提醒投资者注意:任何单一现象或策略的有效性需经广泛验证。文中提及的个股仅为案例分析,不构成投资建议,仅供学习参考。股市有风险,入市需谨慎。我希望对沪深300成分股在过去200个交易日中每一次出现倍量上穿MA60的现象进行记录,并分别记录上穿之后T+1/T+2/T+3/T+4/T+5/T+10/T+20的收益情况,最后分别计算正收益次数与现象次数的比值,得到该现象的胜率。本案例中涉及的计算并不复杂,用Excel函数、VBA、Python都可以实现,难点在于需要处理的数据量太大:股票数量300只;每只股票周期数200个;每只股票至少要用到交易日、收盘价、成交量三个数据指标。数据总数=300*200*3 共计18万个数据……Excel函数直接pass吧,VBA也行,但VBA运行的效率和Python比起来就弱得多,最关键的是VBA运行期间电脑基本没法做别的,所以上Python!一、获取股票数据,获取股票数据的方法有很多,比如从API接口获取,爬虫、Excel中的PQ等都可以获取,本例中由于我长期做量化分析,因此有保存股票数据的习惯,所以我直接从FTP服务器获取数据。
def read_stock_codes(file_path):
df = pd.read_excel(file_path, usecols=[0])
list = df['stock_code'].tolist()
return list
def fetch_stock_data(ftp_host, ftp_user, ftp_pass, stock_codes, remote_base_path):
ftp = FTP(ftp_host)
ftp.login(user=ftp_user, passwd=ftp_pass)
all_dfs = []
for code in stock_codes:
remote_file_path = f'{remote_base_path}/{code}_daily.csv'
try:
data = io.BytesIO()
ftp.retrbinary(f'RETR {remote_file_path}', data.write)
data.seek(0)
df = pd.read_csv(data, encoding='utf-8')
df['stock_code'] = code
return df
这里多插一句,由于我们涉及到300只股票,因此我们可以用for循环来分别获取每一只股票的数据,通过Python的pandas库读取并转换dataframe,然后将所有股票数据拼接成一个完整的数据表。(由于涉及不同股票的信号计算,拼接前先将数据进行计算加工会更加合理)
def fetch_stock_data(ftp_host, ftp_user, ftp_pass, stock_codes, remote_base_path):
ftp = FTP(ftp_host)
ftp.login(user=ftp_user, passwd=ftp_pass)
all_dfs = []
for code in stock_codes:
remote_file_path = f'{remote_base_path}/{code}_daily.csv'
try:
data = io.BytesIO()
ftp.retrbinary(f'RETR {remote_file_path}', data.write)
data.seek(0)
df = pd.read_csv(data, encoding='utf-8')
df['stock_code'] = code
df = df[['stock_code', 'stime', 'close', 'volume']]
df = calculate_ma(df, 60)
df = cross_over(df)
df = delta_volume(df)
df['buy_price'] = df.apply(lambda row: row['close'] if row['cross_over'] == 'True' and row['delta_vol'] == 'True' and row['hist'] > 0 else 0, axis=1)
all_dfs.append(df)
print(f'{code}数据处理结束')
except Exception as e:
print(f'数据处理遇到错误 {code}: {e}')
ftp.quit()
二、数据加工,获取数据后我们需要分别计算MA60均线的数据以及成交量相较于上一周期的变化值,因此我们定义两个函数分别计算这两个数据:def calculate_ma(df, ma_period):
df['ma'] = round(df['close'].rolling(window=ma_period).mean(), 3)
return df
def delta_volume(df):
df['delta_vol'] = 'none'
for i in range(len(df)):
if i < 1:
continue
elif df['volume'].iloc[i - 1] * 2 <= df['volume'].iloc[i]:
df.iloc[i, df.columns.get_loc('delta_vol')] = 'True'
else:
df.iloc[i, df.columns.get_loc('delta_vol')] = 'False'
return df
def cross_over(df):
df['cross_over'] = 'none'
for i in range(len(df)):
if i < 1:
elif df['close'].iloc[i - 1] <= df['ma'].iloc[i - 1] and df['close'].iloc[i] > df['ma'].iloc[i]:
df.iloc[i, df.columns.get_loc('cross_over')] = 'True'
else:
df.iloc[i, df.columns.get_loc('cross_over')] = 'False'
return df
我们在fetch_stock_data函数中依次调用上面的几个函数完成数据计算,然后记录cross_over和delta_vol都为真时的收盘价,记录满足条件后T+1/T+2/T+3/T+4/T+5/T+10/T+20的收益率,最后将整个DataFrame保存为csv文件,文件部分截图如下:看到这个数据表,相信武老师的忠实粉丝们一定有更多、更好的方法对上述数据进行不同形式的统计分析,这个环节我认为是无需赘述了。从上图可以看出,倍量上穿MA60后,正收益的比例随着行情发展逐渐降低。虽然胜率最高只有45.96%,但作为单因子模型,这个胜率是可以被采纳的,之后我们需要通过其他因子的叠加来提升策略的整体收益率。此外,该因子在T+5之后胜率显著降低,因此也可以作为一个短线策略的进场因子备用。以上观点仅供参考学习,不构成投资建议,操作风险自担,股市有风险,入市需谨慎。如果认为无法完成,也可以在评论区留言,我会在后续的文章中单独对数据分析过程进行讲解。import pandas as pd
from ftplib import FTP
import io
import numpy as np
def read_stock_codes(file_path):
df = pd.read_excel(file_path, usecols=[0])
list = df['stock_code'].tolist()
return list
def calculate_ma(df, ma_period):
df['ma'] = round(df['close'].rolling(window=ma_period).mean(), 3)
return df
def cross_over(df):
df['cross_over'] = 'none'
for i in range(len(df)):
if i < 1:
continue
elif df['close'].iloc[i - 1] <= df['ma'].iloc[i - 1] and df['close'].iloc[i] > df['ma'].iloc[i]:
df.iloc[i, df.columns.get_loc('cross_over')] = 'True'
else:
df.iloc[i, df.columns.get_loc('cross_over')] = 'False'
return df
def delta_volume(df):
df['delta_vol'] = 'none'
for i in range(len(df)):
if i < 1:
continue
elif df['volume'].iloc[i - 1] * 2 <= df['volume'].iloc[i]:
df.iloc[i, df.columns.get_loc('delta_vol')] = 'True'
else:
df.iloc[i, df.columns.get_loc('delta_vol')] = 'False'
return df
def fetch_stock_data(ftp_host, ftp_user, ftp_pass, stock_codes, remote_base_path):
ftp = FTP(ftp_host)
ftp.login(user=ftp_user, passwd=ftp_pass)
all_dfs = []
for code in stock_codes:
remote_file_path = f'{remote_base_path}/{code}_daily.csv'
try:
data = io.BytesIO()
ftp.retrbinary(f'RETR {remote_file_path}', data.write)
data.seek(0)
df = pd.read_csv(data, encoding='utf-8')
df['stock_code'] = code
df = df[['stock_code', 'stime', 'close', 'volume']]
df = calculate_ma(df, 60)
df = cross_over(df)
df = delta_volume(df)
df['buy_price'] = df.apply(lambda row: row['close'] if row['cross_over'] == 'True' and row['delta_vol'] == 'True' and row['hist'] > 0 else 0, axis=1)
all_dfs.append(df)
print(f'{code}数据处理结束')
except Exception as e:
print(f'数据处理遇到错误 {code}: {e}')
ftp.quit()
if all_dfs:
combined_df = pd.concat(all_dfs, ignore_index=True)
for i in range(len(combined_df)):
if combined_df.iloc[i]['buy_price'] > 0:
for offset in [1, 2, 3, 4, 5, 10, 20]:
if i + offset < len(combined_df) and combined_df.iloc[i + offset]['stock_code'] == combined_df.iloc[i]['stock_code']:
column_name = f'T+{offset}'
combined_df.at[i, column_name] = round(combined_df.iloc[i + offset]['close'] / combined_df.iloc[i]['buy_price'] - 1, 4)
combined_df.to_csv('D:\\策略想法验证\\hs300list_combined.csv', index=False, header=True)
print("Data saved to hs300list_combined.csv")
if __name__ == '__main__':
ftp_host = '88.88.88.88'
ftp_user = 'user'
ftp_pass = '*****'
excel_file_path = 'D:\\策略想法验证\\hs300list.xlsx'
remote_base_path = '/stock_data/'
stock_codes = read_stock_codes(excel_file_path)
fetch_stock_data(ftp_host, ftp_user, ftp_pass, stock_codes, remote_base_path)