EBPSO 알고리즘 구현 - 선택지로 추가
random 으로 분산시키는 방법 구현 - 선택지로 추가
iris 기준 98퍼센트로 나오나 정확한 결과를 지켜봐야 할것으로 보임
This commit is contained in:
jung-geun
2023-05-29 04:01:48 +09:00
parent 7a612e4ca7
commit 91c6ec965b
27 changed files with 3378 additions and 1647 deletions

280
metacode/pso_bp.py Normal file
View File

@@ -0,0 +1,280 @@
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm
class PSO(object):
"""
Class implementing PSO algorithm
"""
def __init__(self, model: keras.models, loss_method=keras.losses.MeanSquaredError(), optimizer='adam', n_particles=5):
"""
Initialize the key variables.
Args:
model : 학습할 모델 객체 (Sequential)
loss_method : 손실 함수
optimizer : 최적화 함수
n_particles(int) : 파티클의 개수
"""
self.model = model # 모델
self.n_particles = n_particles # 파티클의 개수
self.loss_method = loss_method # 손실 함수
self.optimizer = optimizer # 최적화 함수
self.model_structure = self.model.to_json() # 모델의 구조
self.init_weights = self.model.get_weights() # 검색할 차원
self.particle_depth = len(self.model.get_weights()) # 검색할 차원의 깊이
self.particles_weights = [None] * n_particles # 파티클의 위치
for _ in tqdm(range(self.n_particles), desc="init particles position"):
# particle_node = []
m = keras.models.model_from_json(self.model_structure)
m.compile(loss=self.loss_method,
optimizer=self.optimizer, metrics=["accuracy"])
self.particles_weights[_] = m.get_weights()
# print(f"shape > {self.particles_weights[_][0]}")
# self.particles_weights.append(particle_node)
# print(f"particles_weights > {self.particles_weights}")
# self.particles_weights = np.random.uniform(size=(n_particles, self.particle_depth)) \
# * self.init_pos
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 균등한 위치를 생성
# self.velocities = [None] * self.n_particles
self.velocities = [
[0 for i in range(self.particle_depth)] for n in range(n_particles)]
for i in tqdm(range(n_particles), desc="init velocities"):
# print(i)
for index, layer in enumerate(self.init_weights):
# print(f"index > {index}")
# print(f"layer > {layer.shape}")
self.velocities[i][index] = np.random.rand(
*layer.shape) / 5 - 0.10
# if layer.ndim == 1:
# self.velocities[i][index] = np.random.uniform(
# size=(layer.shape[0],))
# elif layer.ndim == 2:
# self.velocities[i][index] = np.random.uniform(
# size=(layer.shape[0], layer.shape[1]))
# elif layer.ndim == 3:
# self.velocities[i][index] = np.random.uniform(
# size=(layer.shape[0], layer.shape[1], layer.shape[2]))
# print(f"type > {type(self.velocities)}")
# print(f"velocities > {self.velocities}")
# print(f"velocities > {self.velocities}")
# for i, layer in enumerate(self.init_weights):
# self.velocities[i] = np.random.rand(*layer.shape) / 5 - 0.10
# self.velocities = np.random.uniform(
# size=(n_particles, self.particle_depth))
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 속도를 무작위로 초기화
# 최대 사이즈로 전역 최적갑 저장 - global best
self.g_best = self.model.get_weights() # 전역 최적값(최적의 가중치)
self.p_best = self.particles_weights # 각 파티클의 최적값(최적의 가중치)
self.p_best_score = [0 for i in range(
n_particles)] # 각 파티클의 최적값의 점수
self.g_best_score = 0 # 전역 최적값의 점수(초기화 - 무한대)
self.g_history = []
self.g_best_score_history = []
self.history = []
def _update_weights(self, weights, v):
"""
Update particle position
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 가중치의 속도
Returns:
(array-like) : 파티클의 새로운 가중치(위치)
"""
# w = np.array(w) # 각 파티클의 위치
# v = np.array(v) # 각 파티클의 속도(방향과 속력을 가짐)
# print(f"len(w) > {len(w)}")
# print(f"len(v) > {len(v)}")
new_weights = [0 for i in range(len(weights))]
for i in range(len(weights)):
# print(f"shape > w : {np.shape(w[i])}, v : {np.shape(v[i])}")
new_weights[i] = tf.add(weights[i], v[i])
# new_w = tf.add(w, v) # 각 파티클을 랜덤한 속도만큼 진행
return new_weights # 진행한 파티클들의 위치를 반환
def _update_velocity(self, weights, v, p_best, c0=0.5, c1=1.5, w=0.75):
"""
Update particle velocity
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 속도
p_best(array-like) : 각 파티클의 최적의 위치 (최적의 가중치)
c0 (float) : 인지 스케일링 상수 (가중치의 중요도 - 지역) - 지역 관성
c1 (float) : 사회 스케일링 상수 (가중치의 중요도 - 전역) - 전역 관성
w (float) : 관성 상수 (현재 속도의 중요도)
Returns:
(array-like) : 각 파티클의 새로운 속도
"""
# x = np.array(x)
# v = np.array(v)
# assert np.shape(weights) == np.shape(v), "Position and velocity must have same shape."
# 두 데이터의 shape 이 같지 않으면 오류 출력
# 0에서 1사이의 숫자를 랜덤 생성
r0 = np.random.rand()
r1 = np.random.rand()
# print(f"type > weights : {type(weights)}")
# print(f"type > v : {type(v)}")
# print(
# f"shape > weights : {np.shape(weights[0])}, v : {np.shape(v[0])}")
# print(f"len > weights : {len(weights)}, v : {len(v)}")
# p_best = np.array(p_best)
# g_best = np.array(g_best)
# 가중치(상수)*속도 + \
# 스케일링 상수*랜덤 가중치*(나의 최적값 - 처음 위치) + \
# 전역 스케일링 상수*랜덤 가중치*(전체 최적값 - 처음 위치)
# for i, layer in enumerate(weights):
new_velocity = [None] * len(weights)
for i, layer in enumerate(weights):
new_v = w*v[i]
new_v = new_v + c0*r0*(p_best[i] - layer)
new_v = new_v + c1*r1*(self.g_best[i] - layer)
new_velocity[i] = new_v
# m2 = tf.multiply(tf.multiply(c0, r0),
# tf.subtract(p_best[i], layer))
# m3 = tf.multiply(tf.multiply(c1, r1),
# tf.subtract(g_best[i], layer))
# new_v[i] = tf.add(m1, tf.add(m2, m3))
# new_v[i] = tf.add_n([m1, m2, m3])
# new_v[i] = tf.add_n(
# tf.multiply(w, v[i]),
# tf.multiply(tf.multiply(c0, r0),
# tf.subtract(p_best[i], layer)),
# tf.multiply(tf.multiply(c1, r1),
# tf.subtract(g_best[i], layer)))
# new_v = w*v + c0*r0*(p_best - weights) + c1*r1*(g_best - weights)
return new_velocity
def _get_score(self, x, y):
"""
Compute the score of the current position of the particles.
Args:
x (array-like): The current position of the particles
y (array-like): The current position of the particles
Returns:
(array-like) : 추론에 대한 점수
"""
# = self.model
# model.set_weights(weights)
score = self.model.evaluate(x, y, verbose=0)
return score
def optimize(self, x_train, y_train, x_test, y_test, maxiter=10, epochs=1, batch_size=32, c0=0.5, c1=1.5, w=0.75):
"""
Run the PSO optimization process utill the stoping critera is met.
Cas for minization. The aim is to minimize the cost function
Args:
maxiter (int): the maximum number of iterations before stopping the optimization
파티클의 최종 위치를 위한 반복 횟수
Returns:
The best solution found (array-like)
"""
for _ in range(maxiter):
loss = 0
acc = 1e-10
for i in tqdm(range(self.n_particles), desc=f"Iter {_}/{maxiter} | acc avg {round(acc/(_+1) ,4)}", ascii=True):
weights = self.particles_weights[i] # 각 파티클 추출
v = self.velocities[i] # 각 파티클의 다음 속도 추출
p_best = self.p_best[i] # 결과치 저장할 변수 지정
# 2. 속도 계산
self.velocities[i] = self._update_velocity(
weights, v, p_best, c0, c1, w)
# 다음에 움직일 속도 = 최초 위치, 현재 속도, 현재 위치, 최종 위치
# 3. 위치 업데이트
self.particles_weights[i] = self._update_weights(weights, v)
# 현재 위치 = 최초 위치 현재 속도
# Update the besst position for particle i
# 내 현재 위치가 내 위치의 최소치보다 작으면 갱신
self.model.set_weights(self.particles_weights[i].copy())
self.model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size,
verbose=0, validation_data=(x_test, y_test))
self.particles_weights[i] = self.model.get_weights()
# 4. 평가
self.model.compile(loss=self.loss_method,
optimizer='adam', metrics=['accuracy'])
score = self._get_score(x_test, y_test)
# print(score)
# print(f"score : {score}")
# print(f"loss : {loss}")
# print(f"p_best_score : {self.p_best_score[i]}")
if score[1] > self.p_best_score[i]:
self.p_best_score[i] = score[1]
self.p_best[i] = self.particles_weights[i].copy()
if score[1] > self.g_best_score:
self.g_best_score = score[1]
self.g_best = self.particles_weights[i].copy()
self.g_history.append(self.g_best)
self.g_best_score_history.append(
self.g_best_score)
self.score = score[1]
loss = loss + score[0]
acc = acc + score[1]
# if self.func(self.particles_weights[i]) < self.func(p_best):
# self.p_best[i] = self.particles_weights[i]
# if self.
# Update the best position overall
# 내 현재 위치가 전체 위치 최소치보다 작으면 갱신
# if self.func(self.particles_weights[i]) < self.func(self.g_best):
# self.g_best = self.particles_weights[i]
# self.g_history.append(self.g_best)
# print(f"{i} particle score : {score[0]}")
print(
f"loss avg : {loss/self.n_particles} | acc avg : {acc/self.n_particles} | best loss : {self.g_best_score}")
# self.history.append(self.particles_weights.copy())
# 전체 최소 위치, 전체 최소 벡터
return self.g_best, self._get_score(x_test, y_test)
"""
Returns:
현재 전체 위치
"""
def position(self):
return self.particles_weights.copy()
"""
Returns:
전체 위치 벡터 history
"""
def position_history(self):
return self.history.copy()
"""
Returns:
global best 의 갱신된 값의 변화를 반환
"""
def global_history(self):
return self.g_history.copy()
"""
Returns:
global best score 의 갱신된 값의 변화를 반환
"""
def global_score_history(self):
return self.g_best_score_history.copy()

