このプログラムの目的
stock_env.pyの目的は,作成したデータセットとネットワークのやり取りを,強化学習がやり取りしやすい形の「環境」で表現することです.
このプログラムはNo.02
章といくつかの相違点があります.
- 関数を減らし,環境の呼び出しを複雑にしないようにした
- 学習中は一定期間をサンプリングして利用
- 2023年に近いほどサンプリングされやすい
- アクション(行動)は総資産の何%を銘柄にするか(何%を現金で持つか)
- 何個の銘柄を利用するか
work_share
├06_sampling_dqn_learning
├Dockerfile
├docker-compose.yml
└src
├draw_graph
| └draw_tools.py
├enviroment
| └stock_env.py (これを作成)
├reinforcement_learning
| └dqn.py
└experiment01.py
使用ライブラリ
import pandas as pd
import numpy as np
import torch
import tqdm
import gym
コスト関数
コスト関数は楽天の手数料と,コストなしを実装しました.
def default_cost_func(value):
#reference to https://www.rakuten-sec.co.jp/web/commission/
if value < 50_000:
return 55
if 50_000 <= value < 100_000:
return 99
if 100_000 <= value < 200_000:
return 115
if 200_000 <= value < 500_000:
return 275
if 500_000 <= value < 1_000_000:
return 535
if 1_000_000 <= value < 1_500_000:
return 640
if 1_500_000 <= value < 30_000_000:
return 1013
if 30_000_000 <= value:
return 1070
def zero_cost_func(value):
return 0
環境クラス
強化学習で肝となる環境を実装します.
class stock_env:
def __init__(self, dataset_dir1=None, dataset_dir2=None, init_money=10000000,
trade_cost_func='default', n_split_of_hold_rate=100,
sampling_t=30, reward_last_only=True,
eval_score_size=10, n_code_select=10, sampling_alpha=0.001):
self.init_money = init_money
if trade_cost_func == 'default':
self.trade_cost_func = default_cost_func
if trade_cost_func == 'zero':
self.trade_cost_func = zero_cost_func
self.np_cost_func = np.frompyfunc(self.trade_cost_func, 1, 1)
self.n_split_of_hold_rate = n_split_of_hold_rate
self.action_num = n_split_of_hold_rate+1
self.action_space = gym.spaces.Discrete(self.action_num)
self.input_num = None
self.sampling_t = sampling_t
self.reward_last_only = reward_last_only
self.eval_score_size = eval_score_size
self.n_code_select = n_code_select
self.sampling_alpha = sampling_alpha
self.dataset = None
self.data = None
self.selected_code = None
self.is_train = None
self.is_sample = None
self.t = 0
self.money = init_money
self.start_money = init_money
self.total_assets = init_money
self.stock_code_list = []
self.hold_num = 0
self.mean_value = None
self.eval_code_info = {}
if dataset_dir1 == None or dataset_dir2 == None:
print(f'dataset dir : {dataset_dir1}, {dataset_dir2}')
exit()
self.set_dataset(dataset_dir1, dataset_dir2)
stock_envのメンバー関数: データセットの登録
環境オフジェクトにデータセットを登録します.このときinput_num
が決定されます.input_num
はデータセットの入力データ(X
)の次数に銘柄を表す4次元ベクトル,環境の状態を表す4次元ベクトルを加えた38次元ベクトルとなります.
def set_dataset(self, dataset_dir1, dataset_dir2):
print('load dataset')
self.dataset = {
'original_value':pd.read_pickle(f'{dataset_dir1}/original_value.dfpkl'),
'code_vec':pd.read_pickle(f'{dataset_dir2}/clusterling_dataset_dist.dfpkl')
}
_max = np.max(self.dataset['code_vec'].values)
_min = np.min(self.dataset['code_vec'].values)
self.dataset['code_vec'] = (self.dataset['code_vec'] - _min)/(_max - _min)
self.input_time_length = 30
for t in range(1, self.input_time_length+1):
self.dataset[f'pred_-t{t}'] = pd.read_pickle(f'{dataset_dir1}/pred_codes_-t{t}.dfpkl')
for key, df in self.dataset.items():
if 'date' in df.columns:
df = df.query('date > "2018-01-01 00:00+09:00"')
self.dataset[key] = df.reset_index(drop=True)
self.input_num = 30 + 4 + 4 #pred + vec + env_state
self.stock_code_list = [code for code in self.dataset['original_value'].columns if code not in ['is_train_data', 'date']]
stock_envのメンバー関数: 環境の状態の初期化
環境の状態を初期化します.これは所持金,株数等を初期化することです.
また,環境の最初の状態をreturnします.
- サンプリングモードか否かを指定できるようにしています
- 学習データを利用するか否かを指定できるようにしています
- 銘柄を指定できるようにしています
- 初期金・初期保有数は,サンプリングモード時はランダムに決定されます
- サンプリングの重みは時間軸xに対して$e^{-\alpha x}+\alpha$に従います
def reset(self, is_sample=True, is_train=True, code=None):
if code == None:
code = np.random.choice(self.stock_code_list)
self.selected_code = code
### select code###
other = pd.DataFrame({
'original_value':self.dataset['original_value'][code],
'next_original_value':self.dataset['original_value'][code].shift(-1).reset_index(drop=True),
'date':self.dataset['original_value']['date']
})
X = pd.DataFrame({
f'pred_-t{t}':self.dataset[f'pred_-t{t}'][code]
for t in range(1, self.input_time_length+1)
})
for n in range(len(self.dataset['code_vec'])):
X[f'code_vec_{n}'] = self.dataset['code_vec'].at[f'class {n}', str(code)]
### mask train or test###
if is_train:
mask = self.dataset['original_value']['is_train_data']
other = other[mask].reset_index(drop=True)
X = X[mask].reset_index(drop=True)
self.data = {
'X':X.reset_index(drop=True),
'other':other.reset_index(drop=True)
}
self.is_train = is_train
### front na delete
mask = other['original_value'].isna() == False
self.data['X'] = self.data['X'][mask].reset_index(drop=True)
self.data['other'] = self.data['other'][mask].reset_index(drop=True)
### sampling dataset###
if is_sample:
df = pd.DataFrame({
'date':self.data['other']['date']
}).sort_values('date', ascending=False).reset_index(drop=True)
df = df.reset_index().rename(columns={'index':'x'})
df['p'] = np.exp(-1*df['x']*self.sampling_alpha) + 1.0*self.sampling_alpha
df = df.sort_values('date').reset_index(drop=True).reset_index()
indexes = df['index'].values[:len(self.data['X'])-self.sampling_t-1]
p = df['p'].values[:len(self.data['X'])-self.sampling_t-1]
p = p/np.sum(p)
start_i = np.random.choice(indexes, p=p)
end_i = start_i + self.sampling_t
self.data = {
'X':self.data['X'][start_i:end_i].reset_index(drop=True),
'other':self.data['other'][start_i:end_i].reset_index(drop=True)
}
self.is_sample = is_sample
self.t = 0
state = self.data['X'].iloc[self.t, :].values
if is_sample:
self.money = self.init_money * np.random.uniform(0.5, 2.0)
value = self.data['other']['original_value'][self.t].astype(np.float32)
hold_rate = np.random.randint(0, self.n_split_of_hold_rate+1)/self.n_split_of_hold_rate
self.hold_num = int((hold_rate*self.money)//value)
self.money = self.money - (self.hold_num*value)
self.total_assets = self.money + (self.hold_num * value)
self.start_money = self.total_assets
self.mean_value = np.mean(self.data['other']['original_value'])
state_add = [
hold_rate,
(self.money - self.init_money)/self.init_money,
(self.total_assets - self.init_money)/self.init_money,
0
]
else:
self.money = self.init_money
value = self.data['other']['original_value'][self.t].astype(np.float32)
#hold_rate = 0.5
hold_rate = 0.0
self.hold_num = int((hold_rate*self.money)//value)
self.money = self.money - (self.hold_num*value) - self.trade_cost_func(self.hold_num*value)
self.total_assets = self.money + (self.hold_num * value)
self.hold_num = 0
self.total_assets = self.init_money
self.start_money = self.total_assets
self.mean_value = np.mean(self.data['other']['original_value'])
state_add = [hold_rate, 0, 0, 0]
state = np.append(state, state_add).astype(np.float32)
state = torch.from_numpy(state)
return state
stock_envのメンバー関数: 環境の状態を進める
環境の状態を進める関数です.アクションを引数として受け取り,次のステップで計算します.
- 保有比率を決定
- 現保有比率と新保有比率の差から株の購入数を決定
- 所持金などの再計算
- 総資産などの再計算
- 終了条件を満たすか
- 次の状態に進める
- 次の状態を計算
- 報酬の計算
また学習中と評価モードの両方で利用するために,sub_window_t
とeval_code_mode
を指定できます.
- ‘sub_window_t’ : 評価モード中のステップ数.
- ‘eval_code_mode’ : 学習中にTrue.その銘柄のスコアを記録.上位スコア銘柄のみを評価モードで利用する.
def step(self, action, sub_window_t=None, eval_code_mode=True):
value = self.data['other']['original_value'][self.t].astype(np.float32)
next_value = self.data['other']['next_original_value'][self.t].astype(np.float32)
code = self.selected_code
date = self.data['other']['date'][self.t]
info = {'value':value, 'code':code, 'date':date}
hold_rate = action/self.n_split_of_hold_rate
new_hold_num = int((hold_rate*self.total_assets)//value)
if self.hold_num > new_hold_num:
sell_num = self.hold_num - new_hold_num
self.money += sell_num * value
self.money -= self.trade_cost_func(sell_num * value)
if self.hold_num < new_hold_num:
buy_num = new_hold_num - self.hold_num
self.money -= buy_num * value
self.money -= self.trade_cost_func(buy_num * value)
self.hold_num = new_hold_num
self.total_assets = self.money + (self.hold_num * value)
self.t += 1
if sub_window_t == None:
sub_window_t = self.t
done = sub_window_t == self.sampling_t-1
if done:
self.money += value*self.hold_num
self.money -= self.trade_cost_func(value*self.hold_num)
self.hold_num = 0
if eval_code_mode:
if code not in self.eval_code_info:
self.eval_code_info[code] = []
score = (self.total_assets-self.start_money)/self.start_money
self.eval_code_info[code].append(score)
if len(self.eval_code_info[code]) > self.eval_score_size:
del self.eval_code_info[code][0]
state = self.data['X'].iloc[self.t, :].values
state_add = [
hold_rate,
(self.money - self.start_money)/self.start_money,
(self.total_assets - self.start_money)/self.start_money,
sub_window_t/(self.sampling_t-1)
]
state = np.append(state, state_add).astype(np.float32)
state = torch.from_numpy(state)
if self.reward_last_only:
if done:
reward = (self.total_assets - self.start_money)/self.start_money
else:
reward = 0
else:
next_total_assets = self.money + self.hold_num * next_value
reward = (next_total_assets - self.start_money)/self.start_money
if np.isfinite(reward) == False:
reward = 0
return state, reward, done, info
stock_envのメンバー関数: 評価モード
データセットを評価する関数.
- 学習中のスコアが平均
th
以上の銘柄のみを評価する.また,最低5回は学習した銘柄のみを利用する. - エージェントを評価モードにして,任意の銘柄について評価する.評価は10回サンプリングした時の最低利益率
- 評価値が高いn個の銘柄を最終的に評価する
n個の銘柄についての総資産の推移をDataFrameとして返す.
def eval_dataset(self, agent, th):
use_code = []
for code, scores in self.eval_code_info.items():
mean_score = np.mean(scores)
if mean_score > th and len(scores) > 5:
use_code.append(code)
if len(use_code) == 0:
return pd.DataFrame()
with agent.eval_mode():
print(f'can use codes num : {len(use_code)}')
print('select codes')
n_trial = 10
code_score = []
for code in tqdm.tqdm(use_code):
min_score = 100000
sum_score = 0
for _ in range(n_trial):
state = self.reset(is_sample=True, is_train=True, code=code)
for _ in range(len(self.data['X'])-1):
action = agent.act(state)
state, reward, done, info = self.step(action, eval_code_mode=False)
score = (self.total_assets-self.start_money)/self.start_money
min_score = min(min_score, score)
sum_score += score
code_score.append([code, min_score])
code_score.sort(key=lambda obj:obj[1], reverse=True)
final_use_code = [code for code, score in code_score[:self.n_code_select]]
if len(final_use_code) == 0:
return pd.DataFrame()
print(f'eval use codes num : {len(final_use_code)}')
date_x = self.dataset['original_value']['date']
is_train_data_x = self.dataset['original_value']['is_train_data']
total_assets_df = pd.DataFrame({'is_train_data':is_train_data_x.values}, index=date_x)
#win_rate_df = pd.DataFrame({'is_train_data':is_train_data_x.values}, index=data_x)
print('evaluating dataset')
for code in tqdm.tqdm(final_use_code):
state = self.reset(is_sample=False, is_train=False, code=code)
sub_window_t = 0
line = pd.Series(index=date_x)
line.iat[0] = self.total_assets
for _ in range(len(self.data['X'])-1):
action = agent.act(state)
state, reward, done, info = self.step(action, sub_window_t, eval_code_mode=False)
sub_window_t += 1
line[info['date']] = self.total_assets
#win_rate_line[info['date']] = (self.total_assets - self.init_money)/self.init_money
if done:
sub_window_t = 0
total_assets_df[code] = line
total_assets_df = total_assets_df.fillna(method='ffill')
return total_assets_df