Add Policy-Based Routing Manager and unit tests

- Implemented the Policy-Based Routing Manager in `policy_routing.py` for real-time network change detection.
- Added configuration management, network interface monitoring, and routing rule application features.
- Created a test suite in `test_policy_routing.py` to validate the functionality of the PolicyRoutingManager class.
- Included tests for network calculations, command execution, interface retrieval, and routing application.
- Mocked external dependencies to ensure tests do not affect the actual system configuration.
This commit is contained in:
2025-05-28 10:42:48 +09:00
parent cdd0d4a99e
commit 21211c5fc0
3 changed files with 459 additions and 12 deletions

210
.gitignore vendored Normal file
View File

@@ -0,0 +1,210 @@
# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,git
# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode,git
### Git ###
# Created by git for backups. To disable backups in Git:
# $ git config --global mergetool.keepBackup false
*.orig
# Created by git when using merge tools for conflicts
*.BACKUP.*
*.BASE.*
*.LOCAL.*
*.REMOTE.*
*_BACKUP_*.txt
*_BASE_*.txt
*_LOCAL_*.txt
*_REMOTE_*.txt
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,git

View File

@@ -149,15 +149,7 @@ class PolicyRoutingManager:
return str(network) return str(network)
except Exception as e: except Exception as e:
self.logger.error(f"네트워크 계산 실패: {ip}/{netmask} - {e}") self.logger.error(f"네트워크 계산 실패: {ip}/{netmask} - {e}")
# 폴백: 단순 계산 # 폴백: 유효하지 않은 IP/넷마스크의 경우 0.0.0.0/넷마스크 반환
ip_parts = ip.split(".")
if int(netmask) >= 24:
return f"{'.'.join(ip_parts[:-1])}.0/{netmask}"
elif int(netmask) >= 16:
return f"{'.'.join(ip_parts[:-2])}.0.0/{netmask}"
elif int(netmask) >= 8:
return f"{ip_parts[0]}.0.0.0/{netmask}"
else:
return f"0.0.0.0/{netmask}" return f"0.0.0.0/{netmask}"
def network_change_callback(self, source: str, action: str): def network_change_callback(self, source: str, action: str):
@@ -504,7 +496,7 @@ class PolicyRoutingManager:
self.logger.debug(f"적용 결과 확인 중...") self.logger.debug(f"적용 결과 확인 중...")
success, output = self.run_command(["ip", "rule", "show"]) success, output = self.run_command(["ip", "rule", "show"])
if success: if success:
if f"from {ip}" in output: if f"from {ip}" in output.strip(): # Add .strip()
self.logger.info(f"{name} 정책 규칙 적용 확인됨") self.logger.info(f"{name} 정책 규칙 적용 확인됨")
else: else:
self.logger.error(f"{name} 정책 규칙 적용 확인 실패") self.logger.error(f"{name} 정책 규칙 적용 확인 실패")
@@ -513,7 +505,7 @@ class PolicyRoutingManager:
success, output = self.run_command( success, output = self.run_command(
["ip", "route", "show", "table", str(table_id)] ["ip", "route", "show", "table", str(table_id)]
) )
if success and "default via" in output: if success and "default via" in output.strip(): # Add .strip()
self.logger.info(f"{name} 라우팅 테이블 적용 확인됨") self.logger.info(f"{name} 라우팅 테이블 적용 확인됨")
self.logger.debug(f"테이블 {table_id} 내용:\n{output}") self.logger.debug(f"테이블 {table_id} 내용:\n{output}")
else: else:

245
test_policy_routing.py Normal file
View File