137
metacode/pso_meta.py Normal file
View File

@@ -0,0 +1,137 @@
import numpy as np
class PSO(object):
"""
Class implementing PSO algorithm
"""
def __init__(self, func, init_pos, n_particles):
"""
Initialize the key variables.
Args:
fun (function): the fitness function to optimize
init_pos(array_like):
n_particles(int): the number of particles of the swarm.
"""
self.func = func
self.n_particles = n_particles
self.init_pos = init_pos # 곰샥헐 차원
self.particle_dim = len(init_pos) # 검색할 차원의 크기
self.particles_pos = np.random.uniform(size=(n_particles, self.particle_dim)) \
* self.init_pos
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 균등한 위치를 생성
self.velocities = np.random.uniform(
size=(n_particles, self.particle_dim))
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 속도를 무작위로 초기화
self.g_best = init_pos # 최대 사이즈로 전역 최적갑 저장 - global best
self.p_best = self.particles_pos # 모든 파티클의 위치 - particles best
self.g_history = []
self.history = []
def update_position(self, x, v):
"""
Update particle position
Args:
x (array-like): particle current position
v (array-like): particle current velocity
Returns:
The updated position(array-like)
"""
x = np.array(x) # 각 파티클의 위치
v = np.array(v) # 각 파티클의 속도(방향과 속력을 가짐)
new_x = x + v # 각 파티클을 랜덤한 속도만큼 진행
return new_x # 진행한 파티클들의 위치를 반환
def update_velocity(self, x, v, p_best, g_best, c0=0.5, c1=1.5, w=0.75):
"""
Update particle velocity
Args:
x(array-like): particle current position
v (array-like): particle current velocity
p_best(array-like): the best position found so far for a particle
g_best(array-like): the best position regarding all the particles found so far
c0 (float): the congnitive scaling constant, 인지 스케일링 상수
c1 (float): the social scaling constant
w (float): the inertia weight, 관성 중량
Returns:
The updated velocity (array-like).
"""
x = np.array(x)
v = np.array(v)
assert x.shape == v.shape, "Position and velocity must have same shape."
# 두 데이터의 shape 이 같지 않으면 오류 출력
# 0에서 1사이의 숫자를 랜덤 생성
r = np.random.uniform()
p_best = np.array(p_best)
g_best = np.array(g_best)
# 가중치(상수)*속도 + \
# 스케일링 상수*랜덤 가중치*(나의 최적값 - 처음 위치) + \
# 전역 스케일링 상수*랜덤 가중치*(전체 최적값 - 처음 위치)
new_v = w*v + c0*r*(p_best - x) + c1*r*(g_best - x)
return new_v
def optimize(self, maxiter=200):
"""
Run the PSO optimization process utill the stoping critera is met.
Cas for minization. The aim is to minimize the cost function
Args:
maxiter (int): the maximum number of iterations before stopping the optimization
파티클의 최종 위치를 위한 반복 횟수
Returns:
The best solution found (array-like)
"""
for _ in range(maxiter):
for i in range(self.n_particles):
x = self.particles_pos[i] # 각 파티클 추출
v = self.velocities[i] # 랜덤 생성한 속도 추출
p_best = self.p_best[i] # 결과치 저장할 변수 지정
self.velocities[i] = self.update_velocity(
x, v, p_best, self.g_best)
# 다음에 움직일 속도 = 최초 위치, 현재 속도, 현재 위치, 최종 위치
self.particles_pos[i] = self.update_position(x, v)
# 현재 위치 = 최초 위치 현재 속도
# Update the besst position for particle i
# 내 현재 위치가 내 위치의 최소치보다 작으면 갱신
if self.func(self.particles_pos[i]) < self.func(p_best):
self.p_best[i] = self.particles_pos[i]
# Update the best position overall
# 내 현재 위치가 전체 위치 최소치보다 작으면 갱신
if self.func(self.particles_pos[i]) < self.func(self.g_best):
self.g_best = self.particles_pos[i]
self.g_history.append(self.g_best)
self.history.append(self.particles_pos.copy())
# 전체 최소 위치, 전체 최소 벡터
return self.g_best, self.func(self.g_best)
"""
Returns:
현재 전체 위치
"""
def position(self):
return self.particles_pos.copy()
"""
Returns:
전체 위치 벡터 history
"""
def position_history(self):
return self.history
"""
Returns:
global best 의 갱신된 값의 변화를 반환
"""
def global_history(self):
return self.g_history.copy()

