[機械学習・進化計算による株式取引最適化] No.06-01 市場シミュレーター環境の作成

このプログラムの目的

stock_env.pyの目的は,作成したデータセットとネットワークのやり取りを,強化学習がやり取りしやすい形の「環境」で表現することです.

このプログラムはNo.02章といくつかの相違点があります.

  • 関数を減らし,環境の呼び出しを複雑にしないようにした
  • 学習中は一定期間をサンプリングして利用
  • 2023年に近いほどサンプリングされやすい
  • アクション(行動)は総資産の何%を銘柄にするか(何%を現金で持つか)
  • 何個の銘柄を利用するか
work_share
├06_sampling_dqn_learning
  ├Dockerfile
  ├docker-compose.yml
  └src
    ├draw_graph
    | └draw_tools.py
    ├enviroment
    | └stock_env.py (これを作成)
    ├reinforcement_learning
    | └dqn.py
    └experiment01.py

使用ライブラリ

import pandas as pd
import numpy as np
import torch
import tqdm
import gym

コスト関数

コスト関数は楽天の手数料と,コストなしを実装しました.

def default_cost_func(value):
    #reference to https://www.rakuten-sec.co.jp/web/commission/
    if value < 50_000:
        return 55
    if 50_000 <= value < 100_000:
        return 99
    if 100_000 <= value < 200_000:
        return 115
    if 200_000 <= value < 500_000:
        return 275
    if 500_000 <= value < 1_000_000:
        return 535
    if 1_000_000 <= value < 1_500_000:
        return 640
    if 1_500_000 <= value < 30_000_000:
        return 1013
    if 30_000_000 <= value:
        return 1070

def zero_cost_func(value):
    return 0

環境クラス

強化学習で肝となる環境を実装します.

class stock_env:
    def __init__(self, dataset_dir1=None, dataset_dir2=None, init_money=10000000,
                trade_cost_func='default', n_split_of_hold_rate=100,
                sampling_t=30, reward_last_only=True,
                eval_score_size=10, n_code_select=10, sampling_alpha=0.001):
        self.init_money = init_money

        if trade_cost_func == 'default':
            self.trade_cost_func = default_cost_func
        if trade_cost_func == 'zero':
            self.trade_cost_func = zero_cost_func
        self.np_cost_func = np.frompyfunc(self.trade_cost_func, 1, 1)

        self.n_split_of_hold_rate = n_split_of_hold_rate
        self.action_num = n_split_of_hold_rate+1
        self.action_space = gym.spaces.Discrete(self.action_num)
        self.input_num = None

        self.sampling_t = sampling_t
        self.reward_last_only = reward_last_only

        self.eval_score_size = eval_score_size
        self.n_code_select = n_code_select
        self.sampling_alpha = sampling_alpha

        self.dataset = None
        self.data = None

        self.selected_code = None
        self.is_train = None
        self.is_sample = None

        self.t = 0
        self.money = init_money
        self.start_money = init_money
        self.total_assets = init_money
        self.stock_code_list = []
        self.hold_num = 0

        self.mean_value = None
        self.eval_code_info = {}

        if dataset_dir1 == None or dataset_dir2 == None:
            print(f'dataset dir : {dataset_dir1}, {dataset_dir2}')
            exit()
        self.set_dataset(dataset_dir1, dataset_dir2)

stock_envのメンバー関数: データセットの登録

環境オフジェクトにデータセットを登録します.このときinput_numが決定されます.
input_numはデータセットの入力データ(X)の次数に銘柄を表す4次元ベクトル,環境の状態を表す4次元ベクトルを加えた38次元ベクトルとなります.

    def set_dataset(self, dataset_dir1, dataset_dir2):
        print('load dataset')
        self.dataset = {
            'original_value':pd.read_pickle(f'{dataset_dir1}/original_value.dfpkl'),
            'code_vec':pd.read_pickle(f'{dataset_dir2}/clusterling_dataset_dist.dfpkl')
        }
        _max = np.max(self.dataset['code_vec'].values)
        _min = np.min(self.dataset['code_vec'].values)
        self.dataset['code_vec'] = (self.dataset['code_vec'] - _min)/(_max - _min)

        self.input_time_length = 30
        for t in range(1, self.input_time_length+1):
            self.dataset[f'pred_-t{t}'] = pd.read_pickle(f'{dataset_dir1}/pred_codes_-t{t}.dfpkl')

        for key, df in self.dataset.items():
            if 'date' in df.columns:
                df = df.query('date > "2018-01-01 00:00+09:00"')
                self.dataset[key] = df.reset_index(drop=True)

        self.input_num = 30 + 4 + 4 #pred + vec + env_state
        self.stock_code_list = [code for code in self.dataset['original_value'].columns if code not in ['is_train_data', 'date']]

stock_envのメンバー関数: 環境の状態の初期化

