# NOTE(review): the wildcard import is kept first on purpose.  The original
# re-imported ``time`` right after it, which suggests WindPy's namespace
# shadows names such as ``time`` (and possibly ``date``) -- confirm against the
# installed WindPy.  Every explicit import below therefore wins over it.
from WindPy import *  # provides the ``w`` API handle used by this script

import json
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

import numpy as np
import pandas as pd
from pymongo import MongoClient

# Today's date as an ISO string (YYYY-MM-DD).  The former
# ``date.replace("'", "")`` was a no-op: this format never contains a quote.
date = time.strftime("%Y-%m-%d")

# Open the Wind terminal session; every later ``w.*`` call needs it.
w.start()
# Connection status; only displayed when run interactively.
w.isconnected()
# Connect to the database
from urllib.parse import quote_plus

# MongoDB connection settings (placeholders -- fill in real values, ideally
# from the environment rather than from source code).
host = 'xxx.xxx.x.xx'
port = '27017'
user = 'xxxx'
code = 'utf8utf8'  # password

# User name and password are percent-escaped so that characters such as
# '@', ':' or '/' in the credentials cannot break the URI.
uri = "mongodb://{user}:{password}@{host}:{port}/?authMechanism=SCRAM-SHA-1".format(
    user=quote_plus(user),
    password=quote_plus(code),
    host=host,
    port=port,
)
client = MongoClient(uri)
db = client['CN_Stock']
collection = db['Wind_Close_Price']
# Get the list of A-share companies from Wind
# Query the sector constituents as of 2019-07-30 and keep element 1 of the
# result, which is the DataFrame part.
A_list = w.wset(
    "sectorconstituent",
    "date=2019-07-30;sectorid=a001010100000000",
    usedf=True,
)[1]
# Multithreaded price download
# Per-stock status table; get_price() records failed downloads in its
# 'state' column.
df = pd.DataFrame(columns=['wind_code', 'close', 'volume', 'amt', 'total_shares'])
df['wind_code'] = A_list['wind_code']
df_list = A_list.copy()


def applyParallel(dfGrouped, func):
    """Apply ``func`` to every group in a thread pool and concatenate the results.

    Args:
        dfGrouped: Iterable of ``(key, group)`` pairs, e.g. a pandas GroupBy.
        func: Callable that takes one group and returns a DataFrame.

    Returns:
        The per-group results joined with ``pd.concat`` in submission order,
        or an empty DataFrame when there are no groups.

    Raises:
        Whatever ``func`` raises; the exception is re-raised by
        ``Future.result()``.
    """
    # The context manager shuts the pool down even if a worker raises.
    with ThreadPoolExecutor(10) as pool:  # at most 10 concurrent workers
        futures = [pool.submit(func, group) for _, group in dfGrouped]
        # Collect in submission order so the output is deterministic.
        results = [future.result() for future in futures]
    if not results:
        # pd.concat([]) raises ValueError; return an empty frame instead.
        return pd.DataFrame()
    return pd.concat(results)


def get_price(df_list):
    """Download the daily price history of the first stock in ``df_list``.

    Args:
        df_list: DataFrame with a 'wind_code' column; only its first row is used.

    Returns:
        The DataFrame returned by ``w.wsd`` (forward-adjusted close and CSRC
        industry, 2018-01-10 to 2019-08-08) with an extra 'wind_code' column,
        or an empty DataFrame when the download failed.  A failure is printed
        and recorded in the module-level ``df`` under 'state'.
    """
    stk = df_list['wind_code'].iloc[0]
    try:
        result = w.wsd(
            stk,
            "close,industry_csrc12_n",
            "2018-01-10",
            "2019-08-08",
            "industryType=1;PriceAdj=F",
            usedf=True,
        )
        error_code, prices = result[0], result[1]
    except Exception:
        error_code, prices = -1, None

    # NOTE(review): assumes usedf=True yields (ErrorCode, DataFrame) with
    # ErrorCode == 0 on success, as described in the WindPy manual -- confirm.
    if error_code != 0:
        print('error_no_record' + stk)
        # Flag only the failing stock, not the whole table.
        df.loc[df['wind_code'] == stk, 'state'] = 'error_no_record'
        return pd.DataFrame()

    prices['wind_code'] = stk
    time.sleep(0.5)  # pause between requests, as in the original
    print("done2_" + stk)
    return prices


# One group per row, so every stock becomes its own download task.
df_list['rank'] = df_list.index
t1 = time.time()
Adjust_Price_Daily = applyParallel(df_list.groupby('rank'), get_price)
# The result used to be bound to this name only while the code below read
# ``Adjust_Price_Daily`` (NameError); both names are kept.
Adjust_Price_Daily_temp = Adjust_Price_Daily
t2 = time.time()
print(t2 - t1)

# The index returned by ``w.wsd`` is kept as-is (it is no longer replaced by
# ``range(len(...))``): the cleaning step that follows copies it into a
# 'date' column and pivots on it.
if not Adjust_Price_Daily.empty:
    # Price chart of a single stock.
    Adjust_Price_Daily[Adjust_Price_Daily['wind_code'] == '000001.SZ'].plot(y=['CLOSE'])

    # Same documents as before (one per row, index not stored); the 'records'
    # orientation does not need a unique index.
    records = json.loads(Adjust_Price_Daily.to_json(orient='records'))
    # Replace the stored data only when there is something to store.
    collection.drop()
    collection.insert_many(records)
