[機械学習・進化計算による株式取引最適化] No.04-03 時系列クラスタリング

このプログラムの目的

全銘柄を扱うにあたり,その銘柄を表す特徴量が欲しくなります.もちろん,銘柄コードをOne-hotデータやLabelデータとして扱っても良いのですが,この本では4つの連続的な特徴量として,銘柄を表すことにします.

そのために,全銘柄をクラスタリングし,4つのクラスタに分類します.そして,「各クラスタの中心から,どれだけ離れているか」を銘柄コードの代わりに新たに特徴量として定義します

work_share
├04_get_stock_price_ver2
  ├Dockerfile
  ├docker-compose.yml
  └src
    ├dataset
    ├original_data_2010-01-01_2023-03-01_1d
    ├time_cluster_result(自動生成)
    ├get_stock_price.py
    ├make_original_data.py
    ├make_time_cluster_dataset.py (これを作成)
    └stocks_code.xls

使用ライブラリ

時系列クラスタリングはtslearnのTimeSeriesKMeansを利用すると簡単に実装できます.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import datetime

import os
import pickle

from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from tslearn.clustering import TimeSeriesKMeans

除外銘柄の検出

各銘柄を正規化してみて,外れ値が大きすぎる銘柄をチェックします.

def df_split(df):
    train_df = df.loc[df['eval_type']=='train', :].reset_index(drop=True)
    test_df = df.loc[df['eval_type']=='test', :].reset_index(drop=True)
    return train_df, test_df

def check_all_codes(args):
    df = pd.read_pickle(args['dataset_path'])
    train_df, test_df = df_split(df)

    train_df = train_df.drop('eval_type', axis=1)
    test_df = test_df.drop('eval_type', axis=1)

    scaler = StandardScaler()
    train_df = pd.DataFrame(data=scaler.fit_transform(train_df), columns=train_df.columns)
    test_df = pd.DataFrame(data=scaler.transform(test_df), columns=test_df.columns)
    train_df = train_df.fillna(0)
    test_df = test_df.fillna(0)

    drop_codes = []
    plt.cla()
    plt.clf()
    for col in test_df.columns:
        plt.plot(test_df[col])
        target = test_df[col]
        if np.any(target > 60) or np.any(target < -100):
            print(col)
            drop_codes.append(col)
    plt.xticks(rotation=30, ha='right')
    plt.savefig(f'{args["result_dir"]}/all_plot.jpg', bbox_inches='tight')

    plt.cla()
    plt.clf()
    for col in test_df.columns:
        if col not in drop_codes:
            plt.plot(test_df[col])
    plt.xticks(rotation=30, ha='right')
    plt.savefig(f'{args["result_dir"]}/all_plot_drop.jpg', bbox_inches='tight')

    return drop_codes

データの正規化

除去対象の銘柄を除去して,クラスタリングのためのデータセットを作ります.

def load_dataset(args):
    df = pd.read_pickle(args['dataset_path'])
    df = df.drop(args['drop_codes'], axis=1)

    train_df, test_df = df_split(df)
    train_df = train_df.drop('eval_type', axis=1)
    test_df = test_df.drop('eval_type', axis=1)

    scaler = StandardScaler()
    train_df = pd.DataFrame(data=scaler.fit_transform(train_df), columns=train_df.columns)
    test_df = pd.DataFrame(data=scaler.transform(test_df), columns=test_df.columns)
    train_df = train_df.fillna(0)
    test_df = test_df.fillna(0)

    return train_df, test_df, scaler

時系列クラスタリング

複数の時系列データを教師なし学習によってクラスタリングします.
最も単純な方法は,時系列データを特徴量ベクトルとして解釈し,Kmeanによってクラスタリングするだけです.以下のコードはそれを表しています.

def learning(df, args):
    X = df.T.values
    params = {
        'n_clusters':args['n_clusters'],
        'metric':args['metric'],
        'max_iter':100,
        'random_state':args['random_state'],
        'n_jobs':8
    }
    model = TimeSeriesKMeans(**params)
    model.fit(X)
    return model

もちろん,単順にKmeansを適用するだけならskleanのKMeansだけで済みます.tsleanのKMeansはmetricとして’euclidean’を指定する場合は一般的なKMeansと変わりありません.

TimeSeriesKMeansのmeatircには’dtw’や’softdtw’が指定できます.筆者の環境ではdtwやsoftdtwは実行時間がかかりすぎたため,実行不可能でした.3500系列×1600点あるので大抵のPCでは実行できないと思いますが,実行できるのでしたらそちらをお勧めします.

クラスタリングした結果