環境の状態を初期化します.これは所持金,株数等を初期化することです.
また,環境の最初の状態をreturnします.

  • サンプリングモードか否かを指定できるようにしています
  • 学習データを利用するか否かを指定できるようにしています
  • 銘柄を指定できるようにしています
  • 初期金・初期保有数は,サンプリングモード時はランダムに決定されます
  • サンプリングの重みは時間軸xに対して$e^{-\alpha x}+\alpha$に従います
    def reset(self, is_sample=True, is_train=True, code=None):
        if code == None:
            code = np.random.choice(self.stock_code_list)
        self.selected_code = code
        ### select code###
        other = pd.DataFrame({
            'original_value':self.dataset['original_value'][code],
            'next_original_value':self.dataset['original_value'][code].shift(-1).reset_index(drop=True),
            'date':self.dataset['original_value']['date']
        })
        X = pd.DataFrame({
            f'pred_-t{t}':self.dataset[f'pred_-t{t}'][code]
                for t in range(1, self.input_time_length+1)
        })
        for n in range(len(self.dataset['code_vec'])):
            X[f'code_vec_{n}'] = self.dataset['code_vec'].at[f'class {n}', str(code)]

        ### mask train or test###
        if is_train:
            mask = self.dataset['original_value']['is_train_data']
            other = other[mask].reset_index(drop=True)
            X = X[mask].reset_index(drop=True)

        self.data = {
            'X':X.reset_index(drop=True),
            'other':other.reset_index(drop=True)
        }
        self.is_train = is_train

        ### front na delete
        mask = other['original_value'].isna() == False
        self.data['X'] = self.data['X'][mask].reset_index(drop=True)
        self.data['other'] = self.data['other'][mask].reset_index(drop=True)
        ### sampling dataset###
        if is_sample:
            df = pd.DataFrame({
                'date':self.data['other']['date']
            }).sort_values('date', ascending=False).reset_index(drop=True)

            df = df.reset_index().rename(columns={'index':'x'})
            df['p'] = np.exp(-1*df['x']*self.sampling_alpha) + 1.0*self.sampling_alpha
            df = df.sort_values('date').reset_index(drop=True).reset_index()
            indexes = df['index'].values[:len(self.data['X'])-self.sampling_t-1]
            p = df['p'].values[:len(self.data['X'])-self.sampling_t-1]
            p = p/np.sum(p)

            start_i = np.random.choice(indexes, p=p)
            end_i = start_i + self.sampling_t
            self.data = {
                'X':self.data['X'][start_i:end_i].reset_index(drop=True),
                'other':self.data['other'][start_i:end_i].reset_index(drop=True)
            }
        self.is_sample = is_sample

        self.t = 0
        state = self.data['X'].iloc[self.t, :].values
        if is_sample:
            self.money = self.init_money * np.random.uniform(0.5, 2.0)
            value = self.data['other']['original_value'][self.t].astype(np.float32)

            hold_rate = np.random.randint(0, self.n_split_of_hold_rate+1)/self.n_split_of_hold_rate
            self.hold_num = int((hold_rate*self.money)//value)
            self.money = self.money - (self.hold_num*value)
            self.total_assets = self.money + (self.hold_num * value)
            self.start_money = self.total_assets
            self.mean_value = np.mean(self.data['other']['original_value'])
            state_add = [
                hold_rate,
                (self.money - self.init_money)/self.init_money,
                (self.total_assets - self.init_money)/self.init_money,
                0
            ]
        else:
            self.money = self.init_money
            value = self.data['other']['original_value'][self.t].astype(np.float32)
            #hold_rate = 0.5
            hold_rate = 0.0
            self.hold_num = int((hold_rate*self.money)//value)
            self.money = self.money - (self.hold_num*value) - self.trade_cost_func(self.hold_num*value)
            self.total_assets = self.money + (self.hold_num * value)
            self.hold_num = 0
            self.total_assets = self.init_money
            self.start_money = self.total_assets
            self.mean_value = np.mean(self.data['other']['original_value'])
            state_add = [hold_rate, 0, 0, 0]
        state = np.append(state, state_add).astype(np.float32)
        state = torch.from_numpy(state)

        return state

stock_envのメンバー関数: 環境の状態を進める

環境の状態を進める関数です.アクションを引数として受け取り,次のステップで計算します.

  1. 保有比率を決定
  2. 現保有比率と新保有比率の差から株の購入数を決定
  3. 所持金などの再計算
  4. 総資産などの再計算
  5. 終了条件を満たすか
  6. 次の状態に進める
  7. 次の状態を計算
  8. 報酬の計算

また学習中と評価モードの両方で利用するために,sub_window_teval_code_modeを指定できます.

  • ‘sub_window_t’ : 評価モード中のステップ数.
  • ‘eval_code_mode’ : 学習中にTrue.その銘柄のスコアを記録.上位スコア銘柄のみを評価モードで利用する.
    def step(self, action, sub_window_t=None, eval_code_mode=True):
        value = self.data['other']['original_value'][self.t].astype(np.float32)
        next_value = self.data['other']['next_original_value'][self.t].astype(np.float32)
        code = self.selected_code
        date = self.data['other']['date'][self.t]
        info = {'value':value, 'code':code, 'date':date}

        hold_rate = action/self.n_split_of_hold_rate
        new_hold_num = int((hold_rate*self.total_assets)//value)
        if self.hold_num > new_hold_num:
            sell_num = self.hold_num - new_hold_num
            self.money += sell_num * value
            self.money -= self.trade_cost_func(sell_num * value)
        if self.hold_num < new_hold_num:
            buy_num = new_hold_num - self.hold_num
            self.money -= buy_num * value
            self.money -= self.trade_cost_func(buy_num * value)

        self.hold_num = new_hold_num
        self.total_assets = self.money + (self.hold_num * value)

        self.t += 1
        if sub_window_t == None:
            sub_window_t = self.t

        done = sub_window_t == self.sampling_t-1
        if done:
            self.money += value*self.hold_num
            self.money -= self.trade_cost_func(value*self.hold_num)
            self.hold_num = 0
            if eval_code_mode:
                if code not in self.eval_code_info:
                    self.eval_code_info[code] = []

                score = (self.total_assets-self.start_money)/self.start_money
                self.eval_code_info[code].append(score)
                if len(self.eval_code_info[code]) > self.eval_score_size:
                    del self.eval_code_info[code][0]
        state = self.data['X'].iloc[self.t, :].values
        state_add = [
            hold_rate,
            (self.money - self.start_money)/self.start_money,
            (self.total_assets - self.start_money)/self.start_money,
            sub_window_t/(self.sampling_t-1)
        ]
        state = np.append(state, state_add).astype(np.float32)
        state = torch.from_numpy(state)

        if self.reward_last_only:
            if done:
                reward = (self.total_assets - self.start_money)/self.start_money
            else:
                reward = 0
        else:
            next_total_assets = self.money + self.hold_num * next_value
            reward = (next_total_assets - self.start_money)/self.start_money
        if np.isfinite(reward) == False:
            reward = 0
        return state, reward, done, info

stock_envのメンバー関数: 評価モード

データセットを評価する関数.

  1. 学習中のスコアが平均th以上の銘柄のみを評価する.また,最低5回は学習した銘柄のみを利用する.
  2. エージェントを評価モードにして,任意の銘柄について評価する.評価は10回サンプリングした時の最低利益率
  3. 評価値が高いn個の銘柄を最終的に評価する

n個の銘柄についての総資産の推移をDataFrameとして返す.

    def eval_dataset(self, agent, th):
        use_code = []
        for code, scores in self.eval_code_info.items():
            mean_score = np.mean(scores)
            if mean_score > th and len(scores) > 5:
                use_code.append(code)
        if len(use_code) == 0:
            return pd.DataFrame()

        with agent.eval_mode():
            print(f'can use codes num : {len(use_code)}')
            print('select codes')
            n_trial = 10
            code_score = []
            for code in tqdm.tqdm(use_code):
                min_score = 100000
                sum_score = 0
                for _ in range(n_trial):
                    state = self.reset(is_sample=True, is_train=True, code=code)
                    for _ in range(len(self.data['X'])-1):
                        action = agent.act(state)
                        state, reward, done, info = self.step(action, eval_code_mode=False)
                    score = (self.total_assets-self.start_money)/self.start_money
                    min_score = min(min_score, score)
                    sum_score += score

                code_score.append([code, min_score])

            code_score.sort(key=lambda obj:obj[1], reverse=True)
            final_use_code = [code for code, score in code_score[:self.n_code_select]]

            if len(final_use_code) == 0:
                return pd.DataFrame()
            print(f'eval use codes num : {len(final_use_code)}')

            date_x = self.dataset['original_value']['date']
            is_train_data_x = self.dataset['original_value']['is_train_data']
            total_assets_df = pd.DataFrame({'is_train_data':is_train_data_x.values}, index=date_x)
            #win_rate_df = pd.DataFrame({'is_train_data':is_train_data_x.values}, index=data_x)
            print('evaluating dataset')
            for code in tqdm.tqdm(final_use_code):
                state = self.reset(is_sample=False, is_train=False, code=code)
                sub_window_t = 0
                line = pd.Series(index=date_x)
                line.iat[0] = self.total_assets
                for _ in range(len(self.data['X'])-1):
                    action = agent.act(state)
                    state, reward, done, info = self.step(action, sub_window_t, eval_code_mode=False)
                    sub_window_t += 1
                    line[info['date']] = self.total_assets
                    #win_rate_line[info['date']] = (self.total_assets - self.init_money)/self.init_money
                    if done:
                        sub_window_t = 0
                total_assets_df[code] = line
            total_assets_df = total_assets_df.fillna(method='ffill')
        return total_assets_df
タイトルとURLをコピーしました