268
metacode/pso_tf.py Normal file
View File

@@ -0,0 +1,268 @@
import os
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import datetime
import gc
import cupy as cp
class PSO(object):
"""
Class implementing PSO algorithm
"""
def __init__(self, model: keras.models, loss_method=keras.losses.MeanSquaredError(), n_particles: int = 5):
"""
Initialize the key variables.
Args:
model : 학습할 모델 객체 (Sequential)
loss_method : 손실 함수
n_particles(int) : 파티클의 개수
"""
self.model = model # 모델
self.n_particles = n_particles # 파티클의 개수
self.loss_method = loss_method # 손실 함수
model_structure = self.model.to_json() # 모델의 구조 정보
self.init_weights = self.model.get_weights() # 검색할 차원
self.particle_depth = len(self.model.get_weights()) # 검색할 차원의 깊이
self.particles_weights = [None] * n_particles # 파티클의 위치
for _ in tqdm(range(self.n_particles), desc="init particles position"):
m = keras.models.model_from_json(model_structure)
m.compile(loss=self.loss_method,
optimizer="adam", metrics=["accuracy"])
self.particles_weights[_] = m.get_weights()
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 균등한 위치를 생성
self.velocities = [
[0 for i in range(self.particle_depth)] for n in range(n_particles)]
for i in tqdm(range(n_particles), desc="init velocities"):
self.init_weights = self.model.get_weights()
w_,s_,l_ = self._encode(self.init_weights)
w_ = np.random.rand(len(w_)) / 5 - 0.10
self.velocities[i] = self._decode(w_,s_,l_)
# for index, layer in enumerate(self.init_weights):
# self.velocities[i][index] = np.random.rand(
# *layer.shape) / 5 - 0.10
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 속도를 무작위로 초기화
# 최대 사이즈로 전역 최적갑 저장 - global best
self.p_best = self.particles_weights # 각 파티클의 최적값(최적의 가중치)
self.g_best=self.model.get_weights() # 전역 최적값(최적의 가중치) | 초기값은 모델의 가중치
# 각 파티클의 최적값의 점수
self.p_best_score = [0 for i in range(n_particles)]
# 전역 최적값의 점수(초기화 - 0)
self.g_best_score = 0
def __del__(self):
del self.model
del self.n_particles
del self.loss_method
del self.init_weights
del self.particles_weights
del self.velocities
del self.p_best
del self.g_best
del self.p_best_score
del self.g_best_score
def _encode(self,weights: list):
# w_gpu = cp.array([])
w_gpu = np.array([])
lenght = []
shape = []
for layer in weights:
shape.append(layer.shape)
w_ = layer.reshape(-1)
lenght.append(len(w_))
w_gpu = np.append(w_gpu, w_)
# w_gpu = cp.append(w_gpu, w_)
return w_gpu, shape, lenght
def _decode(self,weight, shape, lenght):
weights = []
start = 0
for i in range(len(shape)):
end = start + lenght[i]
# print(f"{start} ~ {end}")
# print(f"{shape[i]}")
w_ = weight[start:end]
w_ = np.reshape(w_, shape[i])
# w_ = w_.reshape(shape[i])
weights.append(w_)
start = end
return weights
def _update_weights(self, weights, v):
"""
Update particle position
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 가중치의 속도
Returns:
(array-like) : 파티클의 새로운 가중치(위치)
"""
# w = np.array(w) # 각 파티클의 위치
# v = np.array(v) # 각 파티클의 속도(방향과 속력을 가짐)
# new_weights = [0 for i in range(len(weights))]
# print(f"weights : {weights}")
encode_w, w_sh, w_len = self._encode(weights = weights)
encode_v, _, _ = self._encode(weights = v)
new_w = encode_w + encode_v
new_weights = self._decode(new_w, w_sh, w_len)
# for i in range(len(weights)):
# new_weights[i] = tf.add(weights[i], v[i])
# new_w = tf.add(w, v) # 각 파티클을 랜덤한 속도만큼 진행
return new_weights # 진행한 파티클들의 위치를 반환
def _update_velocity(self, weights, v, p_best, c0=0.5, c1=1.5, w=0.75):
"""
Update particle velocity
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 속도
p_best(array-like) : 각 파티클의 최적의 위치 (최적의 가중치)
c0 (float) : 인지 스케일링 상수 (가중치의 중요도 - 지역) - 지역 관성
c1 (float) : 사회 스케일링 상수 (가중치의 중요도 - 전역) - 전역 관성
w (float) : 관성 상수 (현재 속도의 중요도)
Returns:
(array-like) : 각 파티클의 새로운 속도
"""
# x = np.array(x)
# v = np.array(v)
# assert np.shape(weights) == np.shape(v), "Position and velocity must have same shape."
# 두 데이터의 shape 이 같지 않으면 오류 출력
# 0에서 1사이의 숫자를 랜덤 생성
r0 = np.random.rand()
r1 = np.random.rand()
# p_best = np.array(p_best)
# g_best = np.array(g_best)
# 가중치(상수)*속도 + \
# 스케일링 상수*랜덤 가중치*(나의 최적값 - 처음 위치) + \
# 전역 스케일링 상수*랜덤 가중치*(전체 최적값 - 처음 위치)
encode_w, w_sh, w_len = self._encode(weights = weights)
encode_v, _, _ = self._encode(weights = v)
encode_p, _, _ = self._encode(weights = p_best)
encode_g, _, _ = self._encode(weights = self.g_best)
new_v = encode_w * encode_v + c0*r0*(encode_p - encode_w) + c1*r1*(encode_g - encode_w)
new_velocity = self._decode(new_v, w_sh, w_len)
# new_velocity = [None] * len(weights)
# for i, layer in enumerate(weights):
# new_v = w*v[i]
# new_v = new_v + c0*r0*(p_best[i] - layer)
# new_v = new_v + c1*r1*(self.g_best[i] - layer)
# new_velocity[i] = new_v
# new_v = w*v + c0*r0*(p_best - weights) + c1*r1*(g_best - weights)
return new_velocity
def _get_score(self, x, y):
"""
Compute the score of the current position of the particles.
Args:
x (array-like): The current position of the particles
y (array-like): The current position of the particles
Returns:
(array-like) : 추론에 대한 점수
"""
score = self.model.evaluate(x, y, verbose=0)
return score
def optimize(self, x_, y_, maxiter=10, c0=0.5, c1=1.5, w=0.75, save=False, save_path="./result/history"):
"""
Run the PSO optimization process utill the stoping critera is met.
Cas for minization. The aim is to minimize the cost function
Args:
maxiter (int): the maximum number of iterations before stopping the optimization
파티클의 최종 위치를 위한 반복 횟수
Returns:
The best solution found (array-like)
"""
if save:
os.makedirs(save_path, exist_ok=True)
day = datetime.datetime.now().strftime('%m-%d-%H-%M')
for _ in range(maxiter):
for i in tqdm(range(self.n_particles), desc=f"Iter {_}/{maxiter} ", ascii=True):
weights = self.particles_weights[i] # 각 파티클 추출
v = self.velocities[i] # 각 파티클의 다음 속도 추출
p_best = self.p_best[i] # 결과치 저장할 변수 지정
# 2. 속도 계산
self.velocities[i] = self._update_velocity(
weights, v, p_best, c0, c1, w)
# 다음에 움직일 속도 = 최초 위치, 현재 속도, 현재 위치, 최종 위치
# 3. 위치 업데이트
self.particles_weights[i] = self._update_weights(weights, v)
# 현재 위치 = 이전 위치 + 현재 속도
# 내 현재 위치가 내 위치의 최소치보다 작으면 갱신
self.model.set_weights(self.particles_weights[i])
# self.particles_weights[i] = self.model.get_weights()
# 4. 평가
self.model.compile(loss=self.loss_method,
optimizer='sgd', metrics=['accuracy'])
score = self._get_score(x_, y_)
if score[1] > self.p_best_score[i]:
self.p_best_score[i] = score[1]
self.p_best[i] = self.particles_weights[i]
if score[1] > self.g_best_score:
self.g_best_score = score[1]
self.g_best = self.particles_weights[i]
if save:
with open(f"{save_path}/{day}_{self.n_particles}_{maxiter}_{c0}_{c1}_{w}.csv",'a')as f:
f.write(f"{score[0]}, {score[1]}")
if i != self.n_particles - 1:
f.write(",")
if save:
with open(f"{save_path}/{day}_{self.n_particles}_{maxiter}_{c0}_{c1}_{w}.csv",'a')as f:
f.write("\n")
print(
f"loss avg : {score[0]/self.n_particles} | acc avg : {score[1]/self.n_particles} | best score : {self.g_best_score}")
gc.collect()
# 전체 최소 위치, 전체 최소 벡터
return self.g_best, self._get_score(x_, y_)
"""
Returns:
최종 가중치
"""
def best_weights(self):
return self.g_best
"""
Returns:
최종 가중치의 스코어
"""
def best_score(self):
return self.g_best_score