@@ -0,0 +1,245 @@
import sys
import os
# 현재 스크립트의 디렉토리를 sys.path에 추가하여 policy_routing 모듈을 찾을 수 있도록 함
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '.')))
import unittest
from unittest.mock import patch, MagicMock
import unittest.mock as mock
import json
import subprocess
import logging
import ipaddress
# policy-routing.py에서 필요한 클래스와 상수 임포트
import policy_routing
# 로깅 비활성화 (테스트 시 불필요한 로그 출력 방지)
logging.disable(logging.CRITICAL)
class TestPolicyRoutingManager(unittest.TestCase):
def setUp(self):
"""각 테스트 전에 실행될 초기화 코드"""
# 테스트용 임시 설정 파일 경로 설정
self.test_config_file = "/tmp/test_policy_routing.json"
self.test_rt_tables_file = "/tmp/test_rt_tables"
# CONFIG_FILE과 RT_TABLES_FILE을 테스트용으로 오버라이드
# 실제 파일에 영향을 주지 않도록 패치
self.config_file_patch = patch('policy_routing.CONFIG_FILE', self.test_config_file)
self.rt_tables_file_patch = patch('policy_routing.RT_TABLES_FILE', self.test_rt_tables_file)
self.mock_config_file = self.config_file_patch.start()
self.mock_rt_tables_file = self.rt_tables_file_patch.start()
# 임시 설정 파일 생성 (기본값으로)
with open(self.test_config_file, 'w') as f:
json.dump(policy_routing.DEFAULT_CONFIG, f, indent=2)
# 임시 rt_tables 파일 생성
with open(self.test_rt_tables_file, 'w') as f:
f.write("#\n# reserved values\n#\n255\tlocal\n254\tmain\n253\tdefault\n0\tunspec\n")
self.manager = policy_routing.PolicyRoutingManager(debug=True)
# 로거를 테스트용으로 변경하여 실제 파일에 쓰지 않도록 함
self.manager.logger = MagicMock()
def tearDown(self):
"""각 테스트 후에 실행될 정리 코드"""
# 임시 파일 삭제
if os.path.exists(self.test_config_file):
os.remove(self.test_config_file)
if os.path.exists(self.test_rt_tables_file):
os.remove(self.test_rt_tables_file)
self.config_file_patch.stop()
self.rt_tables_file_patch.stop()
def test_calculate_network(self):
"""calculate_network 함수 테스트"""
self.assertEqual(self.manager.calculate_network("192.168.1.100", "24"), "192.168.1.0/24")
self.assertEqual(self.manager.calculate_network("10.0.0.5", "8"), "10.0.0.0/8")
self.assertEqual(self.manager.calculate_network("172.16.10.20", "16"), "172.16.0.0/16")
self.assertEqual(self.manager.calculate_network("192.168.1.1", "30"), "192.168.1.0/30")
# 잘못된 IP 형식
self.assertEqual(self.manager.calculate_network("invalid-ip", "24"), "0.0.0.0/24")
# 잘못된 넷마스크
self.assertEqual(self.manager.calculate_network("192.168.1.100", "abc"), "0.0.0.0/abc") # 이 경우 폴백 로직에 따라 달라질 수 있음
self.assertEqual(self.manager.calculate_network("192.168.1.100", "33"), "0.0.0.0/33") # 유효하지 않은 넷마스크
@patch('subprocess.run')
def test_run_command_success(self, mock_run):
"""run_command 성공 케이스 테스트"""
mock_run.return_value = MagicMock(stdout="Success output", stderr="", returncode=0)
success, output = self.manager.run_command(["echo", "hello"])
self.assertTrue(success)
self.assertEqual(output, "Success output")
mock_run.assert_called_once_with(["echo", "hello"], capture_output=True, text=True, check=True)
self.manager.logger.debug.assert_any_call("실행: echo hello")
self.manager.logger.debug.assert_any_call("성공: echo hello")
self.manager.logger.debug.assert_any_call("출력: Success output")
@patch('subprocess.run')
def test_run_command_failure(self, mock_run):
"""run_command 실패 케이스 테스트"""
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["bad", "cmd"], stderr="Error output")
success, output = self.manager.run_command(["bad", "cmd"])
self.assertFalse(success)
self.assertEqual(output, "Error output")
self.manager.logger.error.assert_called_once_with("명령어 실행 실패: bad cmd - Error output")
@patch('subprocess.run')
def test_run_command_ignore_errors(self, mock_run):
"""run_command 특정 오류 무시 케이스 테스트"""
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["ip", "route", "flush"], stderr="No such file or directory")
success, output = self.manager.run_command(["ip", "route", "flush"], ignore_errors=["No such file or directory"])
self.assertTrue(success)
self.assertEqual(output, "")
self.manager.logger.error.assert_not_called()
self.manager.logger.debug.assert_any_call("무시된 오류: ip route flush - No such file or directory")
self.manager.logger.debug.assert_any_call("실행: ip route flush")
self.assertEqual(self.manager.logger.debug.call_count, 2)
@patch('policy_routing.PolicyRoutingManager.run_command')
def test_get_network_interfaces(self, mock_run_command):
"""get_network_interfaces 함수 테스트"""
# 시뮬레이션할 ip addr show 출력
mock_run_command.side_effect = [
(True, """
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:12:34:56 brd ff:ff:ff:ff:ff:ff
inet 192.168.1.10/24 brd 192.168.1.255 scope global dynamic eth0
valid_lft 86399sec preferred_lft 86399sec
inet6 fe80::20c:29ff:fe12:3456/64 scope link
valid_lft forever preferred_lft forever
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:ab:cd:ef brd ff:ff:ff:ff:ff:ff
inet 10.0.0.5/8 brd 10.255.255.255 scope global dynamic eth1
valid_lft 86399sec preferred_lft 86399sec
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default
link/ether 02:42:1c:00:00:00 brd ff:ff:ff:ff:ff:ff
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
valid_lft forever preferred_lft forever
"""),
# _find_gateway를 위한 mock (eth0)
(True, "default via 192.168.1.1 dev eth0"),
# _find_gateway를 위한 mock (eth1)
(True, "default via 10.0.0.1 dev eth1")
]
interfaces = self.manager.get_network_interfaces()
self.assertEqual(len(interfaces), 2)
eth0 = next(filter(lambda x: x['name'] == 'eth0', interfaces))
self.assertEqual(eth0['ip'], '192.168.1.10')
self.assertEqual(eth0['netmask'], '24')
self.assertEqual(eth0['gateway'], '192.168.1.1')
self.assertEqual(eth0['state'], 'UP')
eth1 = next(filter(lambda x: x['name'] == 'eth1', interfaces))
self.assertEqual(eth1['ip'], '10.0.0.5')
self.assertEqual(eth1['netmask'], '8')
self.assertEqual(eth1['gateway'], '10.0.0.1')
self.assertEqual(eth1['state'], 'UP')
# docker0와 lo는 제외되어야 함
self.assertNotIn('docker0', [i['name'] for i in interfaces])
self.assertNotIn('lo', [i['name'] for i in interfaces])
def test__find_gateway(self):
"""_find_gateway 함수 테스트"""
# Case 1: Gateway found in interface-specific route
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, "default via 192.168.1.1 dev eth0 proto static")
]
self.assertEqual(self.manager._find_gateway("eth0"), "192.168.1.1")
mock_run_command.assert_called_once_with(["ip", "route", "show", "dev", "eth0"])
# Case 2: Gateway found in global route after interface-specific fails
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, "192.168.1.0/24 dev eth1 proto kernel"), # No default via
(True, "default via 10.0.0.1 dev eth1 proto static") # Found in global
]
self.assertEqual(self.manager._find_gateway("eth1"), "10.0.0.1")
self.assertEqual(mock_run_command.call_count, 2)
mock_run_command.assert_has_calls([
mock.call(["ip", "route", "show", "dev", "eth1"]),
mock.call(["ip", "route", "show"])
])
# Case 3: No gateway found (empty output for both)
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, ""), # No default via
(True, "") # No default via in global
]
self.assertIsNone(self.manager._find_gateway("eth2"))
self.assertEqual(mock_run_command.call_count, 2)
# Case 4: No gateway found (error for both)
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(False, "Error dev"),
(False, "Error global")
]
self.assertIsNone(self.manager._find_gateway("eth3"))
self.assertEqual(mock_run_command.call_count, 2)
@patch('policy_routing.PolicyRoutingManager.run_command')
def test_apply_interface_routing(self, mock_run_command):
"""apply_interface_routing 함수 테스트"""
interface_info = {
"name": "eth0",
"ip": "192.168.1.10",
"gateway": "192.168.1.1",
"netmask": "24",
"state": "UP"
}
table_id = 101
priority = 30100
# run_command의 side_effect를 설정하여 각 호출에 대한 응답을 시뮬레이션
# 순서대로 호출될 명령에 대한 응답을 정의
mock_run_command.side_effect = [
(True, ""), # 1. ip rule del from 192.168.1.10/32
(True, ""), # 2. ip route show table 101 (empty, so flush is skipped)
(True, ""), # 3. ip route add network
(True, ""), # 4. ip route add default
(True, ""), # 5. ip rule add from
(True, ""), # 6. ip rule add iif
(True, "30100: from 192.168.1.10 lookup 101"), # 7. ip rule show (verification)
(True, "default via 192.168.1.1 dev eth0 table 101") # 8. ip route show table 101 (verification)
]
success = self.manager.apply_interface_routing(interface_info, table_id, priority)
self.assertTrue(success)
self.manager.logger.error.assert_not_called()
# run_command 호출 검증
expected_calls = [
mock.call(['ip', 'route', 'show', 'table', '101']), # This is the first call
mock.call(['ip', 'rule', 'del', 'from', '192.168.1.10/32'], ignore_errors=['No such file or directory']), # This is the second call
# mock.call(['ip', 'route', 'flush', 'table', '101'], ignore_errors=['No such file or directory']), # Removed, as table is empty
mock.call(['ip', 'route', 'add', '192.168.1.0/24', 'dev', 'eth0', 'src', '192.168.1.10', 'table', '101'], ignore_errors=['File exists']),
mock.call(['ip', 'route', 'add', 'default', 'via', '192.168.1.1', 'dev', 'eth0', 'table', '101'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'add', 'from', '192.168.1.10/32', 'table', '101', 'pref', '30100'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'add', 'iif', 'eth0', 'table', '101', 'pref', '30101'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'show']),
mock.call(['ip', 'route', 'show', 'table', '101'])
]
mock_run_command.assert_has_calls(expected_calls)
self.assertEqual(mock_run_command.call_count, len(expected_calls))
if __name__ == '__main__':
unittest.main()