PSO/pso/optimizer.py
jung-geun 737baf6681 chore: reflect updated package requirements
Updated the package requirements in the requirements.txt file.
2024-05-16 02:11:31 +09:00


import atexit
import gc
import json
import os
import socket
import subprocess
import sys
from datetime import datetime
from typing import Any, List

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorboard.plugins.hparams import api as hp
from tensorflow import keras
from tqdm.auto import tqdm

from .particle import Particle


def find_free_port():
    """Return an OS-assigned free TCP port on localhost."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("localhost", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port
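

# Usage sketch (illustrative): reserve an unused local port, e.g. for the
# TensorBoard process launched in Optimizer.fit().
#   port = find_free_port()
#   url = f"http://localhost:{port}"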


class Optimizer:
    """
    Particle swarm optimization.

    Driver class for running PSO over the weights of a Keras model.
    """

    def __init__(
        self,
        model: keras.Model,
        loss: Any,
        **kwargs,
    ):
        """
        Particle swarm optimization.

        Args:
            model (keras.Model): model architecture - built e.g. with keras.models.model_from_json
            loss (str): loss function - one of the losses provided by keras.losses
            n_particles (int): number of particles
            c0 (float): local rate - inertia weight toward the particle's own best
            c1 (float): global rate - inertia weight toward the global best
            w_min (float): minimum inertia weight
            w_max (float): maximum inertia weight
            negative_swarm (float): fraction of particles that move away from the best solution - value between 0 and 1
            mutation_swarm (float): probability that a mutation occurs - value between 0 and 1
            np_seed (int | None): numpy seed. Defaults to None.
            tf_seed (int | None): tensorflow seed. Defaults to None.
            random_state (tuple): numpy random state. Defaults to None.
            convergence_reset (bool): whether to use early stopping. Defaults to False.
            convergence_reset_patience (int): how many steps to wait before early stopping triggers. Defaults to 10.
            convergence_reset_min_delta (float): minimum change counted as an improvement for early stopping. Defaults to 0.0001.
            convergence_reset_monitor (str): metric monitored for early stopping. Defaults to "loss". - "loss", "acc" or "mse"
        """
        try:
            n_particles = kwargs.get("n_particles", 10)
            c0 = kwargs.get("c0", 0.5)
            c1 = kwargs.get("c1", 0.3)
            w_min = kwargs.get("w_min", 0.1)
            w_max = kwargs.get("w_max", 0.9)
            negative_swarm = kwargs.get("negative_swarm", 0)
            mutation_swarm = kwargs.get("mutation_swarm", 0)
            np_seed = kwargs.get("np_seed", None)
            tf_seed = kwargs.get("tf_seed", None)
            random_state = kwargs.get("random_state", None)
            convergence_reset = kwargs.get("convergence_reset", False)
            convergence_reset_patience = kwargs.get("convergence_reset_patience", 10)
            convergence_reset_min_delta = kwargs.get(
                "convergence_reset_min_delta", 0.0001
            )
            convergence_reset_monitor = kwargs.get("convergence_reset_monitor", "loss")

            if model is None:
                raise ValueError("model is None")
            if not isinstance(model, keras.models.Model):
                raise ValueError("model is not keras.models.Model")
            if loss is None:
                raise ValueError("loss is None")
            if n_particles is None:
                raise ValueError("n_particles is None")
            if n_particles < 1:
                raise ValueError("n_particles < 1")
            if c0 < 0 or c1 < 0:
                raise ValueError("c0 or c1 < 0")

            # Apply the requested seeds; each one is applied independently
            # rather than as alternatives to one another.
            if np_seed is not None:
                np.random.seed(np_seed)
            if tf_seed is not None:
                tf.random.set_seed(tf_seed)
            if random_state is not None:
                np.random.set_state(random_state)
            self.random_state = np.random.get_state()

            model.compile(loss=loss, optimizer="adam", metrics=["accuracy", "mse"])
            self.model = model  # model architecture
            self.set_shape(model.get_weights())
            self.loss = loss  # loss function
            self.n_particles = n_particles  # number of particles
            self.particles = [None] * n_particles  # particle list
            self.c0 = c0  # local rate - inertia weight toward the particle's own best
            self.c1 = c1  # global rate - inertia weight toward the global best
            self.w_min = w_min  # minimum inertia weight
            self.w_max = w_max  # maximum inertia weight
            # fraction of particles that move away from the best solution - between 0 and 1
            self.negative_swarm = negative_swarm
            # probability that a mutation occurs - between 0 and 1
            self.mutation_swarm = mutation_swarm
            self.avg_score = 0  # average score
            self.renewal = "acc"
            self.dispersion = False
            self.day = datetime.now().strftime("%Y%m%d-%H%M%S")
            self.empirical_balance = False
            negative_count = 0
            self.train_summary_writer = [None] * self.n_particles

            print(f"start running time : {self.day}")
            for i in tqdm(range(self.n_particles), desc="Initializing Particles"):
                self.particles[i] = Particle(
                    model,
                    self.loss,
                    negative=i < self.negative_swarm * self.n_particles,
                    mutation=self.mutation_swarm,
                    converge_reset=convergence_reset,
                    converge_reset_patience=convergence_reset_patience,
                    converge_reset_monitor=convergence_reset_monitor,
                    converge_reset_min_delta=convergence_reset_min_delta,
                )
                if i < self.negative_swarm * self.n_particles:
                    negative_count += 1
                gc.collect()
                tf.keras.backend.reset_uids()
                tf.keras.backend.clear_session()

            print(f"negative swarm : {negative_count} / {n_particles}")
            print(f"mutation swarm : {mutation_swarm * 100}%")
            gc.collect()
            tf.keras.backend.reset_uids()
            tf.keras.backend.clear_session()
        except KeyboardInterrupt:
            print("Ctrl + C : Stop Training")
            sys.exit(1)
        except MemoryError:
            print("Memory Error : Stop Training")
            sys.exit(12)
        except ValueError as e:
            print(f"Value Error : Stop Training - {e}")
            sys.exit(11)
        except Exception as e:
            print(e)
            sys.exit(10)

    def __del__(self):
        del self.model
        del self.loss
        del self.n_particles
        del self.particles
        del self.c0
        del self.c1
        del self.w_min
        del self.w_max
        del self.negative_swarm
        del self.avg_score
        gc.collect()
        tf.keras.backend.reset_uids()
        tf.keras.backend.clear_session()

    def set_shape(self, weights: list):
        """
        Record the shape of each weight tensor.

        Args:
            weights (list): weights of a keras model
        """
        self.shape = [layer.shape for layer in weights]

    def get_shape(self):
        return self.shape

    def _encode(self, weights: list):
        """
        Flatten the weights into a single 1-D array.

        Args:
            weights (list): weights of a keras model

        Returns:
            (numpy array): the weights flattened into one dimension
        """
        w_gpu = np.array([])
        for layer in weights:
            w_tmp = layer.reshape(-1)
            w_gpu = np.append(w_gpu, w_tmp)
        return w_gpu

    def _decode(self, weight: np.ndarray):
        """
        Restore weights encoded by _encode to their original shapes.
        The argument is expected to be the return value of _encode;
        the target shapes come from set_shape().

        Args:
            weight (numpy array): weights flattened into one dimension

        Returns:
            (list): weights restored to their original shapes
        """
        weights = []
        start = 0
        for i in range(len(self.shape)):
            end = start + np.prod(self.shape[i])
            w_ = weight[start:end]
            w_ = np.reshape(w_, self.shape[i])
            weights.append(w_)
            start = end
        del start, end, w_
        del weight
        return weights
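
    # Round-trip sketch (illustrative): _decode inverts _encode using the
    # shapes captured by set_shape().
    #   flat = self._encode(self.model.get_weights())
    #   weights = self._decode(flat)  # same shapes as model.get_weights()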

    def _f(self, x, y, weights):
        """
        Objective function of EBPSO (presumed).

        Args:
            x (list): input data
            y (list): target data
            weights (list): weights

        Returns:
            (float): objective function value
        """
        self.model.set_weights(weights)
        score = self.model.evaluate(x, y, verbose=0)  # type: ignore
        if self.renewal == "loss":
            score_ = score[0]
        elif self.renewal == "acc":
            score_ = score[1]
        elif self.renewal == "mse":
            score_ = score[2]
        if score_ > 0:
            return 1 / (1 + score_)
        else:
            return 1 + np.abs(score_)
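
    # Worked example (illustrative): a monitored score s = 0.25 maps to
    # 1 / (1 + 0.25) = 0.8, so smaller positive scores yield larger
    # objective values; a non-positive score s maps to 1 + |s| >= 1.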

    def __weight_range(self):
        """
        Return the range of the weights.

        Returns:
            (float): minimum weight value
            (float): maximum weight value
        """
        w_ = self._encode(self.model.get_weights())
        # w_, w_s, w_l = self._encode(Particle.g_best_weights)
        weight_min = np.min(w_)
        weight_max = np.max(w_)
        del w_
        return weight_min, weight_max

    class batch_generator:
        def __init__(self, x, y, batch_size: int = 0):
            self.index = 0
            self.x = x
            self.y = y
            self.set_batch_size(batch_size)

        def next(self):
            self.index += 1
            if self.index > self.max_index:
                # Wrapped past the last batch: reshuffle and restart from
                # the first batch of the new pass.
                self.index = 1
                self.dataset = self.__get_batch_slice(self.batch_size)
            return self.dataset[self.index - 1][0], self.dataset[self.index - 1][1]

        def get_length(self):
            return self.get_max_index()

        def get_max_index(self):
            return self.max_index

        def get_index(self):
            return self.index

        def set_index(self, index):
            self.index = index

        def get_batch_size(self):
            return self.batch_size

        def set_batch_size(self, batch_size: int = 0):
            if batch_size == -1 or batch_size > len(self.x):
                batch_size = len(self.x)
            elif batch_size == 0:
                batch_size = len(self.x) // 10
            self.batch_size = batch_size
            print(f"batch size : {self.batch_size}")
            self.dataset = self.__get_batch_slice(self.batch_size)
            self.max_index = len(self.dataset)

        def __get_batch_slice(self, batch_size):
            return list(
                tf.data.Dataset.from_tensor_slices((self.x, self.y))
                .shuffle(len(self.x))
                .batch(batch_size)
            )

        def get_dataset(self):
            return self.dataset
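
    # Usage sketch (illustrative): serve one pass of shuffled mini-batches.
    #   gen = self.batch_generator(x, y, batch_size=32)
    #   for _ in range(gen.get_length()):
    #       x_batch, y_batch = gen.next()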

    def fit(
        self,
        x,
        y,
        **kwargs,
    ):
        """
        Args:
            x : numpy array
            y : numpy array
            epochs : int
            log : int - 0 : no logging, 1 : csv, 2 : tensorboard
            log_name : str - directory name used for logs. default : "fit"
            save_info : bool - whether to save training info on exit. default : False
            save_path : str - e.g. "./result"
            renewal : str - "acc", "loss" or "mse"
            check_point : int - checkpoint interval in epochs - None : no checkpoints
            batch_size : int - batch size. default : None => len(x)
                batch_size > len(x) : automatically capped at the maximum batch size
            validate_data : tuple - (x, y). default : None => (x, y)
            validation_split : float - fraction of (x, y) held out for validation. default : None
            back_propagation : bool - True : seed the global best with one epoch of backpropagation. default : False
            weight_reduction : int - period (in epochs) of the inertia-weight decay cycle. default : None => epochs
        """
        try:
            epochs = kwargs.get("epochs", 10)
            log = kwargs.get("log", 0)
            log_name = kwargs.get("log_name", None)
            save_info = kwargs.get("save_info", False)
            renewal = kwargs.get("renewal", "acc")
            check_point = kwargs.get("check_point", None)
            batch_size = kwargs.get("batch_size", None)
            validate_data = kwargs.get("validate_data", None)
            validation_split = kwargs.get("validation_split", None)
            back_propagation = kwargs.get("back_propagation", False)
            weight_reduction = kwargs.get("weight_reduction", None)

            if x.shape[0] != y.shape[0]:
                raise ValueError("x, y shape error")
            if save_info is None:
                save_info = False
            if log not in [0, 1, 2]:
                raise ValueError(
                    """log not in [0, 1, 2]
                    0 : no logging
                    1 : csv
                    2 : tensorboard
                    """
                )
            if renewal is None:
                renewal = "loss"
            elif renewal not in ["acc", "loss", "mse"]:
                raise ValueError("renewal not in ['acc', 'loss', 'mse']")
            if validate_data is not None:
                if validate_data[0].shape[0] != validate_data[1].shape[0]:
                    raise ValueError("validate_data shape error")
            else:
                # No validation set supplied: fall back to the training data.
                validate_data = [x, y]
            if validation_split is not None:
                if validation_split < 0 or validation_split > 1:
                    raise ValueError("validation_split not in [0, 1]")
                [x, validate_data[0], y, validate_data[1]] = train_test_split(
                    x, y, test_size=validation_split, shuffle=True
                )
            if batch_size is not None and batch_size < 1:
                raise ValueError("batch_size < 1")
            if batch_size is None or batch_size > len(x):
                batch_size = len(x)
            if weight_reduction is None:
                weight_reduction = epochs
        except ValueError as ve:
            print(ve)
            sys.exit(11)
        except Exception as e:
            print(e)
            sys.exit(10)

        self.renewal = renewal

        try:
            if log_name is None:
                log_name = "fit"
            self.log_path = f"logs/{log_name}/{self.day}"
            if log == 2:
                assert log_name is not None, "log_name is None"
                train_log_dir = self.log_path + "/train"
                for i in range(self.n_particles):
                    self.train_summary_writer[i] = tf.summary.create_file_writer(
                        train_log_dir + f"/{i}"
                    )

                port = find_free_port()
                tensorboard_process = subprocess.Popen(
                    [
                        "tensorboard",
                        "--logdir",
                        self.log_path,
                        "--port",
                        str(port),
                    ]
                )
                tensorboard_url = f"http://localhost:{port}"
                print(f"tensorboard url : {tensorboard_url}")
                atexit.register(tensorboard_process.kill)
            elif check_point is not None or log == 1:
                if not os.path.exists(self.log_path):
                    os.makedirs(self.log_path, exist_ok=True)
        except ValueError as ve:
            print(ve)
            sys.exit(11)
        except Exception as e:
            print(e)
            sys.exit(10)

        try:
            dataset = self.batch_generator(x, y, batch_size=batch_size)

            if back_propagation:
                # Seed the global best with one epoch of ordinary
                # backpropagation before the swarm starts.
                model_ = keras.models.model_from_json(self.model.to_json())
                model_.compile(
                    loss=self.loss,
                    optimizer="adam",
                    metrics=["accuracy", "mse"],
                )
                model_.fit(x, y, epochs=1, verbose=0)  # type: ignore
                score = model_.evaluate(x, y, verbose="auto")
                Particle.g_best_score = score
                # Store in the flattened layout that _decode() expects.
                Particle.g_best_weights = self._encode(model_.get_weights())
                del model_
                print("best score init complete : " + str(Particle.g_best_score))

            epochs_pbar = tqdm(
                range(epochs),
                desc=f"best - loss: {Particle.g_best_score[0]:.4f} - acc: {Particle.g_best_score[1]:.4f} - mse: {Particle.g_best_score[2]:.4f}",
                ascii=True,
                leave=True,
                position=0,
            )
            rng = np.random.default_rng(seed=42)
            for epoch in epochs_pbar:
                # average score for this epoch
                # particle_avg = particle_sum / self.n_particles  # x_j
                # particle_sum = 0
                # best accuracy, lowest loss, and lowest mse of this epoch
                max_acc = 0
                min_loss = np.inf
                min_mse = np.inf
                # index of the best-scoring particle during this epoch
                best_particle_index = 0
                part_pbar = tqdm(
                    range(len(self.particles)),
                    desc=f"loss: {min_loss:.4f} acc: {max_acc:.4f} mse: {min_mse:.4f}",
                    ascii=True,
                    leave=False,
                    position=1,
                )
                # Linear inertia decay: w falls from w_max to w_min over each
                # weight_reduction-epoch cycle, then resets.
                w = (
                    self.w_max
                    - (self.w_max - self.w_min)
                    * (epoch % weight_reduction)
                    / weight_reduction
                )
                for i in part_pbar:
                    for _i in tqdm(
                        range(dataset.get_length()),
                        desc="batch",
                        ascii=True,
                        leave=False,
                    ):
                        part_pbar.set_description(
                            f"loss: {min_loss:.4f} acc: {max_acc:.4f} mse: {min_mse:.4f}"
                        )
                        x_batch, y_batch = dataset.next()
                        score = self.particles[i].step(
                            x_batch, y_batch, self.c0, self.c1, w, renewal=renewal
                        )
                        if renewal == "loss":
                            # strictly lower than the lowest loss so far
                            if score[0] < min_loss:
                                # update every tracked score
                                min_loss, max_acc, min_mse = score
                                best_particle_index = i
                            # ties on loss are broken by accuracy
                            elif score[0] == min_loss:
                                if score[1] > max_acc:
                                    min_loss, max_acc, min_mse = score
                                    best_particle_index = i
                        elif renewal == "acc":
                            # strictly higher than the best accuracy so far
                            if score[1] > max_acc:
                                # update every tracked score
                                min_loss, max_acc, min_mse = score
                                best_particle_index = i
                            # ties on accuracy are broken by mse
                            elif score[1] == max_acc:
                                if score[2] < min_mse:
                                    min_loss, max_acc, min_mse = score
                                    best_particle_index = i
                        elif renewal == "mse":
                            if score[2] < min_mse:
                                min_loss, max_acc, min_mse = score
                                best_particle_index = i
                            # ties on mse are broken by accuracy
                            elif score[2] == min_mse:
                                if score[1] > max_acc:
                                    min_loss, max_acc, min_mse = score
                                    best_particle_index = i

                    if log == 2:
                        with self.train_summary_writer[i].as_default():
                            tf.summary.scalar("accuracy", score[1], step=epoch + 1)
                            tf.summary.scalar("loss", score[0], step=epoch + 1)
                            tf.summary.scalar("mse", score[2], step=epoch + 1)
                    if log == 1:
                        with open(
                            f"./logs/{log_name}/{self.day}/{self.n_particles}_{epochs}_{self.c0}_{self.c1}_{self.w_min}_{renewal}.csv",
                            "a",
                        ) as f:
                            f.write(f"{score[0]}, {score[1]}, {score[2]}")
                            if i != self.n_particles - 1:
                                f.write(", ")
                            else:
                                f.write("\n")
                    part_pbar.refresh()

                # The global best is renewed only after the epoch completes, so a
                # particle that spikes for a single batch cannot corrupt the result.
                if renewal == "loss" and min_loss <= Particle.g_best_score[0]:
                    if min_loss < Particle.g_best_score[0]:
                        self.particles[best_particle_index].update_global_best()
                    else:
                        # equal loss: prefer the higher accuracy
                        if max_acc > Particle.g_best_score[1]:
                            self.particles[best_particle_index].update_global_best()
                elif renewal == "acc" and max_acc >= Particle.g_best_score[1]:
                    # strictly better accuracy
                    if max_acc > Particle.g_best_score[1]:
                        # renew the best score
                        self.particles[best_particle_index].update_global_best()
                    else:
                        # equal accuracy: prefer the lower loss
                        if min_loss < Particle.g_best_score[0]:
                            self.particles[best_particle_index].update_global_best()
                elif renewal == "mse" and min_mse <= Particle.g_best_score[2]:
                    if min_mse < Particle.g_best_score[2]:
                        self.particles[best_particle_index].update_global_best()
                    else:
                        # equal mse: prefer the higher accuracy
                        if max_acc > Particle.g_best_score[1]:
                            self.particles[best_particle_index].update_global_best()

                epochs_pbar.set_description(
                    f"best - loss: {Particle.g_best_score[0]:.4f} - acc: {Particle.g_best_score[1]:.4f} - mse: {Particle.g_best_score[2]:.4f}"
                )
                if check_point is not None and epoch % check_point == 0:
                    os.makedirs(
                        f"./logs/{log_name}/{self.day}",
                        exist_ok=True,
                    )
                    self._check_point_save(f"./logs/{log_name}/{self.day}/ckpt-{epoch}")
                tf.keras.backend.reset_uids()
                tf.keras.backend.clear_session()
                gc.collect()

            return Particle.g_best_score
        except KeyboardInterrupt:
            print("Ctrl + C : Stop Training")
        except MemoryError:
            print("Memory Error : Stop Training")
        except Exception as e:
            print(e)
        finally:
            self.model_save(validate_data)
            print("model save")
            if save_info:
                self.save_info()
                print("save info")

    def get_best_model(self):
        """
        Return the best-scoring model.

        Returns:
            (keras.Model): compiled model carrying the global best weights,
            or None if no global best has been recorded yet
        """
        model = keras.models.model_from_json(self.model.to_json())
        if Particle.g_best_weights is not None:
            model.set_weights(self._decode(Particle.g_best_weights))
            model.compile(
                loss=self.loss,
                optimizer="adam",
                metrics=["accuracy", "mse"],
            )
            return model
        else:
            return None

    def get_best_score(self):
        """
        Return the best score.

        Returns:
            (list): [loss, accuracy, mse] of the global best
        """
        return Particle.g_best_score

    def get_best_weights(self):
        """
        Return the weights of the best-scoring model.

        Returns:
            (numpy array): global best weights in flattened form
        """
        return Particle.g_best_weights
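
    # Retrieval sketch (illustrative): after fit() finishes, the swarm-wide
    # best is exposed through the accessors above, e.g.
    #   best_model = opt.get_best_model()   # compiled keras.Model or None
    #   loss, acc, mse = opt.get_best_score()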

    def save_info(self):
        """
        Save the training configuration and the best score as JSON
        under self.log_path.
        """
        json_save = {
            "name": f"{self.day}/{self.n_particles}_{self.c0}_{self.c1}_{self.w_min}.h5",
            "n_particles": self.n_particles,
            "score": Particle.g_best_score,
            "c0": self.c0,
            "c1": self.c1,
            "w_min": self.w_min,
            "w_max": self.w_max,
            "loss_method": self.loss,
            "empirical_balance": self.empirical_balance,
            "dispersion": self.dispersion,
            "negative_swarm": self.negative_swarm,
            "mutation_swarm": self.mutation_swarm,
            "random_state_0": self.random_state[0],
            "random_state_1": self.random_state[1].tolist(),
            "random_state_2": self.random_state[2],
            "random_state_3": self.random_state[3],
            "random_state_4": self.random_state[4],
            "renewal": self.renewal,
        }
        with open(
            f"./{self.log_path}/{self.loss}_{Particle.g_best_score}.json",
            "a",
        ) as f:
            json.dump(json_save, f, indent=4)

    def _check_point_save(self, save_path: str = "./result/check_point"):
        """
        Save an intermediate checkpoint.

        Args:
            save_path (str, optional): checkpoint path and name prefix.
                Defaults to "./result/check_point".
        """
        model = self.get_best_model()
        model.save_weights(save_path)

    def model_save(self, valid_data: List):
        """
        Evaluate and save the best-scoring model under self.log_path.

        Args:
            valid_data (List): (x, y) pair used for the final evaluation

        Returns:
            (keras.Model): the saved model, or None if no best model exists
        """
        x, y = valid_data
        model = self.get_best_model()
        if model is None:
            return None
        score = model.evaluate(x, y, verbose=1)  # type: ignore
        print(f"model score - loss: {score[0]} - acc: {score[1]} - mse: {score[2]}")
        if self.renewal == "loss":
            index = 0
        elif self.renewal == "acc":
            index = 1
        else:
            index = 2
        model.save(f"./{self.log_path}/model_{score[index]}.h5")
        return model
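

if __name__ == "__main__":
    # Usage sketch (illustrative only): train a small MNIST classifier with
    # this optimizer. The architecture, hyperparameters, and epoch count are
    # arbitrary example values, not recommendations from this module. Run as
    # `python -m pso.optimizer` from the repository root so that the relative
    # import of Particle resolves.
    (x_train, y_train), _ = keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype("float32") / 255.0
    y_train = keras.utils.to_categorical(y_train, 10)

    demo_model = keras.Sequential(
        [
            keras.layers.Input(shape=(784,)),
            keras.layers.Dense(32, activation="relu"),
            keras.layers.Dense(10, activation="softmax"),
        ]
    )

    pso = Optimizer(
        demo_model,
        loss="categorical_crossentropy",
        n_particles=10,
        c0=0.5,
        c1=0.3,
        w_min=0.1,
        w_max=0.9,
        mutation_swarm=0.1,
    )
    best_score = pso.fit(x_train, y_train, epochs=5, renewal="acc")
    print(f"best score : {best_score}")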