このプログラムの目的
stock_env.pyの目的は,作成したデータセットとネットワークのやり取りを,強化学習がやり取りしやすい形の「環境」で表現することです.
また,一般的なgymの環境(env)に少し合わせて書いてありますが,銘柄コードの変更・評価モードなどの一般的ではないステップを踏むため,gymの環境(env)とは異なる動作を目的としています.
work_share
├02_get_stock_price
└03_dqn_learning
├Dockerfile
├docker-compose.yml
└src
├draw_graph
| └draw_tools.py
├enviroment
| └stock_env.py (これを作成)
├reinforcement_learning
| └dqn.py
├result(自動生成)
└experiment01.py
使用ライブラリ
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler
import gym
データの分割
生成したデータセットを学習用とテスト用に分割する関数です.データセット自体が学習用かテスト用かを判別するフラグ系列(eval
)を保持しているので,基本的にはそれを利用します.
一応,train_rate
を与えて自由に分割できるように実装していますが,今回はしようしません.
def data_split(data, train_rate=0.7, use_eval_mask=False):
train_data = {}
test_data = {}
if use_eval_mask:
train_mask = data['other']['eval'] == 'train'
test_mask = data['other']['eval'] == 'test'
for key, df in data.items():
train_data[key] = df[train_mask].reset_index(drop=True)
test_data[key] = df[test_mask].reset_index(drop=True)
else:
train_num = int(len(data['X']) * train_rate)
for key, df in data.items():
train_data[key] = df.iloc[:train_num, :].reset_index(drop=True)
test_data[key] = df.iloc[train_num:, :].reset_index(drop=True)
return train_data, test_data
データの変換
用意したデータセットは入力用の系列と評価用の系列が混合しているので,それを分割します.
また,入力用のデータ(X
)と評価用のデータ(other
)の系列名を出力し,データが混合していないことを確認します.
def preprocess(df):
nan_mask = df['original_value'].isna() & df['next_original_value'].isna()
df = df.loc[nan_mask != True, :].reset_index(drop=True)
other = df[['date', 'code', 'original_value', 'next_original_value', 'eval']]
X = df.drop(other.columns, axis=1)
# check input columns.
print(f'use input columns : {list(X.columns)}')
# check other columns.
print(f'use info columns : {list(other.columns)}')
return {'X':X, 'other':other}
データの型変換
上で変換されたデータはpandasのデフォルト型(小数点を含む数値の場合はnp.float64)になっていますが,pytorchで扱う型はfloat32なので変換します.
さらに,評価用のデータ(other
)もpandasのDataFrameのままだと要素の指定が面倒なので(.at
で指定),numpyのデータと辞書に変換します.
def to_learning_data(data):
X = data['X'].values.astype(np.float32)
other = {col:data['other'][col].values for col in data['other'].columns}
return {'X':X, 'other':other}
コスト関数
株価を売買するにあたり,現実世界では手数料が発生します.多くの論文では株価の1%~0.1%程度に設定することが多いですが,日本の証券会社の手数料は%表記ではないことが多いので,それに合わせました.楽天の現物取引の手数料を実装しましたが,何を実装してもらっても構いません.
def default_cost_func(value):
#reference to https://www.rakuten-sec.co.jp/web/commission/
if value < 50_000:
return 55
if 50_000 <= value < 100_000:
return 99
if 100_000 <= value < 200_000:
return 115
if 200_000 <= value < 500_000:
return 275
if 500_000 <= value < 1_000_000:
return 535
if 1_000_000 <= value < 1_500_000:
return 640
if 1_500_000 <= value < 30_000_000:
return 1013
if 30_000_000 <= value:
return 1070
環境クラス
強化学習で肝となる環境を実装します.
self.action_space
だけgymのspaceによって実装しています.これは強化学習ライブラリpflrのε-greedy法を利用するときにgym.spaces.Discreteオブジェクトが必要となるからです.money
は現金を,total_assets
は所有総資産を,all_sold_money
は所有株をすべて売却した時の現金(total_assetsから手数料が引かれます)です.stock_holds
は手持ちの株の数を,stock_holds_value
は各株の現在の価格を保持します.original_code_list
は元の銘柄コードを,stock_code_list
は0~Nまでに再表現された銘柄コードの一覧です.max_data_length
は現在セットしたる銘柄データの系列長です.evaluate_codes
やeval_use_codes
は多すぎる銘柄を取捨選択するために利用します.これは主に次の項のdqn.pyで利用されます.
class stock_env:
def __init__(self, init_money=10000000,
trade_cost_func=None, purchase_max=10,
reward_last_only=True):
self.init_money = init_money
if trade_cost_func == None:
self.trade_cost_func = default_cost_func
else:
self.trade_cost_func = trade_cost_func
self.np_cost_func = np.frompyfunc(self.trade_cost_func, 1, 1)
self.reward_last_only = reward_last_only
self.train_data = None
self.test_data = None
self.scaler = None
self.action_num = purchase_max*2 + 1#[-k, ..., -1, 0, 1, ..., k]
self.action_space = gym.spaces.Discrete(self.action_num)
self.input_num = None
self.t = 0
self.money = init_money
self.total_assets = init_money
self.all_sold_money = init_money
self.stock_holds = None
self.original_stock_code_list = []
self.stock_code_list = []
self.stock_holds_value = None
self.max_data_length = None
self.evaluate_codes = {}
self.eval_use_codes = []
stock_envのメンバー関数: データセットの登録
環境オフジェクトにデータセットを登録します.このときinput_num
が決定されます.input_num
はデータセットの入力データ(X
)の次数に3つ足したものです.3つはそれぞれ環境がもつ状態(所有株数,所持金,総資産)です.
def set_df(self, df):
data = preprocess(df)
# new_code start 0, 1, 2, ...
new_code, code_to = pd.factorize(data['other']['code'], sort=True)
self.original_stock_code_list = code_to
data['other']['code'] = new_code
self.stock_code_list = np.unique(new_code)
self.evaluate_codes = {code:(0, 0) for code in self.stock_code_list}
self.train_data, self.test_data = data_split(data, use_eval_mask=True)
print(f'train size : {len(self.train_data["X"])}, test size : {len(self.test_data["X"])}')
train_df = self.train_data['X']
test_df = self.test_data['X']
scaler = StandardScaler()
train_df = pd.DataFrame(data=scaler.fit_transform(train_df), columns=train_df.columns)
test_df = pd.DataFrame(data=scaler.transform(test_df), columns=test_df.columns)
train_df = train_df.fillna(0)
test_df = test_df.fillna(0)
self.scaler = scaler
self.train_data['X'] = train_df.astype(np.float32)
self.test_data['X'] = test_df.astype(np.float32)
self.input_num = data['X'].shape[1] + 3
stock_envのメンバー関数: 銘柄の変更
銘柄の変更をする関数です.基本的にはcode=None
が引数で与えられ,ランダムな銘柄コードが選択されます.一応毎回dateでソートしていますが,おそらく必要ないです.
def resample(self, code=None):
if code == None:
code = np.random.choice(self.stock_code_list)
self.selected_code = code
code_mask = self.train_data['other']['code'] == code
date = self.train_data['other'][code_mask]['date'].reset_index(drop=True)
data = {}
for key, df in self.train_data.items():
data[key] = df[code_mask].reset_index(drop=True)
data[key]['sort_key'] = date.copy()
data[key] = data[key].sort_values('sort_key').reset_index(drop=True)
data[key] = data[key].drop('sort_key', axis=1).reset_index(drop=True)
self.data = to_learning_data(data)
self.max_data_length = len(self.data['X']) - 1
stock_envのメンバー関数: 総資産・手数料を引いた総資産の計算
総資産を計算します.この計算の高速化のために銘柄コードが0~Nとなるように再割り当てし,stock_holdsとstock_holds_valueをnumpy.arrayとしました.
def get_total_assets(self):
total_stock_value = np.sum(self.stock_holds * self.stock_holds_value)
total_assets = self.money + total_stock_value
return total_assets
def get_money_when_all_sell(self, total_assets):
values = self.stock_holds * self.stock_holds_value
money = total_assets - np.sum(self.np_cost_func(values[values>0]))
return money
stock_envのメンバー関数: 使用する銘柄コードの取捨選択
使用する銘柄コードを取捨選択します.なぜなら,すべての銘柄を利用して,取引のシミュレーションを行うと,とても時間がかかるためです.
利益が上がる銘柄のみを選択し,評価で利用するように設定します.
evaluate_codes
は利益率の試行平均を計算するために試行合計と試行数を保持しています.- borderlineは1.1が設定されていますが,実際の実験では1.01が設定されています.
def set_eval_use_codes(self, borderline=1.1):
use_codes = []
for key, (sum_rate, count) in self.evaluate_codes.items():
if count != 0 and (sum_rate / count) > borderline:
use_codes.append(key)
self.eval_use_codes = use_codes
stock_envのメンバー関数: 評価モード
複数の銘柄を利用したシミュレーションを行うために,データをセットする関数です.
def eval_mode(self, eval_type):
if eval_type == 'train':
use_data = self.train_data
if eval_type == 'test':
use_data = self.test_data
mask = use_data['other']['code'].isin(self.eval_use_codes)
data = {}
for key, df in use_data.items():
data[key] = df[mask].reset_index(drop=True)
self.data = to_learning_data(data)
self.max_data_length = len(self.data['X']) - 1
stock_envのメンバー関数: 環境の状態の初期化
環境の状態を初期化します.これは所持金,株数等を初期化することです.
また,環境の最初の状態をreturnします.
def reset(self):
self.t = 0
self.money = self.init_money
self.stock_holds = np.zeros(len(self.stock_code_list), dtype=np.int32)
self.stock_holds_value = np.zeros(len(self.stock_code_list), dtype=np.float64)
X = self.data['X'][self.t]
scaled_hold_num = 0
scaled_money = 0
scaled_total_assets = 0
X = np.append(X, [scaled_hold_num, scaled_money, scaled_total_assets]).astype(np.float32)
state = torch.from_numpy(X)
return state
stock_envのメンバー関数: 環境の状態を進める
環境の状態を進める関数です.アクションを引数として受け取り,
- 株の購入数を決定
- 所持金などの再計算
- 総資産などの再計算
- 終了条件を満たすか
- 次の状態に進める
- 次の状態を計算
- 報酬の計算
を行います.報酬は総資産です(所持金を報酬とするコードはコメントアウトしてあります).
また,報酬を最後に与えるか否かをreward_last_only
で決定しています.
def step(self, action):
value = self.data['other']['original_value'][self.t].astype(np.float32)
next_value = self.data['other']['next_original_value'][self.t].astype(np.float32)
code = self.data['other']['code'][self.t]
date = self.data['other']['date'][self.t]
info = {'value':value, 'code':code, 'date':date}
purchase = action - (self.action_num - 1)/2
purchase_num = abs(purchase)
#if purchase == 0: # hold
if purchase < 0 and (self.money - value*purchase_num) > 0: # buy
self.money -= value*purchase_num
self.stock_holds[code] += purchase_num
self.money -= self.trade_cost_func(value*purchase_num)
if purchase > 0 and self.stock_holds[code] > 0: # sell
self.money += value*purchase_num
self.stock_holds[code] -= purchase_num
self.money -= self.trade_cost_func(value*purchase_num)
if self.stock_holds[code] > 0:
self.stock_holds_value[code] = value
self.total_assets = self.get_total_assets()
self.all_sold_money = self.get_money_when_all_sell(self.total_assets)
done = self.t == self.max_data_length -2
self.t += 1
X = self.data['X'][self.t]
next_code = self.data['other']['code'][self.t]
scaled_hold_num = self.stock_holds[next_code] / 10
scaled_money = (self.money - self.init_money) / self.init_money
scaled_total_assets = (self.total_assets - self.init_money) / self.init_money
X = np.append(X, [scaled_hold_num, scaled_money, scaled_total_assets]).astype(np.float32)
state = torch.from_numpy(X)
if self.reward_last_only:
if done:
reward = (self.total_assets - self.init_money)/self.init_money
#reward = (self.money - self.init_money)/self.init_money
else:
reward = 0
else:
reward = (self.total_assets - self.init_money)/self.init_money
#reward = (self.money - self.init_money)/self.init_money
return state, reward, done, info