すべての系列データを4つに分類し,それぞれのクラスの平均異動線を見てみます.

def make_timeline_plot(scaler, model, args, save_path):
    df = pd.read_pickle(args['dataset_path'])
    df = df.reset_index()
    date = pd.to_datetime(df['Date'])
    eval_type = df['eval_type']
    df = df.drop(['Date', 'eval_type'], axis=1)
    df = df.drop(args['drop_codes'], axis=1)
    df.loc[:, :] = scaler.transform(df)

    _min = 100
    _max = -100

    labels = model.labels_
    plt.cla()
    plt.clf()
    for c in range(args['n_clusters']):
        class_mask = labels == c
        class_mean = df.loc[:, class_mask].mean(axis=1)
        plt.plot(date, class_mean, label=f'class {c}')

        _min = min(_min, class_mean.min())
        _max = max(_max, class_mean.max())


    train_num = np.sum(eval_type=='train')
    plt.vlines(date.iat[train_num], _min, _max, colors='black', linestyles='dashed')
    plt.ylim([_min, _max])
    plt.xlabel('date')
    plt.legend()
    plt.xticks(rotation=30, ha='right')
    plt.savefig(save_path, bbox_inches='tight')

これらの分類情報を元に,各銘柄のユニークな特徴量を算出します

クラス平均と線銘柄の特徴量

上記の4つのクラス平均線も学習に加えたほうが良いと思うので,保存します.

また,クラスタリングの結果から,各銘柄が各クラスの中心点からどれだけ離れているかを算出します.この4つの特徴量が銘柄を表す特徴となります.より詳細に銘柄を表現したい場合には,分類するクラス数を増やしてください.

def make_dataset(scaler, model, args):
    df = pd.read_pickle(args['dataset_path'])
    df = df.reset_index()
    date = pd.to_datetime(df['Date'])
    eval_type = df['eval_type']
    df = df.drop(['Date', 'eval_type'], axis=1)
    df = df.drop(args['drop_codes'], axis=1)
    df.loc[:, :] = scaler.transform(df)
    labels = model.labels_

    mean_df = pd.DataFrame()
    for c in range(args['n_clusters']):
        class_mask = labels == c
        class_mean = df.loc[:, class_mask].mean(axis=1)
        mean_df[f'class {c}'] = class_mean

    mean_df.to_pickle(args['save_dataset_path_mean'])
    print(mean_df)

    train_num = np.sum(eval_type=='train')

    dist_df = {}
    for col in  tqdm.tqdm(df.columns):
        col_dist = ((mean_df.loc[:train_num].T - df.loc[:train_num, col].T)**2).sum(axis=1)**0.5
        dist_df[col] = col_dist
    dist_df = pd.DataFrame(dist_df)
    dist_df.to_pickle(args['save_dataset_path_dist'])
    print(dist_df)

各関数の実行

このページの関数の実行順およびパラメータです.

def run(args):
    drop_codes = check_all_codes(args)
    args['drop_codes'] = drop_codes
    train_df, test_df, scaler = load_dataset(args)
    model = learning(train_df, args)

    graph_save_path = f'{args["result_dir"]}/timeline.jpg'
    make_timeline_plot(scaler, model, args, graph_save_path)

    ret = {
        'model':model,
        'scaler':scaler
    }

    ret_save_path = f'{args["result_dir"]}/ret'
    with open(ret_save_path, 'wb') as f:
        pickle.dump(ret, f)

    make_dataset(scaler, model, args)

def experiment():
    dataset_dir = 'dataset'
    for metric in ['euclidean']:#, 'softdtw']:
        result_dir = f'time_cluster_result/experiment_{metric}/'
        os.makedirs(result_dir, exist_ok=True)
        #for run_seed in range(1):
        for n_clusters in [4]:
            result_seed_dir = f'{result_dir}/class_{n_clusters}'
            os.makedirs(result_seed_dir, exist_ok=True)
            args = {
                'dataset_path':f'./{dataset_dir}/original_dataset_Close.dfpkl',
                'n_clusters':n_clusters,
                'metric':metric,
                'random_state':0,
                'result_dir':result_seed_dir,
                'save_dataset_path_mean':f'./{dataset_dir}/clusterling_dataset_mean.dfpkl',
                'save_dataset_path_dist':f'./{dataset_dir}/clusterling_dataset_dist.dfpkl',
            }
            run(args)

if __name__ == '__main__':
    experiment()

ここでは学習データとテストデータ7:3の割合で分割することにしました.また,クラス数を増やしても,クラス平均線が大きく分かれなかったため,4つとしました.

タイトルとURLをコピーしました