mirror of
https://github.com/jung-geun/policy-routing.git
synced 2025-12-20 02:34:39 +09:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 20fc34a3d7 | |||
| 0a1ff6ba9e | |||
|
|
d4db302c28 | ||
| 2fabaf865a | |||
| 556a4f43a9 | |||
| 5faf22452d | |||
| 045255e495 | |||
| 4d39fb678d | |||
| 8b8690ba79 |
@@ -6,24 +6,26 @@
|
|||||||
# Note that environment variables can be set in several places
|
# Note that environment variables can be set in several places
|
||||||
# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence
|
# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence
|
||||||
stages:
|
stages:
|
||||||
- build
|
- build
|
||||||
- test
|
- build-sonar
|
||||||
- deploy
|
- test
|
||||||
- review
|
- deploy
|
||||||
- dast
|
- review
|
||||||
- staging
|
- dast
|
||||||
- canary
|
- staging
|
||||||
- production
|
- canary
|
||||||
- incremental rollout 10%
|
- production
|
||||||
- incremental rollout 25%
|
- incremental rollout 10%
|
||||||
- incremental rollout 50%
|
- incremental rollout 25%
|
||||||
- incremental rollout 100%
|
- incremental rollout 50%
|
||||||
- performance
|
- incremental rollout 100%
|
||||||
- cleanup
|
- performance
|
||||||
|
- cleanup
|
||||||
sast:
|
sast:
|
||||||
stage: test
|
stage: test
|
||||||
include:
|
include:
|
||||||
- template: Auto-DevOps.gitlab-ci.yml
|
- template: Auto-DevOps.gitlab-ci.yml
|
||||||
|
- local: .gitlab/ci/*.gitlab-ci.yml
|
||||||
|
|
||||||
python_tests:
|
python_tests:
|
||||||
stage: test
|
stage: test
|
||||||
|
|||||||
26
.gitlab/ci/sonarqube.gitlab-ci.yml
Normal file
26
.gitlab/ci/sonarqube.gitlab-ci.yml
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
image:
|
||||||
|
name: sonarsource/sonar-scanner-cli:11
|
||||||
|
entrypoint: [""]
|
||||||
|
|
||||||
|
variables:
|
||||||
|
SONAR_USER_HOME: "${CI_PROJECT_DIR}/.sonar" # Defines the location of the analysis task cache
|
||||||
|
GIT_DEPTH: "0" # Tells git to fetch all the branches of the project, required by the analysis task
|
||||||
|
|
||||||
|
build-sonar:
|
||||||
|
stage: build-sonar
|
||||||
|
|
||||||
|
cache:
|
||||||
|
policy: pull-push
|
||||||
|
key: "sonar-cache-$CI_COMMIT_REF_SLUG"
|
||||||
|
paths:
|
||||||
|
- "${SONAR_USER_HOME}/cache"
|
||||||
|
- sonar-scanner/
|
||||||
|
|
||||||
|
script:
|
||||||
|
- sonar-scanner -Dsonar.host.url="${SONAR_HOST_URL}"
|
||||||
|
allow_failure: true
|
||||||
|
rules:
|
||||||
|
- if: $CI_PIPELINE_SOURCE == 'merge_request_event'
|
||||||
|
- if: $CI_COMMIT_BRANCH == 'master'
|
||||||
|
- if: $CI_COMMIT_BRANCH == 'main'
|
||||||
|
- if: $CI_COMMIT_BRANCH == 'develop'
|
||||||
93
AUTO_NIC_SETUP_README.md
Normal file
93
AUTO_NIC_SETUP_README.md
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
# 자동 NIC 설정 시스템
|
||||||
|
|
||||||
|
이 시스템은 새로운 네트워크 인터페이스(NIC)가 감지되면 자동으로 DHCP를 통해 IP를 할당받고, Policy-Based Routing(PBR) 규칙을 업데이트하는 자동화 시스템입니다.
|
||||||
|
|
||||||
|
## 구성 요소
|
||||||
|
|
||||||
|
### 1. udev 규칙
|
||||||
|
- **파일**: `/etc/udev/rules.d/99-auto-nic-setup.rules`
|
||||||
|
- **기능**: 새로운 네트워크 인터페이스가 추가되거나 활성화될 때 자동으로 스크립트 실행
|
||||||
|
- **지원 인터페이스**: ens*, eth*, enp*
|
||||||
|
|
||||||
|
### 2. 자동 설정 스크립트
|
||||||
|
- **파일**: `/usr/local/bin/auto-nic-setup.sh`
|
||||||
|
- **기능**:
|
||||||
|
- 인터페이스 상태 확인
|
||||||
|
- DHCP를 통한 IP 할당
|
||||||
|
- Policy Routing 규칙 자동 업데이트
|
||||||
|
- 상세한 로깅
|
||||||
|
|
||||||
|
### 3. NetworkManager Dispatcher
|
||||||
|
- **파일**: `/etc/NetworkManager/dispatcher.d/99-policy-routing`
|
||||||
|
- **기능**: NetworkManager에서 인터페이스가 up 상태가 될 때 추가적으로 스크립트 실행
|
||||||
|
|
||||||
|
### 4. 로그 파일
|
||||||
|
- **파일**: `/var/log/auto-nic-setup.log`
|
||||||
|
- **기능**: 모든 자동화 작업의 상세 로그 기록
|
||||||
|
|
||||||
|
## 동작 방식
|
||||||
|
|
||||||
|
1. **NIC 감지**: 새로운 NIC가 시스템에 추가되면 udev 규칙이 트리거됨
|
||||||
|
2. **자동 활성화**: 인터페이스가 DOWN 상태면 자동으로 UP 상태로 변경
|
||||||
|
3. **DHCP 설정**: dhclient를 사용하여 자동으로 IP 주소 할당 시도
|
||||||
|
4. **PBR 업데이트**: IP 할당이 성공하면 policy_routing.py의 apply_changes 실행
|
||||||
|
5. **로깅**: 모든 과정이 `/var/log/auto-nic-setup.log`에 기록됨
|
||||||
|
|
||||||
|
## 현재 설정된 인터페이스
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 현재 PBR 규칙 확인
|
||||||
|
ip rule show
|
||||||
|
|
||||||
|
# 현재 라우팅 테이블 확인
|
||||||
|
ip route show table nic1
|
||||||
|
ip route show table nic2
|
||||||
|
ip route show table nic3
|
||||||
|
|
||||||
|
# 로그 확인
|
||||||
|
sudo tail -f /var/log/auto-nic-setup.log
|
||||||
|
```
|
||||||
|
|
||||||
|
## 수동 테스트
|
||||||
|
|
||||||
|
새로운 NIC가 추가되었을 때 수동으로 테스트하려면:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 수동으로 스크립트 실행
|
||||||
|
sudo /usr/local/bin/auto-nic-setup.sh ens9
|
||||||
|
|
||||||
|
# 또는 udev 이벤트 시뮬레이션
|
||||||
|
sudo udevadm trigger --subsystem-match=net --action=add
|
||||||
|
```
|
||||||
|
|
||||||
|
## 문제 해결
|
||||||
|
|
||||||
|
### 로그 확인
|
||||||
|
```bash
|
||||||
|
sudo tail -n 50 /var/log/auto-nic-setup.log
|
||||||
|
```
|
||||||
|
|
||||||
|
### udev 규칙 다시 로드
|
||||||
|
```bash
|
||||||
|
sudo udevadm control --reload-rules
|
||||||
|
```
|
||||||
|
|
||||||
|
### 수동으로 PBR 규칙 적용
|
||||||
|
```bash
|
||||||
|
cd /home/pieroot/policy-routing
|
||||||
|
sudo python3 policy_routing.py apply_changes
|
||||||
|
```
|
||||||
|
|
||||||
|
## 주의사항
|
||||||
|
|
||||||
|
- 이 시스템은 물리적 네트워크 인터페이스(ens*, eth*, enp*)에만 적용됩니다
|
||||||
|
- 가상 인터페이스(docker*, veth*, br-*)는 자동으로 제외됩니다
|
||||||
|
- DHCP 서버가 있는 네트워크에서만 정상 동작합니다
|
||||||
|
- 시스템 부팅 시에도 자동으로 적용됩니다
|
||||||
|
|
||||||
|
## 설치된 파일 목록
|
||||||
|
|
||||||
|
- `/usr/local/bin/auto-nic-setup.sh` - 메인 자동화 스크립트
|
||||||
|
- `/etc/udev/rules.d/99-auto-nic-setup.rules` - udev 규칙
|
||||||
|
- `/etc/NetworkManager/dispatcher.d/99-policy-routing` - NetworkManager dispatcher
|
||||||
|
- `/var/log/auto-nic-setup.log` - 로그 파일
|
||||||
72
README.md
72
README.md
@@ -18,21 +18,15 @@ NIC 의 ip 설정이 미리 되어 있어야 합니다.
|
|||||||
스크립트는 아래 명령어로 다운로드 받을 수 있습니다
|
스크립트는 아래 명령어로 다운로드 받을 수 있습니다
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
wget -O policy_routing.py https://git.dmslab.xyz/dmslab/policy-routing/-/raw/main/policy_routing.py
|
wget -O policy_routing.py https://raw.githubusercontent.com/jung-geun/policy-routing/main/policy_routing.py
|
||||||
# or
|
# or
|
||||||
curl -o policy_routing.py https://git.dmslab.xyz/dmslab/policy-routing/-/raw/main/policy_routing.py
|
curl -o policy_routing.py https://raw.githubusercontent.com/jung-geun/policy-routing/main/policy_routing.py
|
||||||
```
|
```
|
||||||
|
|
||||||
다운로드한 스크립트를 install 옵션으로 시스템 데몬으로 설치할 수 있습니다
|
다운로드한 스크립트를 setup 옵션으로 시스템 데몬으로 설치할 수 있습니다
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
sudo python3 policy_routing.py install
|
sudo python3 policy_routing.py setup
|
||||||
```
|
|
||||||
|
|
||||||
실제 인터페이스가 감지되고 수행이 되었는지 체크할 수 있습니다.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo python3 policy_routing.py status
|
|
||||||
```
|
```
|
||||||
|
|
||||||
ip rule 을 확인하여 정책 기반 라우팅이 설정되었는지 확인할 수 있습니다.
|
ip rule 을 확인하여 정책 기반 라우팅이 설정되었는지 확인할 수 있습니다.
|
||||||
@@ -40,3 +34,61 @@ ip rule 을 확인하여 정책 기반 라우팅이 설정되었는지 확인할
|
|||||||
```bash
|
```bash
|
||||||
ip rule ls
|
ip rule ls
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## packer 를 사용하여 이미지 배포
|
||||||
|
|
||||||
|
openstack 에 자동으로 PBR 시스템을 구성하는 packer template 을 제공합니다.
|
||||||
|
|
||||||
|
### Packer 설치
|
||||||
|
|
||||||
|
https://developer.hashicorp.com/packer/tutorials/docker-get-started/get-started-install-cli
|
||||||
|
|
||||||
|
### Packer OpenStack plugin 설치
|
||||||
|
|
||||||
|
openstack 에서 사용할 수 있게 하려면 Packer OpenStack 플러그인을 설치해야 합니다. 아래 명령어를 사용하여 설치할 수 있습니다.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
packer plugins install github.com/hashicorp/openstack
|
||||||
|
```
|
||||||
|
|
||||||
|
### Packer OpenStack 템플릿 설정
|
||||||
|
|
||||||
|
packer 를 사용하기 전에 openrc를 설정해야합니다
|
||||||
|
|
||||||
|
```bash
|
||||||
|
vi admin-openrc
|
||||||
|
```
|
||||||
|
|
||||||
|
설정 파일 내용은 아래 내용들을 채워야합니다.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export OS_USER_DOMAIN_NAME=Default
|
||||||
|
export OS_PROJECT_NAME=admin
|
||||||
|
export OS_TENANT_NAME=$OS_PROJECT_NAME
|
||||||
|
export OS_USERNAME=admin
|
||||||
|
export OS_PASSWORD=ADMIN_PASSWORD
|
||||||
|
export OS_AUTH_URL=http://OPENSTACK_KEYSTONE_HOST/v3
|
||||||
|
export OS_IDENTITY_API_VERSION=3
|
||||||
|
export OS_IMAGE_API_VERSION=2
|
||||||
|
export OS_SOURCE_IMAGE_ID=원본_이미지_ID
|
||||||
|
export OS_NETWORK_NAME=이미지_빌드에_사용할_네트워크_ID
|
||||||
|
export OS_FLOATING_IP_POOL=플로팅_IP_풀_이름
|
||||||
|
```
|
||||||
|
|
||||||
|
위 환경 변수들은 실제 환경에 맞게 수정해야 합니다. 예를 들어, `ADMIN_PASSWORD`는 OpenStack 관리자의 비밀번호로 설정해야 하며, `OPENSTACK_KEYSTONE_HOST`는 OpenStack Keystone 서비스의 호스트 주소로 설정해야 합니다.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
source admin-openrc
|
||||||
|
```
|
||||||
|
|
||||||
|
packer 를 실행할 수 있는지 확인합니다.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
packer validate packer-openstack-ubuntu.json
|
||||||
|
```
|
||||||
|
|
||||||
|
### Packer OpenStack 템플릿 실행
|
||||||
|
|
||||||
|
```bash
|
||||||
|
packer build packer-openstack-ubuntu.json
|
||||||
|
```
|
||||||
|
|||||||
11
admin-openrc.sample
Normal file
11
admin-openrc.sample
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
export OS_USER_DOMAIN_NAME=Default
|
||||||
|
export OS_PROJECT_NAME=admin
|
||||||
|
export OS_TENANT_NAME=$OS_PROJECT_NAME
|
||||||
|
export OS_USERNAME=admin
|
||||||
|
export OS_PASSWORD=ADMIN_PASSWORD
|
||||||
|
export OS_AUTH_URL=http://OPENSTACK_KEYSTONE_HOST/v3
|
||||||
|
export OS_IDENTITY_API_VERSION=3
|
||||||
|
export OS_IMAGE_API_VERSION=2
|
||||||
|
export OS_SOURCE_IMAGE_ID=원본_이미지_ID
|
||||||
|
export OS_NETWORK_NAME=이미지_빌드에_사용할_네트워크_ID
|
||||||
|
export OS_FLOATING_IP_POOL=플로팅_IP_풀_이름
|
||||||
@@ -9,7 +9,7 @@
|
|||||||
"source_image_id": "{{env `OS_SOURCE_IMAGE_ID`}}",
|
"source_image_id": "{{env `OS_SOURCE_IMAGE_ID`}}",
|
||||||
"flavor_name": "cpu.2c_2g",
|
"flavor_name": "cpu.2c_2g",
|
||||||
"network_name": "{{env `OS_NETWORK_NAME`}}",
|
"network_name": "{{env `OS_NETWORK_NAME`}}",
|
||||||
"image_name": "ubuntu 22.04-{{timestamp}} server",
|
"image_name": "ubuntu 24.04 server-{{timestamp}}",
|
||||||
"floating_ip_pool": "{{env `OS_FLOATING_IP_POOL`}}",
|
"floating_ip_pool": "{{env `OS_FLOATING_IP_POOL`}}",
|
||||||
"ssh_username": "ubuntu"
|
"ssh_username": "ubuntu"
|
||||||
},
|
},
|
||||||
@@ -36,6 +36,7 @@
|
|||||||
"use_floating_ip": true,
|
"use_floating_ip": true,
|
||||||
"ssh_timeout": "10m",
|
"ssh_timeout": "10m",
|
||||||
"image_disk_format": "raw",
|
"image_disk_format": "raw",
|
||||||
|
"image_visibility": "public",
|
||||||
"use_blockstorage_volume": true
|
"use_blockstorage_volume": true
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
write_files:
|
write_files:
|
||||||
- path: /tmp/pbr-script-cloud-init.sh
|
- path: /tmp/pbr-script-cloud-init.sh
|
||||||
permissions: '0755'
|
permissions: "0755"
|
||||||
owner: root:root
|
owner: root:root
|
||||||
content: |
|
content: |
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
@@ -10,7 +10,7 @@ write_files:
|
|||||||
# GitLab 스크립트 URL (공개 저장소 또는 접근 가능한 URL)
|
# GitLab 스크립트 URL (공개 저장소 또는 접근 가능한 URL)
|
||||||
# 예시: GitLab Pages, Raw 파일 URL 등
|
# 예시: GitLab Pages, Raw 파일 URL 등
|
||||||
# private repository인 경우 인증 관련 부분을 추가해야 합니다. (아래 설명)
|
# private repository인 경우 인증 관련 부분을 추가해야 합니다. (아래 설명)
|
||||||
SCRIPT_URL="https://git.dmslab.xyz/dmslab/policy-routing/-/raw/main/policy_routing.py"
|
SCRIPT_URL="https://raw.githubusercontent.com/jung-geun/policy-routing/v0.3/policy_routing.py"
|
||||||
DEST_PATH="/opt/PBR/routing.py"
|
DEST_PATH="/opt/PBR/routing.py"
|
||||||
|
|
||||||
# 스크립트 저장될 디렉토리 생성 (필요하다면)
|
# 스크립트 저장될 디렉토리 생성 (필요하다면)
|
||||||
@@ -33,7 +33,7 @@ write_files:
|
|||||||
if [ $? -eq 0 ]; then
|
if [ $? -eq 0 ]; then
|
||||||
echo "Script downloaded successfully to ${DEST_PATH}. Executing..."
|
echo "Script downloaded successfully to ${DEST_PATH}. Executing..."
|
||||||
chmod +x "${DEST_PATH}" # 실행 권한 부여
|
chmod +x "${DEST_PATH}" # 실행 권한 부여
|
||||||
"${DEST_PATH}" install # 스크립트 실행
|
"${DEST_PATH}" setup --force # 스크립트 실행
|
||||||
else
|
else
|
||||||
echo "Error: Failed to download script from ${SCRIPT_URL}."
|
echo "Error: Failed to download script from ${SCRIPT_URL}."
|
||||||
exit 1
|
exit 1
|
||||||
|
|||||||
320
policy_routing.py
Normal file → Executable file
320
policy_routing.py
Normal file → Executable file
@@ -4,6 +4,8 @@ Ubuntu 22.04 Multi-NIC Policy Based Routing Setup Script
|
|||||||
Python Implementation
|
Python Implementation
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
__version__ = "0.3" # 현재 스크립트 버전
|
||||||
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -13,6 +15,7 @@ import re
|
|||||||
import ipaddress
|
import ipaddress
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import requests # requests 라이브러리 추가
|
||||||
|
|
||||||
|
|
||||||
class PolicyBasedRoutingManager:
|
class PolicyBasedRoutingManager:
|
||||||
@@ -29,8 +32,9 @@ class PolicyBasedRoutingManager:
|
|||||||
self.logger.error("이 스크립트는 root 권한으로 실행해야 합니다.")
|
self.logger.error("이 스크립트는 root 권한으로 실행해야 합니다.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# 네트워크 인터페이스 자동 감지
|
# 네트워크 인터페이스 설정 (초기에는 비워둠)
|
||||||
self.config = self.auto_detect_network_config()
|
self.config = {"nics": {}}
|
||||||
|
self.github_repo_url = "https://raw.githubusercontent.com/jung-geun/policy-routing/main/policy_routing.py"
|
||||||
|
|
||||||
def run_command(self, cmd, ignore_error=False):
|
def run_command(self, cmd, ignore_error=False):
|
||||||
"""시스템 명령어 실행"""
|
"""시스템 명령어 실행"""
|
||||||
@@ -44,6 +48,88 @@ class PolicyBasedRoutingManager:
|
|||||||
self.logger.error(f"명령어 실행 실패: {cmd} - {e}")
|
self.logger.error(f"명령어 실행 실패: {cmd} - {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def get_latest_version(self):
|
||||||
|
"""GitHub에서 최신 버전 정보 가져오기"""
|
||||||
|
try:
|
||||||
|
response = requests.get(self.github_repo_url)
|
||||||
|
response.raise_for_status() # HTTP 오류 발생 시 예외 발생
|
||||||
|
|
||||||
|
# 파일 내용에서 __version__ 라인 찾기
|
||||||
|
for line in response.text.splitlines():
|
||||||
|
if "__version__" in line:
|
||||||
|
match = re.search(
|
||||||
|
r'__version__\s*=\s*["\'](\d+\.\d+\.\d+)["\']', line
|
||||||
|
)
|
||||||
|
if match:
|
||||||
|
return match.group(1)
|
||||||
|
return None
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
self.logger.error(f"최신 버전 정보를 가져오는 데 실패했습니다: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def check_for_updates(self):
|
||||||
|
"""업데이트 확인 및 사용자에게 알림"""
|
||||||
|
self.logger.info("최신 버전 확인 중...")
|
||||||
|
latest_version = self.get_latest_version()
|
||||||
|
current_version = __version__
|
||||||
|
|
||||||
|
if latest_version:
|
||||||
|
self.logger.info(
|
||||||
|
f"현재 버전: {current_version}, 최신 버전: {latest_version}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 버전 비교 (간단한 문자열 비교, 실제로는 semantic versioning 라이브러리 사용 권장)
|
||||||
|
if latest_version > current_version:
|
||||||
|
self.logger.info("새로운 버전이 사용 가능합니다!")
|
||||||
|
response = input("업데이트를 진행하시겠습니까? (y/N): ")
|
||||||
|
if response.lower() == "y":
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
self.logger.info("업데이트가 취소되었습니다.")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
self.logger.info("현재 최신 버전을 사용 중입니다.")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
self.logger.warning(
|
||||||
|
"최신 버전 정보를 가져올 수 없어 업데이트 확인을 건너뜁니다."
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def perform_update(self):
|
||||||
|
"""스크립트를 최신 버전으로 업데이트"""
|
||||||
|
self.logger.info("스크립트 업데이트를 시작합니다...")
|
||||||
|
try:
|
||||||
|
response = requests.get(self.github_repo_url)
|
||||||
|
response.raise_for_status() # HTTP 오류 발생 시 예외 발생
|
||||||
|
|
||||||
|
script_content = response.text
|
||||||
|
current_script_path = Path(sys.argv[0]) # 현재 실행 중인 스크립트의 경로
|
||||||
|
|
||||||
|
# 현재 스크립트 파일을 백업
|
||||||
|
backup_path = current_script_path.with_suffix(
|
||||||
|
f".py.bak_{datetime.now().strftime('%Y%m%d%H%M%S')}"
|
||||||
|
)
|
||||||
|
current_script_path.rename(backup_path)
|
||||||
|
self.logger.info(f"현재 스크립트 백업 완료: {backup_path}")
|
||||||
|
|
||||||
|
# 최신 내용으로 스크립트 파일 덮어쓰기
|
||||||
|
with open(current_script_path, "w") as f:
|
||||||
|
f.write(script_content)
|
||||||
|
|
||||||
|
# 실행 권한 유지
|
||||||
|
current_script_path.chmod(0o755)
|
||||||
|
|
||||||
|
self.logger.info(
|
||||||
|
"스크립트 업데이트가 성공적으로 완료되었습니다. 스크립트를 다시 실행해주세요."
|
||||||
|
)
|
||||||
|
sys.exit(0) # 업데이트 후 스크립트 재시작을 위해 종료
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
self.logger.error(f"스크립트 다운로드 실패: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"스크립트 업데이트 중 오류 발생: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
def get_network_interfaces(self):
|
def get_network_interfaces(self):
|
||||||
"""활성화된 네트워크 인터페이스 목록 가져오기"""
|
"""활성화된 네트워크 인터페이스 목록 가져오기"""
|
||||||
interfaces = {}
|
interfaces = {}
|
||||||
@@ -65,6 +151,7 @@ class PolicyBasedRoutingManager:
|
|||||||
and not interface.startswith("docker")
|
and not interface.startswith("docker")
|
||||||
and not interface.startswith("veth")
|
and not interface.startswith("veth")
|
||||||
and not interface.startswith("br-")
|
and not interface.startswith("br-")
|
||||||
|
and not interface.startswith("ovs-")
|
||||||
and "state UP" in line
|
and "state UP" in line
|
||||||
):
|
):
|
||||||
interfaces[interface] = {}
|
interfaces[interface] = {}
|
||||||
@@ -130,7 +217,7 @@ class PolicyBasedRoutingManager:
|
|||||||
|
|
||||||
if not interfaces:
|
if not interfaces:
|
||||||
self.logger.error("활성화된 네트워크 인터페이스를 찾을 수 없습니다")
|
self.logger.error("활성화된 네트워크 인터페이스를 찾을 수 없습니다")
|
||||||
sys.exit(1)
|
return config # Return empty config instead of sys.exit(1)
|
||||||
|
|
||||||
table_id = 100
|
table_id = 100
|
||||||
metric_base = 100
|
metric_base = 100
|
||||||
@@ -168,7 +255,7 @@ class PolicyBasedRoutingManager:
|
|||||||
|
|
||||||
if not config["nics"]:
|
if not config["nics"]:
|
||||||
self.logger.error("유효한 네트워크 인터페이스를 찾을 수 없습니다")
|
self.logger.error("유효한 네트워크 인터페이스를 찾을 수 없습니다")
|
||||||
sys.exit(1)
|
return config # Return empty config instead of sys.exit(1)
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f"총 {len(config['nics'])}개의 네트워크 인터페이스가 감지되었습니다"
|
f"총 {len(config['nics'])}개의 네트워크 인터페이스가 감지되었습니다"
|
||||||
@@ -342,6 +429,20 @@ class PolicyBasedRoutingManager:
|
|||||||
# 기존 default 라우트 제거
|
# 기존 default 라우트 제거
|
||||||
self.run_command("ip route del default", ignore_error=True)
|
self.run_command("ip route del default", ignore_error=True)
|
||||||
|
|
||||||
|
# DNS 서버 목록 (널리 사용되는 공용 DNS)
|
||||||
|
dns_servers = [
|
||||||
|
"8.8.8.8", # Google DNS
|
||||||
|
"8.8.4.4", # Google DNS
|
||||||
|
"1.1.1.1", # Cloudflare DNS
|
||||||
|
"1.0.0.1", # Cloudflare DNS
|
||||||
|
"208.67.222.222", # OpenDNS
|
||||||
|
"208.67.220.220", # OpenDNS
|
||||||
|
]
|
||||||
|
|
||||||
|
# 기존 DNS 서버 라우트 제거
|
||||||
|
for dns in dns_servers:
|
||||||
|
self.run_command(f"ip route del {dns}", ignore_error=True)
|
||||||
|
|
||||||
# metric 순으로 정렬하여 default 라우트 추가
|
# metric 순으로 정렬하여 default 라우트 추가
|
||||||
sorted_nics = sorted(self.config["nics"].items(), key=lambda x: x[1]["metric"])
|
sorted_nics = sorted(self.config["nics"].items(), key=lambda x: x[1]["metric"])
|
||||||
|
|
||||||
@@ -357,6 +458,25 @@ class PolicyBasedRoutingManager:
|
|||||||
f"Default 라우트 추가: {gateway} via {interface} (metric: {metric})"
|
f"Default 라우트 추가: {gateway} via {interface} (metric: {metric})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# DNS 서버들을 NIC별로 분산하여 라우팅 설정
|
||||||
|
self.logger.info("DNS 서버 라우팅 설정 중...")
|
||||||
|
|
||||||
|
for i, dns in enumerate(dns_servers):
|
||||||
|
# DNS 서버를 NIC 개수만큼 순환하여 분산
|
||||||
|
nic_index = i % len(sorted_nics)
|
||||||
|
nic_name, nic_config = sorted_nics[nic_index]
|
||||||
|
|
||||||
|
interface = nic_config["interface"]
|
||||||
|
gateway = nic_config["gateway"]
|
||||||
|
metric = nic_config["metric"]
|
||||||
|
|
||||||
|
self.run_command(
|
||||||
|
f"ip route add {dns} via {gateway} dev {interface} metric {metric}"
|
||||||
|
)
|
||||||
|
self.logger.info(
|
||||||
|
f"DNS 라우트 추가: {dns} via {gateway} (interface: {interface}, metric: {metric})"
|
||||||
|
)
|
||||||
|
|
||||||
def check_interfaces(self):
|
def check_interfaces(self):
|
||||||
"""네트워크 인터페이스 상태 확인"""
|
"""네트워크 인터페이스 상태 확인"""
|
||||||
self.logger.info("네트워크 인터페이스 상태 확인 중...")
|
self.logger.info("네트워크 인터페이스 상태 확인 중...")
|
||||||
@@ -640,6 +760,21 @@ class StartupPolicyBasedRoutingManager:
|
|||||||
def setup_main_routing(self, current_nics):
|
def setup_main_routing(self, current_nics):
|
||||||
self.logger.info("메인 라우팅 테이블 설정 중...")
|
self.logger.info("메인 라우팅 테이블 설정 중...")
|
||||||
self.run_command("ip route del default", ignore_error=True)
|
self.run_command("ip route del default", ignore_error=True)
|
||||||
|
|
||||||
|
# DNS 서버 목록 (널리 사용되는 공용 DNS)
|
||||||
|
dns_servers = [
|
||||||
|
"8.8.8.8", # Google DNS
|
||||||
|
"8.8.4.4", # Google DNS
|
||||||
|
"1.1.1.1", # Cloudflare DNS
|
||||||
|
"1.0.0.1", # Cloudflare DNS
|
||||||
|
"208.67.222.222", # OpenDNS
|
||||||
|
"208.67.220.220", # OpenDNS
|
||||||
|
]
|
||||||
|
|
||||||
|
# 기존 DNS 서버 라우트 제거
|
||||||
|
for dns in dns_servers:
|
||||||
|
self.run_command(f"ip route del {{dns}}", ignore_error=True)
|
||||||
|
|
||||||
sorted_nics = sorted(current_nics.items(), key=lambda x: x[1]["metric"])
|
sorted_nics = sorted(current_nics.items(), key=lambda x: x[1]["metric"])
|
||||||
for nic_name, nic_config in sorted_nics:
|
for nic_name, nic_config in sorted_nics:
|
||||||
interface = nic_config["interface"]
|
interface = nic_config["interface"]
|
||||||
@@ -648,6 +783,21 @@ class StartupPolicyBasedRoutingManager:
|
|||||||
self.run_command(f"ip route add default via {{gateway}} dev {{interface}} metric {{metric}}")
|
self.run_command(f"ip route add default via {{gateway}} dev {{interface}} metric {{metric}}")
|
||||||
self.logger.info(f"Default 라우트 추가: {{gateway}} via {{interface}} (metric: {{metric}})")
|
self.logger.info(f"Default 라우트 추가: {{gateway}} via {{interface}} (metric: {{metric}})")
|
||||||
|
|
||||||
|
# DNS 서버들을 NIC별로 분산하여 라우팅 설정
|
||||||
|
self.logger.info("DNS 서버 라우팅 설정 중...")
|
||||||
|
|
||||||
|
for i, dns in enumerate(dns_servers):
|
||||||
|
# DNS 서버를 NIC 개수만큼 순환하여 분산
|
||||||
|
nic_index = i % len(sorted_nics)
|
||||||
|
nic_name, nic_config = sorted_nics[nic_index]
|
||||||
|
|
||||||
|
interface = nic_config["interface"]
|
||||||
|
gateway = nic_config["gateway"]
|
||||||
|
metric = nic_config["metric"]
|
||||||
|
|
||||||
|
self.run_command(f"ip route add {{dns}} via {{gateway}} dev {{interface}} metric {{metric}}")
|
||||||
|
self.logger.info(f"DNS 라우트 추가: {{dns}} via {{gateway}} (interface: {{interface}}, metric: {{metric}})")
|
||||||
|
|
||||||
def main_startup():
|
def main_startup():
|
||||||
manager = StartupPolicyBasedRoutingManager()
|
manager = StartupPolicyBasedRoutingManager()
|
||||||
|
|
||||||
@@ -671,6 +821,49 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
self.logger.info(f"시작시 자동 적용 스크립트 생성 완료: {startup_script}")
|
self.logger.info(f"시작시 자동 적용 스크립트 생성 완료: {startup_script}")
|
||||||
|
|
||||||
|
def create_shutdown_script(self):
|
||||||
|
"""시스템 종료시 자동 적용을 위한 스크립트 생성"""
|
||||||
|
self.logger.info("종료시 자동 적용 스크립트 생성 중...")
|
||||||
|
|
||||||
|
shutdown_script = Path("/etc/network/if-down.d/policy-routing-python-down")
|
||||||
|
|
||||||
|
os.makedirs(shutdown_script.parent, exist_ok=True)
|
||||||
|
|
||||||
|
script_content = f"""#!/usr/bin/env python3
|
||||||
|
import subprocess
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def run_command(cmd, ignore_error=False):
|
||||||
|
try:
|
||||||
|
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
||||||
|
if result.returncode != 0 and not ignore_error:
|
||||||
|
logger.warning(f"명령어 실행 경고: {{cmd}}")
|
||||||
|
logger.warning(f"오류: {{result.stderr}}")
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"명령어 실행 실패: {{cmd}} - {{e}}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def main_shutdown():
|
||||||
|
logger.info("정책 기반 라우팅 종료 스크립트 실행 중...")
|
||||||
|
# pbr.py remove를 호출하여 모든 라우팅 규칙 및 테이블을 정리합니다.
|
||||||
|
run_command("/usr/bin/python3 /home/pieroot/pbr/pbr.py remove")
|
||||||
|
logger.info("정책 기반 라우팅 종료 스크립트 실행 완료.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main_shutdown()
|
||||||
|
"""
|
||||||
|
shutdown_script.write_text(script_content)
|
||||||
|
shutdown_script.chmod(0o755)
|
||||||
|
|
||||||
|
self.logger.info(f"종료시 자동 적용 스크립트 생성 완료: {shutdown_script}")
|
||||||
|
|
||||||
def run_connectivity_test(self):
|
def run_connectivity_test(self):
|
||||||
"""연결성 테스트"""
|
"""연결성 테스트"""
|
||||||
self.logger.info("연결성 테스트 실행 중...")
|
self.logger.info("연결성 테스트 실행 중...")
|
||||||
@@ -696,17 +889,21 @@ if __name__ == "__main__":
|
|||||||
else:
|
else:
|
||||||
self.logger.warning("외부 연결 실패")
|
self.logger.warning("외부 연결 실패")
|
||||||
|
|
||||||
def setup(self):
|
def setup(self, force=False):
|
||||||
"""전체 설정 실행"""
|
"""전체 설정 실행"""
|
||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
print(" Ubuntu 22.04 Multi-NIC Policy Based Routing")
|
print(" Ubuntu 22.04 Multi-NIC Policy Based Routing")
|
||||||
print(" Python Implementation with Auto-Detection")
|
print(" Python Implementation with Auto-Detection")
|
||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
|
|
||||||
|
# 네트워크 인터페이스 자동 감지 및 설정 업데이트
|
||||||
|
self.config = self.auto_detect_network_config()
|
||||||
|
|
||||||
# 감지된 설정 출력
|
# 감지된 설정 출력
|
||||||
self.print_detected_config()
|
self.print_detected_config()
|
||||||
|
|
||||||
# 사용자 확인
|
# 사용자 확인 (force 모드일 경우 건너뛰기)
|
||||||
|
if not force:
|
||||||
response = input("위 설정으로 진행하시겠습니까? (y/N): ")
|
response = input("위 설정으로 진행하시겠습니까? (y/N): ")
|
||||||
if response.lower() != "y":
|
if response.lower() != "y":
|
||||||
print("설정이 취소되었습니다.")
|
print("설정이 취소되었습니다.")
|
||||||
@@ -736,6 +933,7 @@ if __name__ == "__main__":
|
|||||||
self.setup_main_routing() # 메인 라우팅 테이블은 전체 NIC 기반으로 재설정
|
self.setup_main_routing() # 메인 라우팅 테이블은 전체 NIC 기반으로 재설정
|
||||||
self.verify_configuration()
|
self.verify_configuration()
|
||||||
self.create_startup_script()
|
self.create_startup_script()
|
||||||
|
self.create_shutdown_script()
|
||||||
self.run_connectivity_test()
|
self.run_connectivity_test()
|
||||||
|
|
||||||
print("\n" + "=" * 50)
|
print("\n" + "=" * 50)
|
||||||
@@ -768,6 +966,13 @@ if __name__ == "__main__":
|
|||||||
startup_script = Path("/etc/network/if-up.d/policy-routing-python")
|
startup_script = Path("/etc/network/if-up.d/policy-routing-python")
|
||||||
if startup_script.exists():
|
if startup_script.exists():
|
||||||
startup_script.unlink()
|
startup_script.unlink()
|
||||||
|
self.logger.info(f"시작 스크립트 제거 완료: {startup_script}")
|
||||||
|
|
||||||
|
# 종료 스크립트 제거
|
||||||
|
shutdown_script = Path("/etc/network/if-down.d/policy-routing-python-down")
|
||||||
|
if shutdown_script.exists():
|
||||||
|
shutdown_script.unlink()
|
||||||
|
self.logger.info(f"종료 스크립트 제거 완료: {shutdown_script}")
|
||||||
|
|
||||||
# 저장된 NIC 설정 파일 제거
|
# 저장된 NIC 설정 파일 제거
|
||||||
if self.config_file_path.exists():
|
if self.config_file_path.exists():
|
||||||
@@ -781,6 +986,91 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
self.logger.info("설정 제거 완료")
|
self.logger.info("설정 제거 완료")
|
||||||
|
|
||||||
|
def install_auto_nic_setup(self):
|
||||||
|
"""자동 NIC 감지 스크립트 및 udev 규칙 설치"""
|
||||||
|
self.logger.info("자동 NIC 감지 설정 설치 중...")
|
||||||
|
|
||||||
|
auto_nic_setup_sh_content = """#!/bin/bash
|
||||||
|
# Auto NIC Setup Script
|
||||||
|
# This script will be called by udev when a network interface event occurs
|
||||||
|
|
||||||
|
INTERFACE="$1"
|
||||||
|
ACTION="$2"
|
||||||
|
LOG_FILE="/var/log/auto-nic-setup.log"
|
||||||
|
PBR_SCRIPT_PATH="/home/pieroot/policy-routing/policy_routing.py"
|
||||||
|
|
||||||
|
# Set PATH for udev execution
|
||||||
|
export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
||||||
|
|
||||||
|
# Logging function
|
||||||
|
log_message() {
|
||||||
|
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> "$LOG_FILE"
|
||||||
|
}
|
||||||
|
|
||||||
|
log_message "=== Auto NIC Setup Started for interface: $INTERFACE (Action: $ACTION) ==="
|
||||||
|
|
||||||
|
# Always apply policy routing changes
|
||||||
|
log_message "Applying policy routing changes..."
|
||||||
|
if [ -f "$PBR_SCRIPT_PATH" ]; then
|
||||||
|
# It's better to run the python script in its directory
|
||||||
|
cd "$(dirname "$PBR_SCRIPT_PATH")"
|
||||||
|
if python3 "$PBR_SCRIPT_PATH" apply_changes >> "$LOG_FILE" 2>&1; then
|
||||||
|
log_message "Policy routing changes applied successfully."
|
||||||
|
else
|
||||||
|
log_message "ERROR: Failed to apply policy routing changes."
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
log_message "ERROR: policy_routing.py script not found at $PBR_SCRIPT_PATH"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# The rest of the script can be for additional logic if needed,
|
||||||
|
# but for now, the main goal is to trigger apply_changes.
|
||||||
|
# The original logic for DHCP and bringing interfaces up can be kept if necessary.
|
||||||
|
|
||||||
|
log_message "=== Auto NIC Setup Completed for interface: $INTERFACE ==="
|
||||||
|
"""
|
||||||
|
|
||||||
|
udev_rules_content = """# Auto NIC Setup udev rules
|
||||||
|
# This rule triggers when a network interface is added, changed, or removed.
|
||||||
|
|
||||||
|
# Rule for network interface addition
|
||||||
|
SUBSYSTEM=="net", ACTION=="add", KERNEL=="eth*|ens*|enp*", RUN+="/usr/local/bin/auto-nic-setup.sh $name $action"
|
||||||
|
|
||||||
|
# Rule for network interface changes
|
||||||
|
SUBSYSTEM=="net", ACTION=="change", KERNEL=="eth*|ens*|enp*", RUN+="/usr/local/bin/auto-nic-setup.sh $name $action"
|
||||||
|
|
||||||
|
# Rule for network interface removal
|
||||||
|
SUBSYSTEM=="net", ACTION=="remove", KERNEL=="eth*|ens*|enp*", RUN+="/usr/local/bin/auto-nic-setup.sh $name $action"
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Create auto-nic-setup.sh
|
||||||
|
auto_nic_setup_path = Path("/usr/local/bin/auto-nic-setup.sh")
|
||||||
|
auto_nic_setup_path.write_text(auto_nic_setup_sh_content)
|
||||||
|
auto_nic_setup_path.chmod(0o755)
|
||||||
|
self.logger.info(f"스크립트 생성 완료: {auto_nic_setup_path}")
|
||||||
|
|
||||||
|
# Create udev rule
|
||||||
|
udev_rule_path = Path("/etc/udev/rules.d/99-auto-nic-setup.rules")
|
||||||
|
udev_rule_path.write_text(udev_rules_content)
|
||||||
|
self.logger.info(f"udev 규칙 생성 완료: {udev_rule_path}")
|
||||||
|
|
||||||
|
# Reload udev rules
|
||||||
|
self.logger.info("udev 규칙 다시 로드 중...")
|
||||||
|
result = self.run_command("udevadm control --reload-rules && udevadm trigger")
|
||||||
|
if result and result.returncode == 0:
|
||||||
|
self.logger.info("udev 규칙이 성공적으로 다시 로드되었습니다.")
|
||||||
|
else:
|
||||||
|
self.logger.error("udev 규칙을 다시 로드하는 데 실패했습니다.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.logger.info("자동 NIC 감지 설정 설치가 완료되었습니다.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"설치 중 오류 발생: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
import argparse
|
import argparse
|
||||||
@@ -790,22 +1080,36 @@ def main():
|
|||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"action",
|
"action",
|
||||||
choices=["setup", "remove", "verify", "detect"],
|
choices=["setup", "remove", "verify", "detect", "apply_changes", "install"],
|
||||||
help="수행할 작업",
|
help="수행할 작업",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--force",
|
||||||
|
action="store_true",
|
||||||
|
help="설정 시 사용자 확인 프롬프트를 건너뜁니다.",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
manager = PolicyBasedRoutingManager()
|
manager = PolicyBasedRoutingManager()
|
||||||
|
|
||||||
|
# 스크립트 시작 시 업데이트 확인
|
||||||
|
if manager.check_for_updates():
|
||||||
|
manager.perform_update()
|
||||||
|
# perform_update는 성공 시 sys.exit(0)을 호출하므로, 이 아래 코드는 실행되지 않음
|
||||||
|
|
||||||
if args.action == "setup":
|
if args.action == "setup":
|
||||||
manager.setup()
|
manager.setup(force=args.force)
|
||||||
elif args.action == "remove":
|
elif args.action == "remove":
|
||||||
manager.remove_configuration()
|
manager.remove_configuration()
|
||||||
elif args.action == "verify":
|
elif args.action == "verify":
|
||||||
manager.verify_configuration()
|
manager.verify_configuration()
|
||||||
elif args.action == "detect":
|
elif args.action == "detect":
|
||||||
manager.print_detected_config()
|
manager.print_detected_config()
|
||||||
|
elif args.action == "apply_changes":
|
||||||
|
manager.apply_dynamic_rules()
|
||||||
|
elif args.action == "install":
|
||||||
|
manager.install_auto_nic_setup()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
2
sonar-project.properties
Normal file
2
sonar-project.properties
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
sonar.projectKey=dmslab_policy-routing_56aaf3b2-bfac-465c-8417-8c99edccf1c2
|
||||||
|
sonar.qualitygate.wait=true
|
||||||
@@ -1,246 +0,0 @@
|
|||||||
import sys
|
|
||||||
import os
|
|
||||||
# 현재 스크립트의 디렉토리를 sys.path에 추가하여 policy_routing 모듈을 찾을 수 있도록 함
|
|
||||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '.')))
|
|
||||||
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import patch, MagicMock
|
|
||||||
import unittest.mock as mock
|
|
||||||
import json
|
|
||||||
import subprocess
|
|
||||||
import logging
|
|
||||||
import ipaddress
|
|
||||||
|
|
||||||
# policy-routing.py에서 필요한 클래스와 상수 임포트
|
|
||||||
import policy_routing
|
|
||||||
|
|
||||||
# 로깅 비활성화 (테스트 시 불필요한 로그 출력 방지)
|
|
||||||
logging.disable(logging.CRITICAL)
|
|
||||||
|
|
||||||
class TestPolicyRoutingManager(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
"""각 테스트 전에 실행될 초기화 코드"""
|
|
||||||
# 테스트용 임시 설정 파일 경로 설정
|
|
||||||
self.test_config_file = "/tmp/test_policy_routing.json"
|
|
||||||
self.test_rt_tables_file = "/tmp/test_rt_tables"
|
|
||||||
|
|
||||||
# CONFIG_FILE과 RT_TABLES_FILE을 테스트용으로 오버라이드
|
|
||||||
# 실제 파일에 영향을 주지 않도록 패치
|
|
||||||
self.config_file_patch = patch('policy_routing.CONFIG_FILE', self.test_config_file)
|
|
||||||
self.rt_tables_file_patch = patch('policy_routing.RT_TABLES_FILE', self.test_rt_tables_file)
|
|
||||||
|
|
||||||
self.mock_config_file = self.config_file_patch.start()
|
|
||||||
self.mock_rt_tables_file = self.rt_tables_file_patch.start()
|
|
||||||
|
|
||||||
# 임시 설정 파일 생성 (기본값으로)
|
|
||||||
with open(self.test_config_file, 'w') as f:
|
|
||||||
json.dump(policy_routing.DEFAULT_CONFIG, f, indent=2)
|
|
||||||
|
|
||||||
# 임시 rt_tables 파일 생성
|
|
||||||
with open(self.test_rt_tables_file, 'w') as f:
|
|
||||||
f.write("#\n# reserved values\n#\n255\tlocal\n254\tmain\n253\tdefault\n0\tunspec\n")
|
|
||||||
|
|
||||||
self.manager = policy_routing.PolicyRoutingManager(debug=True)
|
|
||||||
# 로거를 테스트용으로 변경하여 실제 파일에 쓰지 않도록 함
|
|
||||||
self.manager.logger = MagicMock()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
"""각 테스트 후에 실행될 정리 코드"""
|
|
||||||
# 임시 파일 삭제
|
|
||||||
if os.path.exists(self.test_config_file):
|
|
||||||
os.remove(self.test_config_file)
|
|
||||||
if os.path.exists(self.test_rt_tables_file):
|
|
||||||
os.remove(self.test_rt_tables_file)
|
|
||||||
|
|
||||||
self.config_file_patch.stop()
|
|
||||||
self.rt_tables_file_patch.stop()
|
|
||||||
|
|
||||||
def test_calculate_network(self):
|
|
||||||
"""calculate_network 함수 테스트"""
|
|
||||||
self.assertEqual(self.manager.calculate_network("192.168.1.100", "24"), "192.168.1.0/24")
|
|
||||||
self.assertEqual(self.manager.calculate_network("10.0.0.5", "8"), "10.0.0.0/8")
|
|
||||||
self.assertEqual(self.manager.calculate_network("172.16.10.20", "16"), "172.16.0.0/16")
|
|
||||||
self.assertEqual(self.manager.calculate_network("192.168.1.1", "30"), "192.168.1.0/30")
|
|
||||||
|
|
||||||
# 잘못된 IP 형식
|
|
||||||
self.assertEqual(self.manager.calculate_network("invalid-ip", "24"), "0.0.0.0/24")
|
|
||||||
# 잘못된 넷마스크
|
|
||||||
self.assertEqual(self.manager.calculate_network("192.168.1.100", "abc"), "0.0.0.0/abc") # 이 경우 폴백 로직에 따라 달라질 수 있음
|
|
||||||
self.assertEqual(self.manager.calculate_network("192.168.1.100", "33"), "0.0.0.0/33") # 유효하지 않은 넷마스크
|
|
||||||
|
|
||||||
@patch('subprocess.run')
|
|
||||||
def test_run_command_success(self, mock_run):
|
|
||||||
"""run_command 성공 케이스 테스트"""
|
|
||||||
mock_run.return_value = MagicMock(stdout="Success output", stderr="", returncode=0)
|
|
||||||
success, output = self.manager.run_command(["echo", "hello"])
|
|
||||||
self.assertTrue(success)
|
|
||||||
self.assertEqual(output, "Success output")
|
|
||||||
mock_run.assert_called_once_with(["echo", "hello"], capture_output=True, text=True, check=True)
|
|
||||||
self.manager.logger.debug.assert_any_call("실행: echo hello")
|
|
||||||
self.manager.logger.debug.assert_any_call("성공: echo hello")
|
|
||||||
self.manager.logger.debug.assert_any_call("출력: Success output")
|
|
||||||
|
|
||||||
@patch('subprocess.run')
|
|
||||||
def test_run_command_failure(self, mock_run):
|
|
||||||
"""run_command 실패 케이스 테스트"""
|
|
||||||
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["bad", "cmd"], stderr="Error output")
|
|
||||||
success, output = self.manager.run_command(["bad", "cmd"])
|
|
||||||
self.assertFalse(success)
|
|
||||||
self.assertEqual(output, "Error output")
|
|
||||||
self.manager.logger.error.assert_called_once_with("명령어 실행 실패: bad cmd - Error output")
|
|
||||||
|
|
||||||
@patch('subprocess.run')
|
|
||||||
def test_run_command_ignore_errors(self, mock_run):
|
|
||||||
"""run_command 특정 오류 무시 케이스 테스트"""
|
|
||||||
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["ip", "route", "flush"], stderr="No such file or directory")
|
|
||||||
success, output = self.manager.run_command(["ip", "route", "flush"], ignore_errors=["No such file or directory"])
|
|
||||||
self.assertTrue(success)
|
|
||||||
self.assertEqual(output, "")
|
|
||||||
|
|
||||||
self.manager.logger.error.assert_not_called()
|
|
||||||
self.manager.logger.debug.assert_any_call("무시된 오류: ip route flush - No such file or directory")
|
|
||||||
self.manager.logger.debug.assert_any_call("실행: ip route flush")
|
|
||||||
self.assertEqual(self.manager.logger.debug.call_count, 2)
|
|
||||||
|
|
||||||
@patch('policy_routing.PolicyRoutingManager.run_command')
|
|
||||||
def test_get_network_interfaces(self, mock_run_command):
|
|
||||||
"""get_network_interfaces 함수 테스트"""
|
|
||||||
# 시뮬레이션할 ip addr show 출력
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(True, """
|
|
||||||
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
|
|
||||||
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
|
|
||||||
inet 127.0.0.1/8 scope host lo
|
|
||||||
valid_lft forever preferred_lft forever
|
|
||||||
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
|
|
||||||
link/ether 00:0c:29:12:34:56 brd ff:ff:ff:ff:ff:ff
|
|
||||||
inet 192.168.1.10/24 brd 192.168.1.255 scope global dynamic eth0
|
|
||||||
valid_lft 86399sec preferred_lft 86399sec
|
|
||||||
inet6 fe80::20c:29ff:fe12:3456/64 scope link
|
|
||||||
valid_lft forever preferred_lft forever
|
|
||||||
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
|
|
||||||
link/ether 00:0c:29:ab:cd:ef brd ff:ff:ff:ff:ff:ff
|
|
||||||
inet 10.0.0.5/8 brd 10.255.255.255 scope global dynamic eth1
|
|
||||||
valid_lft 86399sec preferred_lft 86399sec
|
|
||||||
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default
|
|
||||||
link/ether 02:42:1c:00:00:00 brd ff:ff:ff:ff:ff:ff
|
|
||||||
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
|
|
||||||
valid_lft forever preferred_lft forever
|
|
||||||
"""),
|
|
||||||
# _find_gateway를 위한 mock (eth0)
|
|
||||||
(True, "default via 192.168.1.1 dev eth0"),
|
|
||||||
# _find_gateway를 위한 mock (eth1)
|
|
||||||
(True, "default via 10.0.0.1 dev eth1")
|
|
||||||
]
|
|
||||||
|
|
||||||
interfaces = self.manager.get_network_interfaces()
|
|
||||||
self.assertEqual(len(interfaces), 2)
|
|
||||||
|
|
||||||
eth0 = next(filter(lambda x: x['name'] == 'eth0', interfaces))
|
|
||||||
self.assertEqual(eth0['ip'], '192.168.1.10')
|
|
||||||
self.assertEqual(eth0['netmask'], '24')
|
|
||||||
self.assertEqual(eth0['gateway'], '192.168.1.1')
|
|
||||||
self.assertEqual(eth0['state'], 'UP')
|
|
||||||
|
|
||||||
eth1 = next(filter(lambda x: x['name'] == 'eth1', interfaces))
|
|
||||||
self.assertEqual(eth1['ip'], '10.0.0.5')
|
|
||||||
self.assertEqual(eth1['netmask'], '8')
|
|
||||||
self.assertEqual(eth1['gateway'], '10.0.0.1')
|
|
||||||
self.assertEqual(eth1['state'], 'UP')
|
|
||||||
|
|
||||||
# docker0와 lo는 제외되어야 함
|
|
||||||
self.assertNotIn('docker0', [i['name'] for i in interfaces])
|
|
||||||
self.assertNotIn('lo', [i['name'] for i in interfaces])
|
|
||||||
|
|
||||||
def test__find_gateway(self):
|
|
||||||
"""_find_gateway 함수 테스트"""
|
|
||||||
# Case 1: Gateway found in interface-specific route
|
|
||||||
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(True, "default via 192.168.1.1 dev eth0 proto static")
|
|
||||||
]
|
|
||||||
self.assertEqual(self.manager._find_gateway("eth0"), "192.168.1.1")
|
|
||||||
mock_run_command.assert_called_once_with(["ip", "route", "show", "dev", "eth0"])
|
|
||||||
|
|
||||||
# Case 2: Gateway found in global route after interface-specific fails
|
|
||||||
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(True, "192.168.1.0/24 dev eth1 proto kernel"), # No default via
|
|
||||||
(True, "default via 10.0.0.1 dev eth1 proto static") # Found in global
|
|
||||||
]
|
|
||||||
self.assertEqual(self.manager._find_gateway("eth1"), "10.0.0.1")
|
|
||||||
self.assertEqual(mock_run_command.call_count, 2)
|
|
||||||
mock_run_command.assert_has_calls([
|
|
||||||
mock.call(["ip", "route", "show", "dev", "eth1"]),
|
|
||||||
mock.call(["ip", "route", "show"])
|
|
||||||
])
|
|
||||||
|
|
||||||
# Case 3: No gateway found (empty output for both)
|
|
||||||
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(True, ""), # No default via
|
|
||||||
(True, "") # No default via in global
|
|
||||||
]
|
|
||||||
self.assertIsNone(self.manager._find_gateway("eth2"))
|
|
||||||
self.assertEqual(mock_run_command.call_count, 2)
|
|
||||||
|
|
||||||
# Case 4: No gateway found (error for both)
|
|
||||||
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(False, "Error dev"),
|
|
||||||
(False, "Error global")
|
|
||||||
]
|
|
||||||
self.assertIsNone(self.manager._find_gateway("eth3"))
|
|
||||||
self.assertEqual(mock_run_command.call_count, 2)
|
|
||||||
|
|
||||||
@patch('policy_routing.PolicyRoutingManager.run_command')
|
|
||||||
def test_apply_interface_routing(self, mock_run_command):
|
|
||||||
"""apply_interface_routing 함수 테스트"""
|
|
||||||
interface_info = {
|
|
||||||
"name": "eth0",
|
|
||||||
"ip": "192.168.1.10",
|
|
||||||
"gateway": "192.168.1.1",
|
|
||||||
"netmask": "24",
|
|
||||||
"state": "UP"
|
|
||||||
}
|
|
||||||
table_id = 101
|
|
||||||
priority = 30100
|
|
||||||
metric = 1000 # Add metric for testing
|
|
||||||
|
|
||||||
# run_command의 side_effect를 설정하여 각 호출에 대한 응답을 시뮬레이션
|
|
||||||
# 순서대로 호출될 명령에 대한 응답을 정의
|
|
||||||
mock_run_command.side_effect = [
|
|
||||||
(True, ""), # 1. ip rule del from 192.168.1.10/32
|
|
||||||
(True, ""), # 2. ip route show table 101 (empty, so flush is skipped)
|
|
||||||
(True, ""), # 3. ip route add network
|
|
||||||
(True, ""), # 4. ip route add default (with metric)
|
|
||||||
(True, ""), # 5. ip rule add from
|
|
||||||
(True, ""), # 6. ip rule add iif
|
|
||||||
(True, "30100: from 192.168.1.10 lookup 101"), # 7. ip rule show (verification)
|
|
||||||
(True, "default via 192.168.1.1 dev eth0 table 101 metric 1000") # 8. ip route show table 101 (verification)
|
|
||||||
]
|
|
||||||
|
|
||||||
success = self.manager.apply_interface_routing(interface_info, table_id, priority, metric)
|
|
||||||
|
|
||||||
self.assertTrue(success)
|
|
||||||
self.manager.logger.error.assert_not_called()
|
|
||||||
|
|
||||||
# run_command 호출 검증
|
|
||||||
expected_calls = [
|
|
||||||
mock.call(['ip', 'route', 'show', 'table', '101']), # This is the first call
|
|
||||||
mock.call(['ip', 'rule', 'del', 'from', '192.168.1.10/32'], ignore_errors=['No such file or directory']), # This is the second call
|
|
||||||
# mock.call(['ip', 'route', 'flush', 'table', '101'], ignore_errors=['No such file or directory']), # Removed, as table is empty
|
|
||||||
mock.call(['ip', 'route', 'add', '192.168.1.0/24', 'dev', 'eth0', 'src', '192.168.1.10', 'table', '101'], ignore_errors=['File exists']),
|
|
||||||
mock.call(['ip', 'route', 'add', 'default', 'via', '192.168.1.1', 'dev', 'eth0', 'table', '101', 'metric', '1000'], ignore_errors=['File exists']),
|
|
||||||
mock.call(['ip', 'rule', 'add', 'from', '192.168.1.10/32', 'table', '101', 'pref', '30100'], ignore_errors=['File exists']),
|
|
||||||
mock.call(['ip', 'rule', 'add', 'iif', 'eth0', 'table', '101', 'pref', '30101'], ignore_errors=['File exists']),
|
|
||||||
mock.call(['ip', 'rule', 'show']),
|
|
||||||
mock.call(['ip', 'route', 'show', 'table', '101'])
|
|
||||||
]
|
|
||||||
mock_run_command.assert_has_calls(expected_calls)
|
|
||||||
self.assertEqual(mock_run_command.call_count, len(expected_calls))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
||||||
Reference in New Issue
Block a user