[機械学習・進化計算による株式取引最適化] No.03-01 市場シミュレーター環境の作成

このプログラムの目的

stock_env.pyの目的は,作成したデータセットとネットワークのやり取りを,強化学習がやり取りしやすい形の「環境」で表現することです.
また,一般的なgymの環境(env)に少し合わせて書いてありますが,銘柄コードの変更・評価モードなどの一般的ではないステップを踏むため,gymの環境(env)とは異なる動作を目的としています.

work_share
├02_get_stock_price
└03_dqn_learning
  ├Dockerfile
  ├docker-compose.yml
  └src
    ├draw_graph
    |  └draw_tools.py
    ├enviroment
    |  └stock_env.py  (これを作成)
    ├reinforcement_learning
    |  └dqn.py
    ├result(自動生成)
    └experiment01.py

使用ライブラリ

import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler
import gym

データの分割

生成したデータセットを学習用とテスト用に分割する関数です.データセット自体が学習用かテスト用かを判別するフラグ系列(eval)を保持しているので,基本的にはそれを利用します.

一応,train_rateを与えて自由に分割できるように実装していますが,今回はしようしません.

def data_split(data, train_rate=0.7, use_eval_mask=False):
    train_data = {}
    test_data = {}
    if use_eval_mask:
        train_mask = data['other']['eval'] == 'train'
        test_mask = data['other']['eval'] == 'test'
        for key, df in data.items():
            train_data[key] = df[train_mask].reset_index(drop=True)
            test_data[key]  = df[test_mask].reset_index(drop=True)
    else:
        train_num = int(len(data['X']) * train_rate)
        for key, df in data.items():
            train_data[key] = df.iloc[:train_num, :].reset_index(drop=True)
            test_data[key]  = df.iloc[train_num:, :].reset_index(drop=True)
    return train_data, test_data

データの変換

用意したデータセットは入力用の系列と評価用の系列が混合しているので,それを分割します.

また,入力用のデータ(X)と評価用のデータ(other)の系列名を出力し,データが混合していないことを確認します.

def preprocess(df):
    nan_mask = df['original_value'].isna() & df['next_original_value'].isna()
    df = df.loc[nan_mask != True, :].reset_index(drop=True)

    other = df[['date', 'code', 'original_value', 'next_original_value', 'eval']]
    X = df.drop(other.columns, axis=1)

    # check input columns.
    print(f'use input columns : {list(X.columns)}')
    # check other columns.
    print(f'use info columns  : {list(other.columns)}')
    return {'X':X, 'other':other}

データの型変換

上で変換されたデータはpandasのデフォルト型(小数点を含む数値の場合はnp.float64)になっていますが,pytorchで扱う型はfloat32なので変換します.
さらに,評価用のデータ(other)もpandasのDataFrameのままだと要素の指定が面倒なので(.atで指定),numpyのデータと辞書に変換します.

def to_learning_data(data):
    X = data['X'].values.astype(np.float32)
    other = {col:data['other'][col].values for col in data['other'].columns}
    return {'X':X, 'other':other}

コスト関数

株価を売買するにあたり,現実世界では手数料が発生します.多くの論文では株価の1%~0.1%程度に設定することが多いですが,日本の証券会社の手数料は%表記ではないことが多いので,それに合わせました.楽天の現物取引の手数料を実装しましたが,何を実装してもらっても構いません.

def default_cost_func(value):
    #reference to https://www.rakuten-sec.co.jp/web/commission/
    if value < 50_000:
        return 55
    if 50_000 <= value < 100_000:
        return 99
    if 100_000 <= value < 200_000:
        return 115
    if 200_000 <= value < 500_000:
        return 275
    if 500_000 <= value < 1_000_000:
        return 535
    if 1_000_000 <= value < 1_500_000:
        return 640
    if 1_500_000 <= value < 30_000_000:
        return 1013
    if 30_000_000 <= value:
        return 1070

環境クラス

強化学習で肝となる環境を実装します.

  • self.action_spaceだけgymのspaceによって実装しています.これは強化学習ライブラリpflrのε-greedy法を利用するときにgym.spaces.Discreteオブジェクトが必要となるからです.
  • moneyは現金を,total_assetsは所有総資産を, all_sold_moneyは所有株をすべて売却した時の現金(total_assetsから手数料が引かれます)です.
  • stock_holdsは手持ちの株の数を,stock_holds_valueは各株の現在の価格を保持します.
  • original_code_listは元の銘柄コードを,stock_code_listは0~Nまでに再表現された銘柄コードの一覧です.
  • max_data_lengthは現在セットしたる銘柄データの系列長です.
  • evaluate_codeseval_use_codesは多すぎる銘柄を取捨選択するために利用します.これは主に次の項のdqn.pyで利用されます.