210
metacode/pso_tf_bak.py Normal file
View File

@@ -0,0 +1,210 @@
import os
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow import keras
class PSO(object):
"""
Class implementing PSO algorithm
"""
def __init__(self, model: keras.models, loss_method=keras.losses.MeanSquaredError(), n_particles: int = 5):
"""
Initialize the key variables.
Args:
model : 학습할 모델 객체 (Sequential)
loss_method : 손실 함수
n_particles(int) : 파티클의 개수
"""
self.model = model # 모델
self.n_particles = n_particles # 파티클의 개수
self.loss_method = loss_method # 손실 함수
self.model_structure = self.model.to_json() # 모델의 구조 정보
self.init_weights = self.model.get_weights() # 검색할 차원
self.particle_depth = len(self.model.get_weights()) # 검색할 차원의 깊이
self.particles_weights = [None] * n_particles # 파티클의 위치
for _ in tqdm(range(self.n_particles), desc="init particles position"):
m = keras.models.model_from_json(self.model_structure)
m.compile(loss=self.loss_method,
optimizer="adam", metrics=["accuracy"])
self.particles_weights[_] = m.get_weights()
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 균등한 위치를 생성
self.velocities = [
[0 for i in range(self.particle_depth)] for n in range(n_particles)]
for i in tqdm(range(n_particles), desc="init velocities"):
for index, layer in enumerate(self.init_weights):
self.velocities[i][index] = np.random.rand(
*layer.shape) / 5 - 0.10
# 입력받은 파티클의 개수 * 검색할 차원의 크기 만큼의 속도를 무작위로 초기화
# 최대 사이즈로 전역 최적갑 저장 - global best
self.p_best = self.particles_weights # 각 파티클의 최적값(최적의 가중치)
self.g_best = self.model.get_weights() # 전역 최적값(최적의 가중치) | 초기값은 모델의 가중치
# 각 파티클의 최적값의 점수
self.p_best_score = [0 for i in range(n_particles)]
# 전역 최적값의 점수(초기화 - 0)
self.g_best_score = 0
self.loss_history = [[] for i in range(n_particles)] # 각 파티클의 손실값 변화
self.acc_history = [[] for i in range(n_particles)] # 각 파티클의 정확도 변화
self.g_best_score_history = [] # 전역 최적값의 점수 변화
def _update_weights(self, weights, v):
"""
Update particle position
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 가중치의 속도
Returns:
(array-like) : 파티클의 새로운 가중치(위치)
"""
# w = np.array(w) # 각 파티클의 위치
# v = np.array(v) # 각 파티클의 속도(방향과 속력을 가짐)
new_weights = [0 for i in range(len(weights))]
for i in range(len(weights)):
new_weights[i] = tf.add(weights[i], v[i])
# new_w = tf.add(w, v) # 각 파티클을 랜덤한 속도만큼 진행
return new_weights # 진행한 파티클들의 위치를 반환
def _update_velocity(self, weights, v, p_best, c0=0.5, c1=1.5, w=0.75):
"""
Update particle velocity
Args:
weights (array-like) : 파티클의 현재 가중치
v (array-like) : 속도
p_best(array-like) : 각 파티클의 최적의 위치 (최적의 가중치)
c0 (float) : 인지 스케일링 상수 (가중치의 중요도 - 지역) - 지역 관성
c1 (float) : 사회 스케일링 상수 (가중치의 중요도 - 전역) - 전역 관성
w (float) : 관성 상수 (현재 속도의 중요도)
Returns:
(array-like) : 각 파티클의 새로운 속도
"""
# x = np.array(x)
# v = np.array(v)
# assert np.shape(weights) == np.shape(v), "Position and velocity must have same shape."
# 두 데이터의 shape 이 같지 않으면 오류 출력
# 0에서 1사이의 숫자를 랜덤 생성
r0 = np.random.rand()
r1 = np.random.rand()
# p_best = np.array(p_best)
# g_best = np.array(g_best)
# 가중치(상수)*속도 + \
# 스케일링 상수*랜덤 가중치*(나의 최적값 - 처음 위치) + \
# 전역 스케일링 상수*랜덤 가중치*(전체 최적값 - 처음 위치)
new_velocity = [None] * len(weights)
for i, layer in enumerate(weights):
new_v = w*v[i]
new_v = new_v + c0*r0*(p_best[i] - layer)
new_v = new_v + c1*r1*(self.g_best[i] - layer)
new_velocity[i] = new_v
# new_v = w*v + c0*r0*(p_best - weights) + c1*r1*(g_best - weights)
return new_velocity
def _get_score(self, x, y):
"""
Compute the score of the current position of the particles.
Args:
x (array-like): The current position of the particles
y (array-like): The current position of the particles
Returns:
(array-like) : 추론에 대한 점수
"""
score = self.model.evaluate(x, y, verbose=0)
return score
def optimize(self, x_, y_, maxiter=10, c0=0.5, c1=1.5, w=0.75):
"""
Run the PSO optimization process utill the stoping critera is met.
Cas for minization. The aim is to minimize the cost function
Args:
maxiter (int): the maximum number of iterations before stopping the optimization
파티클의 최종 위치를 위한 반복 횟수
Returns:
The best solution found (array-like)
"""
for _ in range(maxiter):
for i in tqdm(range(self.n_particles), desc=f"Iter {_}/{maxiter} ", ascii=True):
weights = self.particles_weights[i] # 각 파티클 추출
v = self.velocities[i] # 각 파티클의 다음 속도 추출
p_best = self.p_best[i] # 결과치 저장할 변수 지정
# 2. 속도 계산
self.velocities[i] = self._update_velocity(
weights, v, p_best, c0, c1, w)
# 다음에 움직일 속도 = 최초 위치, 현재 속도, 현재 위치, 최종 위치
# 3. 위치 업데이트
self.particles_weights[i] = self._update_weights(weights, v)
# 현재 위치 = 이전 위치 + 현재 속도
# 내 현재 위치가 내 위치의 최소치보다 작으면 갱신
self.model.set_weights(self.particles_weights[i].copy())
# self.particles_weights[i] = self.model.get_weights()
# 4. 평가
self.model.compile(loss=self.loss_method,
optimizer='adam', metrics=['accuracy'])
score = self._get_score(x_, y_)
if score[1] > self.p_best_score[i]:
self.p_best_score[i] = score[1]
self.p_best[i] = self.particles_weights[i].copy()
if score[1] > self.g_best_score:
self.g_best_score = score[1]
self.g_best = self.particles_weights[i].copy()
self.g_best_score_history.append(
self.g_best_score)
self.loss_history[i].append(score[0])
self.acc_history[i].append(score[1])
print(
f"loss avg : {score[0]/self.n_particles} | acc avg : {score[1]/self.n_particles} | best score : {self.g_best_score}")
# 전체 최소 위치, 전체 최소 벡터
return self.g_best, self._get_score(x_, y_)
"""
Returns:
최종 가중치
"""
def best_weights(self):
return self.g_best
"""
Returns:
최종 가중치의 스코어
"""
def best_score(self):
return self.g_best_score
"""
Returns:
global best score 의 갱신된 값의 변화를 반환
"""
def global_score_history(self):
return self.g_best_score_history.copy()
"""
Returns:
모든 파티클의 손실값과 정확도의 변화를 반환
"""
def all_history(self):
return self.loss_history, self.acc_history.copy()