[機械学習・進化計算による株式取引最適化] No.04-04 類似データの計算

このプログラムの目的

複数の時系列データを扱うとき,ある銘柄は別の銘柄を先行している場合があります(ある銘柄の影響が遅れて別の銘柄に伝わること).

これらの関係性を計算することで,株価予測に対して有益な特徴を算出できます.

work_share
├04_get_stock_price_ver2
  ├Dockerfile
  ├docker-compose.yml
  └src
    ├dataset
    ├original_data_2010-01-01_2023-03-01_1d
    ├time_cluster_result
    ├get_stock_price.py
    ├make_original_data.py
    ├make_time_cluster_dataset.py
    ├calculate_nearest_codes.py (これを作成)
    └stocks_code.xls

使用ライブラリ

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import joblib

import os
import pickle
import json

from sklearn.preprocessing import StandardScaler
import warnings
warnings.simplefilter('ignore')

データセットのロード

データセットを読み込みます.除去対象の銘柄を除去することを忘れないでください.

def df_split(df):
    train_df = df.loc[df['eval_type']=='train', :].reset_index(drop=True)
    test_df = df.loc[df['eval_type']=='test', :].reset_index(drop=True)
    return train_df, test_df

def load_dataset(args):
    df = pd.read_pickle(args['dataset_path'])

    train_df, test_df = df_split(df)
    train_df = train_df.drop('eval_type', axis=1)
    test_df = test_df.drop('eval_type', axis=1)

    scaler = StandardScaler()
    train_df = pd.DataFrame(data=scaler.fit_transform(train_df), columns=train_df.columns)
    test_df = pd.DataFrame(data=scaler.transform(test_df), columns=test_df.columns)
    train_df = train_df.fillna(0)
    test_df = test_df.fillna(0)

    drop_codes = []
    for col in test_df.columns:
        target = test_df[col]
        if np.any(target > 60) or np.any(target < -100):
            drop_codes.append(col)

    print(f'drop codes :{drop_codes}')
    train_df = train_df.drop(drop_codes, axis=1)
    test_df = test_df.drop(drop_codes, axis=1)

    return train_df, test_df, scaler

類似銘柄の算出

対象銘柄(target_col)のt時間後のデータに対して最も類似している銘柄を取得します.
また,最大max_next_time時間後までのデータを計算します.

今回は系列間の類似度をピアソン相関係数として算出しました.

def process_corr(target_col, df, max_next_time):
    warnings.simplefilter('ignore')
    col_ret = {}
    for t in range(1, max_next_time+1):
        target = df[target_col].shift(-t).values
        nearest_corr = 0
        nearest_code = None
        for col in df.columns:
            if col != target_col:
                corr = abs(np.corrcoef(target[:-t], df[col][:-t].values)[0, 1])
                if corr > nearest_corr:
                    nearest_corr = corr
                    nearest_code = col
        col_ret[t] = {
            'nearest_code':nearest_code,
            'corr':float(nearest_corr)
        }
    return target_col, col_ret

全銘柄に対する類似銘柄の算出

全銘柄に対して類似度を計算する割と時間がかかるので,joblibを利用して並列計算させます.

計算結果は{銘柄:[t1後の類似銘柄, t2後の類似銘柄, ...,tN後の類似銘柄]}という辞書で返します.

def calculate_corr_joblib(df, args):
    print(f'all task num : {len(df.columns)}')
    result = joblib.Parallel(n_jobs=-1, verbose=10)(joblib.delayed(process_corr)(
        target_col, df, args['max_next_time']) for target_col in df.columns)

    ret_dict = {target_col:col_ret for target_col, col_ret in result}
    return ret_dict

実行

本章では30日後までの類似銘柄を算出しました.

def run(args):
    train_df, test_df, scaler = load_dataset(args)
    ret = calculate_corr_joblib(train_df, args)
    with open(args['save_ret_path'], 'w') as f:
        json.dump(ret, f, indent=4, ensure_ascii=False)

def experiment():
    dataset_dir = 'dataset'
    args = {
        'dataset_path':f'./{dataset_dir}/original_dataset_Close.dfpkl',
        'save_ret_path':f'./{dataset_dir}/nearest_calculate_result.json',
        'max_next_time':30
    }
    run(args)

if __name__ == '__main__':
    experiment()
タイトルとURLをコピーしました