# Clean and standardise the data
# Copy the index into a regular column so the long table can be pivoted on it.
Adjust_Price_Daily['date'] = Adjust_Price_Daily.index
# Drop rows that contain any missing value.
Adjust_Price_Daily.dropna(axis=0, how='any', inplace=True)

# Long -> wide: one row per date, one column per stock, values = CLOSE.
Adjust_Price = Adjust_Price_Daily.pivot(index='date', columns='wind_code', values='CLOSE')
# Keep only stocks that have a price on every date.
Adjust_Price.dropna(axis=1, how='any', inplace=True)

# Row-over-row simple return of every stock.
df_r = Adjust_Price / Adjust_Price.shift(1) - 1
# 30-row momentum: price divided by the price 30 rows earlier.  The original
# ``Adjust_Price.iloc/Adjust_Price.iloc.shift(30)`` had lost its ``[:,:]`` and
# raised AttributeError.
df_m = Adjust_Price / Adjust_Price.shift(30)

# Remove the leading rows for which the shifted price does not exist.
df_m.dropna(axis=0, how='any', inplace=True)
df_r.dropna(axis=0, how='any', inplace=True)
# Use momentum as the factor
def _quintile_spread_returns(factor_row, return_row):
    """Average returns within factor quintiles and add two spreads.

    Args:
        factor_row: Series of factor values indexed by stock code.
        return_row: Series of returns indexed by the same stock codes.

    Returns:
        A list of seven floats: the mean return of quintiles '1' (lowest
        factor) to '5' (highest), then the '5-1' and '4-2' spreads.

    Raises:
        ValueError: from ``pd.cut`` when the quantile edges do not increase
            monotonically (e.g. many tied factor values).
    """
    labels = ['1', '2', '3', '4', '5']
    q0, q1, q2, q3, q4, q5 = factor_row.quantile([0, 0.2, 0.4, 0.6, 0.8, 1.0])
    # Widen the outer edges so the minimum and maximum fall inside a bin.
    bins = [q0 - 1, q1, q2, q3, q4, q5 + 1]
    groups = pd.cut(factor_row, bins, right=True, labels=labels)
    # observed=False keeps all five quintiles (NaN when one is empty).
    means = return_row.groupby(groups, observed=False).mean().tolist()
    return means + [means[4] - means[0], means[3] - means[1]]


# Each momentum date is paired with the return realised HOLDING_LAG rows
# later.  The rows are matched by index label: df_m and df_r dropped a
# different number of leading rows, so indexing both by the same position (as
# the original did) paired the factor with a return from *before* the
# momentum date.
HOLDING_LAG = 15
return_positions = df_r.index.get_indexer(df_m.index) + HOLDING_LAG

quintile_rows = []
return_dates = []
for i, (_, momentum_row) in enumerate(df_m.iterrows()):
    print(i)
    pos = return_positions[i]
    # Skip dates missing from df_r (get_indexer gave -1) and dates whose
    # forward return lies beyond the end of the sample.
    if pos < HOLDING_LAG or pos >= len(df_r):
        continue
    quintile_rows.append(_quintile_spread_returns(momentum_row, df_r.iloc[pos]))
    return_dates.append(df_r.index[pos])

# One row per return date, one column per quintile / spread.
data_all = pd.DataFrame(
    quintile_rows,
    index=return_dates,
    columns=['1', '2', '3', '4', '5', '5-1', '4-2'],
)
# Plot the cumulative growth of each portfolio (nothing to plot when empty).
pic = (1 + data_all).cumprod().plot() if not data_all.empty else None
def _quintile_spread_returns(factor_row, return_row):
    """Average returns within factor quintiles and add two spreads.

    Args:
        factor_row: Series of factor values indexed by stock code.
        return_row: Series of returns indexed by the same stock codes.

    Returns:
        A list of seven floats: the mean return of quintiles '1' (lowest
        factor) to '5' (highest), then the '5-1' and '4-2' spreads.

    Raises:
        ValueError: from ``pd.cut`` when the quantile edges do not increase
            monotonically (e.g. many tied factor values).
    """
    labels = ['1', '2', '3', '4', '5']
    q0, q1, q2, q3, q4, q5 = factor_row.quantile([0, 0.2, 0.4, 0.6, 0.8, 1.0])
    # Widen the outer edges so the minimum and maximum fall inside a bin.
    bins = [q0 - 1, q1, q2, q3, q4, q5 + 1]
    groups = pd.cut(factor_row, bins, right=True, labels=labels)
    # observed=False keeps all five quintiles (NaN when one is empty).
    means = return_row.groupby(groups, observed=False).mean().tolist()
    return means + [means[4] - means[0], means[3] - means[1]]


# 30-row rolling standard deviation of returns, annualised with sqrt(252).
df_vol = df_r.rolling(30).std() * (252 ** 0.5)
df_vol.dropna(axis=0, how='any', inplace=True)

