1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| import json import logging import os import time
import akshare as ak from tqdm import tqdm
# Route INFO-and-above log records to stock_data.log, each line carrying a
# timestamp and the level name ahead of the message.
logging.basicConfig(
    filename='stock_data.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
def get_stock_list():
    """Return the code and name of every stock in the A-share spot table.

    Calls ``ak.stock_zh_a_spot_em()`` and reads the ``代码`` and ``名称``
    fields of each row.

    Returns:
        A list of ``{"code": ..., "name": ...}`` dicts, one per row.

    Raises:
        Exception: any failure while fetching or reading the table is logged
            and then re-raised unchanged.
    """
    try:
        spot_frame = ak.stock_zh_a_spot_em()
        stocks = []
        for _, record in spot_frame.iterrows():
            entry = {"code": record["代码"], "name": record["名称"]}
            stocks.append(entry)
    except Exception as exc:
        logger.error(f"获取股票列表失败: {exc}")
        raise
    return stocks
def get_stock_history(stock_code):
    """Fetch the daily history of one stock as a list of row dicts.

    Args:
        stock_code: Symbol passed to ``ak.stock_zh_a_hist``.

    Returns:
        One dict per row of the returned table, keyed by column name.
        An empty list when the call returns ``None`` or raises: this function
        is best-effort per stock, so failures are logged and not propagated.
    """
    try:
        stock_data = ak.stock_zh_a_hist(symbol=stock_code, period="daily")
        if stock_data is None:
            return []
        # to_dict("records") builds all row dicts in one call instead of
        # materialising a Series per row the way iterrows() does.
        return stock_data.to_dict("records")
    except Exception as e:
        # The exception is swallowed here, so record the traceback in the log;
        # otherwise the cause of the failure is lost.
        logger.exception(f"获取股票 {stock_code} 历史数据失败: {e}")
        return []
def main():
    """Download daily history for every A-share stock, one JSON file per stock.

    Ensures the ``all_stock_data`` directory exists, then for each entry
    returned by ``get_stock_list()`` writes ``all_stock_data/<code>.json``
    containing that stock's history rows, each tagged with ``stock_code`` and
    ``stock_name``. A stock whose file already exists is skipped, so an
    interrupted run can be resumed by running the script again.

    Raises:
        Exception: any error escaping the per-stock handling is logged and
            re-raised.
    """
    try:
        main_path = "all_stock_data"
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(main_path, exist_ok=True)

        stock_list = get_stock_list()

        for stock in tqdm(stock_list, desc="获取股票数据"):
            stock_code = stock["code"]
            stock_name = stock["name"]

            file_path = os.path.join(main_path, f"{stock_code}.json")
            if os.path.exists(file_path):
                logger.info(f"股票 {stock_code} 的数据已存在,跳过获取。")
                continue

            history_data = get_stock_history(stock_code)

            if history_data:
                for item in history_data:
                    item["stock_code"] = stock_code
                    item["stock_name"] = stock_name
                    # Dates are stringified so json can serialise them.
                    item["日期"] = str(item["日期"])

                json_str = json.dumps(history_data, ensure_ascii=False, indent=2)

                # Write to a temporary name and move it into place, so a crash
                # mid-write never leaves a truncated <code>.json that later
                # runs would treat as complete and skip.
                tmp_path = f"{file_path}.tmp"
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write(json_str)
                os.replace(tmp_path, file_path)
            else:
                # get_stock_history() returns [] when the fetch fails. Writing
                # that to disk would mark the stock as done and prevent any
                # retry, so leave no file behind.
                logger.warning(f"股票 {stock_code} 未获取到历史数据,本次不写入文件,下次运行时重试。")

            # Throttle: pause after every request sent to the data source.
            time.sleep(1)
    except Exception as e:
        logger.error(f"主程序运行失败: {e}")
        raise
# Run the download only when this file is executed as a script, not on import.
if __name__ == "__main__": main()
|