class stock_env:
    def __init__(self, init_money=10000000,
                trade_cost_func=None, purchase_max=10,
                reward_last_only=True):
        self.init_money = init_money

        if trade_cost_func == None:
            self.trade_cost_func = default_cost_func
        else:
            self.trade_cost_func = trade_cost_func
        self.np_cost_func = np.frompyfunc(self.trade_cost_func, 1, 1)

        self.reward_last_only = reward_last_only

        self.train_data = None
        self.test_data = None
        self.scaler = None

        self.action_num = purchase_max*2 + 1#[-k, ..., -1, 0, 1, ..., k]
        self.action_space = gym.spaces.Discrete(self.action_num)
        self.input_num = None

        self.t = 0
        self.money = init_money
        self.total_assets = init_money
        self.all_sold_money = init_money
        self.stock_holds = None
        self.original_stock_code_list = []
        self.stock_code_list = []
        self.stock_holds_value = None

        self.max_data_length = None
        self.evaluate_codes = {}
        self.eval_use_codes = []

stock_envのメンバー関数: データセットの登録

環境オフジェクトにデータセットを登録します.このときinput_numが決定されます.
input_numはデータセットの入力データ(X)の次数に3つ足したものです.3つはそれぞれ環境がもつ状態(所有株数,所持金,総資産)です.

    def set_df(self, df):
        data = preprocess(df)
        # new_code start 0, 1, 2, ...
        new_code, code_to = pd.factorize(data['other']['code'], sort=True)
        self.original_stock_code_list = code_to
        data['other']['code'] = new_code
        self.stock_code_list = np.unique(new_code)

        self.evaluate_codes = {code:(0, 0) for code in self.stock_code_list}

        self.train_data, self.test_data = data_split(data, use_eval_mask=True)
        print(f'train size : {len(self.train_data["X"])}, test size : {len(self.test_data["X"])}')

        train_df = self.train_data['X']
        test_df = self.test_data['X']
        scaler = StandardScaler()

        train_df = pd.DataFrame(data=scaler.fit_transform(train_df), columns=train_df.columns)
        test_df = pd.DataFrame(data=scaler.transform(test_df), columns=test_df.columns)
        train_df = train_df.fillna(0)
        test_df = test_df.fillna(0)

        self.scaler = scaler
        self.train_data['X'] = train_df.astype(np.float32)
        self.test_data['X'] = test_df.astype(np.float32)

        self.input_num = data['X'].shape[1] + 3

stock_envのメンバー関数: 銘柄の変更

銘柄の変更をする関数です.基本的にはcode=Noneが引数で与えられ,ランダムな銘柄コードが選択されます.一応毎回dateでソートしていますが,おそらく必要ないです.

    def resample(self, code=None):
        if code == None:
            code = np.random.choice(self.stock_code_list)
        self.selected_code = code

        code_mask = self.train_data['other']['code'] == code
        date = self.train_data['other'][code_mask]['date'].reset_index(drop=True)

        data = {}
        for key, df in self.train_data.items():
            data[key] = df[code_mask].reset_index(drop=True)
            data[key]['sort_key'] = date.copy()
            data[key] = data[key].sort_values('sort_key').reset_index(drop=True)
            data[key] = data[key].drop('sort_key', axis=1).reset_index(drop=True)

        self.data = to_learning_data(data)
        self.max_data_length = len(self.data['X']) - 1

stock_envのメンバー関数: 総資産・手数料を引いた総資産の計算

総資産を計算します.この計算の高速化のために銘柄コードが0~Nとなるように再割り当てし,stock_holdsとstock_holds_valueをnumpy.arrayとしました.

    def get_total_assets(self):
        total_stock_value = np.sum(self.stock_holds * self.stock_holds_value)
        total_assets = self.money + total_stock_value
        return total_assets

    def get_money_when_all_sell(self, total_assets):
        values = self.stock_holds * self.stock_holds_value
        money = total_assets - np.sum(self.np_cost_func(values[values>0]))
        return money

stock_envのメンバー関数: 使用する銘柄コードの取捨選択