# NOTE(review): the original read the factor 15 rows *ahead* of the return
# (``df_vol.iloc[i+15]`` against ``df_r.iloc[i]``), which ran past the end of
# df_vol for the last 15 iterations and crashed.  This assumes the intent was
# the factor on one date against the return 15 rows later, matched by index
# label -- confirm.
HOLDING_LAG = 15
return_positions = df_r.index.get_indexer(df_vol.index) + HOLDING_LAG

quintile_rows = []
return_dates = []
for i, (_, vol_row) in enumerate(df_vol.iterrows()):
    print(i)
    pos = return_positions[i]
    # Skip dates missing from df_r (get_indexer gave -1) and dates whose
    # forward return lies beyond the end of the sample.
    if pos < HOLDING_LAG or pos >= len(df_r):
        continue
    quintile_rows.append(_quintile_spread_returns(vol_row, df_r.iloc[pos]))
    return_dates.append(df_r.index[pos])

# One row per return date, one column per quintile / spread.
data_all = pd.DataFrame(
    quintile_rows,
    index=return_dates,
    columns=['1', '2', '3', '4', '5', '5-1', '4-2'],
)
# Plot the cumulative growth of each portfolio (nothing to plot when empty).
if not data_all.empty:
    (1 + data_all).cumprod().plot()
def _top_momentum_returns(prices, holding_lag=15):
    """Forward return of the highest-momentum group within one industry.

    Args:
        prices: Wide price table (rows = dates, columns = stock codes).
        holding_lag: Number of rows between the momentum date and the return
            that is attributed to it.

    Returns:
        A float Series indexed by return date.  With five or more stocks the
        value is the mean return of the top momentum quintile; with fewer it
        is the return of the single highest-momentum stock.  Empty when the
        industry has no stock with a complete price history.

    Raises:
        ValueError: from ``pd.cut`` when the quantile edges do not increase
            monotonically (e.g. many tied momentum values).
    """
    # Keep only stocks that have a price on every date.
    prices = prices.dropna(axis=1, how='any')
    if prices.shape[1] == 0:
        return pd.Series(dtype=float)

    returns = (prices / prices.shift(1) - 1).dropna(axis=0, how='any')
    momentum = (prices / prices.shift(30)).dropna(axis=0, how='any')

    labels = ['1', '2', '3', '4', '5']
    # Match rows by date label: ``returns`` and ``momentum`` dropped a
    # different number of leading rows, so equal positions are different dates.
    positions = returns.index.get_indexer(momentum.index) + holding_lag

    top_returns = {}
    for (_, momentum_row), pos in zip(momentum.iterrows(), positions):
        if pos < holding_lag or pos >= len(returns):
            continue
        return_row = returns.iloc[pos]
        if len(momentum_row) < 5:
            # NOTE(review): the original wrote ``data = df_r.max`` (an uncalled
            # method).  Assumed intent: too few stocks for quintiles, so take
            # the highest-momentum stock -- confirm.
            top = return_row[momentum_row.idxmax()]
        else:
            q0, q1, q2, q3, q4, q5 = momentum_row.quantile([0, 0.2, 0.4, 0.6, 0.8, 1.0])
            # Widen the outer edges so the extremes fall inside a bin.
            bins = [q0 - 1, q1, q2, q3, q4, q5 + 1]
            groups = pd.cut(momentum_row, bins, right=True, labels=labels)
            top = return_row[groups == '5'].mean()
        top_returns[returns.index[pos]] = top
    return pd.Series(top_returns, dtype=float)


# Industry classification exported from Wind: one row per stock.
# (``read_excel`` has no ``encoding`` argument; passing one raises TypeError
# on current pandas.)
df_Industry = pd.read_excel('首先有一個行業列表的excel.xlsx')
df_Industry.columns = ['wind_code', 'Industry']
df_Industry.dropna(axis=0, how='any', inplace=True)

# Copy the index into a 'date' column unless that was already done; after
# the merge below the index is no longer meaningful.
if 'date' not in Adjust_Price_Daily.columns:
    Adjust_Price_Daily['date'] = Adjust_Price_Daily.index
Adjust_Price_Daily = pd.merge(Adjust_Price_Daily, df_Industry, on='wind_code')
Adjust_Price_Daily.dropna(axis=0, how='any', inplace=True)

# List of distinct industries.
Industry = df_Industry['Industry'].unique().tolist()

# For every industry, track the top momentum group.
industry_returns = {}
for ind in Industry:
    print(ind)
    industry_rows = Adjust_Price_Daily[Adjust_Price_Daily['Industry'] == ind]
    if industry_rows.empty:
        continue
    industry_prices = industry_rows.pivot(index='date', columns='wind_code', values='CLOSE')
    top_series = _top_momentum_returns(industry_prices)
    if not top_series.empty:
        industry_returns[ind] = top_series

# One row per return date, one column per industry.  The original relabelled
# the index with the seven quintile/spread names copied from the
# all-market analysis, which cannot match this shape.
data_all = pd.DataFrame(industry_returns)
if not data_all.empty:
    (1 + data_all).cumprod().plot()