使用する銘柄コードを取捨選択します.なぜなら,すべての銘柄を利用して,取引のシミュレーションを行うと,とても時間がかかるためです.
利益が上がる銘柄のみを選択し,評価で利用するように設定します.

  • evaluate_codesは利益率の試行平均を計算するために試行合計と試行数を保持しています.
  • borderlineは1.1が設定されていますが,実際の実験では1.01が設定されています.
    def set_eval_use_codes(self, borderline=1.1):
        use_codes = []
        for key, (sum_rate, count) in self.evaluate_codes.items():
            if count != 0 and (sum_rate / count) > borderline:
                use_codes.append(key)
        self.eval_use_codes = use_codes

stock_envのメンバー関数: 評価モード

複数の銘柄を利用したシミュレーションを行うために,データをセットする関数です.

    def eval_mode(self, eval_type):
        if eval_type == 'train':
            use_data = self.train_data
        if eval_type == 'test':
            use_data = self.test_data

        mask = use_data['other']['code'].isin(self.eval_use_codes)
        data = {}
        for key, df in use_data.items():
            data[key] = df[mask].reset_index(drop=True)

        self.data = to_learning_data(data)
        self.max_data_length = len(self.data['X']) - 1

stock_envのメンバー関数: 環境の状態の初期化

環境の状態を初期化します.これは所持金,株数等を初期化することです.
また,環境の最初の状態をreturnします.

    def reset(self):
        self.t = 0
        self.money = self.init_money
        self.stock_holds = np.zeros(len(self.stock_code_list), dtype=np.int32)
        self.stock_holds_value = np.zeros(len(self.stock_code_list), dtype=np.float64)

        X = self.data['X'][self.t]

        scaled_hold_num = 0
        scaled_money = 0
        scaled_total_assets = 0
        X = np.append(X, [scaled_hold_num, scaled_money, scaled_total_assets]).astype(np.float32)
        state = torch.from_numpy(X)
        return state

stock_envのメンバー関数: 環境の状態を進める

環境の状態を進める関数です.アクションを引数として受け取り,

  1. 株の購入数を決定
  2. 所持金などの再計算
  3. 総資産などの再計算
  4. 終了条件を満たすか
  5. 次の状態に進める
  6. 次の状態を計算
  7. 報酬の計算
    を行います.報酬は総資産です(所持金を報酬とするコードはコメントアウトしてあります).
    また,報酬を最後に与えるか否かをreward_last_onlyで決定しています.
    def step(self, action):
        value = self.data['other']['original_value'][self.t].astype(np.float32)
        next_value = self.data['other']['next_original_value'][self.t].astype(np.float32)
        code = self.data['other']['code'][self.t]
        date = self.data['other']['date'][self.t]
        info = {'value':value, 'code':code, 'date':date}

        purchase = action - (self.action_num - 1)/2
        purchase_num = abs(purchase)

        #if purchase == 0: # hold
        if purchase < 0 and (self.money - value*purchase_num) > 0: # buy
            self.money -= value*purchase_num
            self.stock_holds[code] += purchase_num
            self.money -= self.trade_cost_func(value*purchase_num)

        if purchase > 0 and self.stock_holds[code] > 0: # sell
            self.money += value*purchase_num
            self.stock_holds[code] -= purchase_num
            self.money -= self.trade_cost_func(value*purchase_num)

        if self.stock_holds[code] > 0:
            self.stock_holds_value[code] = value

        self.total_assets = self.get_total_assets()
        self.all_sold_money = self.get_money_when_all_sell(self.total_assets)

        done = self.t == self.max_data_length -2
        self.t += 1
        X = self.data['X'][self.t]
        next_code = self.data['other']['code'][self.t]
        scaled_hold_num = self.stock_holds[next_code] / 10
        scaled_money = (self.money - self.init_money) / self.init_money
        scaled_total_assets = (self.total_assets - self.init_money) / self.init_money
        X = np.append(X, [scaled_hold_num, scaled_money, scaled_total_assets]).astype(np.float32)
        state = torch.from_numpy(X)

        if self.reward_last_only:
            if done:
                reward = (self.total_assets - self.init_money)/self.init_money
                #reward = (self.money - self.init_money)/self.init_money
            else:
                reward = 0
        else:
            reward = (self.total_assets - self.init_money)/self.init_money
            #reward = (self.money - self.init_money)/self.init_money
        return state, reward, done, info
タイトルとURLをコピーしました