diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..10488a7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,210 @@ +# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,git +# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode,git + +### Git ### +# Created by git for backups. To disable backups in Git: +# $ git config --global mergetool.keepBackup false +*.orig + +# Created by git when using merge tools for conflicts +*.BACKUP.* +*.BASE.* +*.LOCAL.* +*.REMOTE.* +*_BACKUP_*.txt +*_BASE_*.txt +*_LOCAL_*.txt +*_REMOTE_*.txt + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +### Python Patch ### +# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration +poetry.toml + +# ruff +.ruff_cache/ + +# LSP config files +pyrightconfig.json + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,git \ No newline at end of file diff --git a/policy-routing.py b/policy_routing.py similarity index 98% rename from policy-routing.py rename to policy_routing.py index 41baed1..8ef4993 100644 --- a/policy-routing.py +++ b/policy_routing.py @@ -149,16 +149,8 @@ class PolicyRoutingManager: return str(network) except Exception as e: self.logger.error(f"네트워크 계산 실패: {ip}/{netmask} - {e}") - # 폴백: 단순 계산 - ip_parts = ip.split(".") - if int(netmask) >= 24: - return f"{'.'.join(ip_parts[:-1])}.0/{netmask}" - elif int(netmask) >= 16: - return f"{'.'.join(ip_parts[:-2])}.0.0/{netmask}" - elif int(netmask) >= 8: - return f"{ip_parts[0]}.0.0.0/{netmask}" - else: - return f"0.0.0.0/{netmask}" + # 폴백: 유효하지 않은 IP/넷마스크의 경우 0.0.0.0/넷마스크 반환 + return f"0.0.0.0/{netmask}" def network_change_callback(self, source: str, action: str): """네트워크 변화 콜백""" @@ -504,7 +496,7 @@ class PolicyRoutingManager: self.logger.debug(f"적용 결과 확인 중...") success, output = self.run_command(["ip", "rule", "show"]) if success: - if f"from {ip}" in output: + if f"from {ip}" in output.strip(): # Add .strip() self.logger.info(f"✅ {name} 정책 규칙 적용 확인됨") else: self.logger.error(f"❌ {name} 정책 규칙 적용 확인 실패") @@ -513,7 +505,7 @@ class PolicyRoutingManager: success, output = self.run_command( ["ip", "route", "show", "table", str(table_id)] ) - if success and "default via" in output: + if success and "default via" in output.strip(): # Add .strip() self.logger.info(f"✅ {name} 라우팅 테이블 적용 확인됨") self.logger.debug(f"테이블 {table_id} 내용:\n{output}") else: diff --git a/test_policy_routing.py b/test_policy_routing.py new file mode 100644 index 0000000..17a2441 --- /dev/null +++ b/test_policy_routing.py @@ -0,0 +1,245 @@ +import sys +import os +# 현재 스크립트의 디렉토리를 sys.path에 추가하여 policy_routing 모듈을 찾을 수 있도록 함 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '.'))) + +import unittest +from unittest.mock import patch, MagicMock +import unittest.mock as mock +import json +import subprocess +import logging +import ipaddress + +# policy-routing.py에서 필요한 클래스와 상수 임포트 +import policy_routing + +# 로깅 비활성화 (테스트 시 불필요한 로그 출력 방지) +logging.disable(logging.CRITICAL) + +class TestPolicyRoutingManager(unittest.TestCase): + + def setUp(self): + """각 테스트 전에 실행될 초기화 코드""" + # 테스트용 임시 설정 파일 경로 설정 + self.test_config_file = "/tmp/test_policy_routing.json" + self.test_rt_tables_file = "/tmp/test_rt_tables" + + # CONFIG_FILE과 RT_TABLES_FILE을 테스트용으로 오버라이드 + # 실제 파일에 영향을 주지 않도록 패치 + self.config_file_patch = patch('policy_routing.CONFIG_FILE', self.test_config_file) + self.rt_tables_file_patch = patch('policy_routing.RT_TABLES_FILE', self.test_rt_tables_file) + + self.mock_config_file = self.config_file_patch.start() + self.mock_rt_tables_file = self.rt_tables_file_patch.start() + + # 임시 설정 파일 생성 (기본값으로) + with open(self.test_config_file, 'w') as f: + json.dump(policy_routing.DEFAULT_CONFIG, f, indent=2) + + # 임시 rt_tables 파일 생성 + with open(self.test_rt_tables_file, 'w') as f: + f.write("#\n# reserved values\n#\n255\tlocal\n254\tmain\n253\tdefault\n0\tunspec\n") + + self.manager = policy_routing.PolicyRoutingManager(debug=True) + # 로거를 테스트용으로 변경하여 실제 파일에 쓰지 않도록 함 + self.manager.logger = MagicMock() + + def tearDown(self): + """각 테스트 후에 실행될 정리 코드""" + # 임시 파일 삭제 + if os.path.exists(self.test_config_file): + os.remove(self.test_config_file) + if os.path.exists(self.test_rt_tables_file): + os.remove(self.test_rt_tables_file) + + self.config_file_patch.stop() + self.rt_tables_file_patch.stop() + + def test_calculate_network(self): + """calculate_network 함수 테스트""" + self.assertEqual(self.manager.calculate_network("192.168.1.100", "24"), "192.168.1.0/24") + self.assertEqual(self.manager.calculate_network("10.0.0.5", "8"), "10.0.0.0/8") + self.assertEqual(self.manager.calculate_network("172.16.10.20", "16"), "172.16.0.0/16") + self.assertEqual(self.manager.calculate_network("192.168.1.1", "30"), "192.168.1.0/30") + + # 잘못된 IP 형식 + self.assertEqual(self.manager.calculate_network("invalid-ip", "24"), "0.0.0.0/24") + # 잘못된 넷마스크 + self.assertEqual(self.manager.calculate_network("192.168.1.100", "abc"), "0.0.0.0/abc") # 이 경우 폴백 로직에 따라 달라질 수 있음 + self.assertEqual(self.manager.calculate_network("192.168.1.100", "33"), "0.0.0.0/33") # 유효하지 않은 넷마스크 + + @patch('subprocess.run') + def test_run_command_success(self, mock_run): + """run_command 성공 케이스 테스트""" + mock_run.return_value = MagicMock(stdout="Success output", stderr="", returncode=0) + success, output = self.manager.run_command(["echo", "hello"]) + self.assertTrue(success) + self.assertEqual(output, "Success output") + mock_run.assert_called_once_with(["echo", "hello"], capture_output=True, text=True, check=True) + self.manager.logger.debug.assert_any_call("실행: echo hello") + self.manager.logger.debug.assert_any_call("성공: echo hello") + self.manager.logger.debug.assert_any_call("출력: Success output") + + @patch('subprocess.run') + def test_run_command_failure(self, mock_run): + """run_command 실패 케이스 테스트""" + mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["bad", "cmd"], stderr="Error output") + success, output = self.manager.run_command(["bad", "cmd"]) + self.assertFalse(success) + self.assertEqual(output, "Error output") + self.manager.logger.error.assert_called_once_with("명령어 실행 실패: bad cmd - Error output") + + @patch('subprocess.run') + def test_run_command_ignore_errors(self, mock_run): + """run_command 특정 오류 무시 케이스 테스트""" + mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["ip", "route", "flush"], stderr="No such file or directory") + success, output = self.manager.run_command(["ip", "route", "flush"], ignore_errors=["No such file or directory"]) + self.assertTrue(success) + self.assertEqual(output, "") + + self.manager.logger.error.assert_not_called() + self.manager.logger.debug.assert_any_call("무시된 오류: ip route flush - No such file or directory") + self.manager.logger.debug.assert_any_call("실행: ip route flush") + self.assertEqual(self.manager.logger.debug.call_count, 2) + + @patch('policy_routing.PolicyRoutingManager.run_command') + def test_get_network_interfaces(self, mock_run_command): + """get_network_interfaces 함수 테스트""" + # 시뮬레이션할 ip addr show 출력 + mock_run_command.side_effect = [ + (True, """ +1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 + link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 + inet 127.0.0.1/8 scope host lo + valid_lft forever preferred_lft forever +2: eth0: mtu 1500 qdisc pfifo_fast state UP group default qlen 1000 + link/ether 00:0c:29:12:34:56 brd ff:ff:ff:ff:ff:ff + inet 192.168.1.10/24 brd 192.168.1.255 scope global dynamic eth0 + valid_lft 86399sec preferred_lft 86399sec + inet6 fe80::20c:29ff:fe12:3456/64 scope link + valid_lft forever preferred_lft forever +3: eth1: mtu 1500 qdisc pfifo_fast state UP group default qlen 1000 + link/ether 00:0c:29:ab:cd:ef brd ff:ff:ff:ff:ff:ff + inet 10.0.0.5/8 brd 10.255.255.255 scope global dynamic eth1 + valid_lft 86399sec preferred_lft 86399sec +4: docker0: mtu 1500 qdisc noqueue state DOWN group default + link/ether 02:42:1c:00:00:00 brd ff:ff:ff:ff:ff:ff + inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0 + valid_lft forever preferred_lft forever + """), + # _find_gateway를 위한 mock (eth0) + (True, "default via 192.168.1.1 dev eth0"), + # _find_gateway를 위한 mock (eth1) + (True, "default via 10.0.0.1 dev eth1") + ] + + interfaces = self.manager.get_network_interfaces() + self.assertEqual(len(interfaces), 2) + + eth0 = next(filter(lambda x: x['name'] == 'eth0', interfaces)) + self.assertEqual(eth0['ip'], '192.168.1.10') + self.assertEqual(eth0['netmask'], '24') + self.assertEqual(eth0['gateway'], '192.168.1.1') + self.assertEqual(eth0['state'], 'UP') + + eth1 = next(filter(lambda x: x['name'] == 'eth1', interfaces)) + self.assertEqual(eth1['ip'], '10.0.0.5') + self.assertEqual(eth1['netmask'], '8') + self.assertEqual(eth1['gateway'], '10.0.0.1') + self.assertEqual(eth1['state'], 'UP') + + # docker0와 lo는 제외되어야 함 + self.assertNotIn('docker0', [i['name'] for i in interfaces]) + self.assertNotIn('lo', [i['name'] for i in interfaces]) + + def test__find_gateway(self): + """_find_gateway 함수 테스트""" + # Case 1: Gateway found in interface-specific route + with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command: + mock_run_command.side_effect = [ + (True, "default via 192.168.1.1 dev eth0 proto static") + ] + self.assertEqual(self.manager._find_gateway("eth0"), "192.168.1.1") + mock_run_command.assert_called_once_with(["ip", "route", "show", "dev", "eth0"]) + + # Case 2: Gateway found in global route after interface-specific fails + with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command: + mock_run_command.side_effect = [ + (True, "192.168.1.0/24 dev eth1 proto kernel"), # No default via + (True, "default via 10.0.0.1 dev eth1 proto static") # Found in global + ] + self.assertEqual(self.manager._find_gateway("eth1"), "10.0.0.1") + self.assertEqual(mock_run_command.call_count, 2) + mock_run_command.assert_has_calls([ + mock.call(["ip", "route", "show", "dev", "eth1"]), + mock.call(["ip", "route", "show"]) + ]) + + # Case 3: No gateway found (empty output for both) + with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command: + mock_run_command.side_effect = [ + (True, ""), # No default via + (True, "") # No default via in global + ] + self.assertIsNone(self.manager._find_gateway("eth2")) + self.assertEqual(mock_run_command.call_count, 2) + + # Case 4: No gateway found (error for both) + with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command: + mock_run_command.side_effect = [ + (False, "Error dev"), + (False, "Error global") + ] + self.assertIsNone(self.manager._find_gateway("eth3")) + self.assertEqual(mock_run_command.call_count, 2) + + @patch('policy_routing.PolicyRoutingManager.run_command') + def test_apply_interface_routing(self, mock_run_command): + """apply_interface_routing 함수 테스트""" + interface_info = { + "name": "eth0", + "ip": "192.168.1.10", + "gateway": "192.168.1.1", + "netmask": "24", + "state": "UP" + } + table_id = 101 + priority = 30100 + + # run_command의 side_effect를 설정하여 각 호출에 대한 응답을 시뮬레이션 + # 순서대로 호출될 명령에 대한 응답을 정의 + mock_run_command.side_effect = [ + (True, ""), # 1. ip rule del from 192.168.1.10/32 + (True, ""), # 2. ip route show table 101 (empty, so flush is skipped) + (True, ""), # 3. ip route add network + (True, ""), # 4. ip route add default + (True, ""), # 5. ip rule add from + (True, ""), # 6. ip rule add iif + (True, "30100: from 192.168.1.10 lookup 101"), # 7. ip rule show (verification) + (True, "default via 192.168.1.1 dev eth0 table 101") # 8. ip route show table 101 (verification) + ] + + success = self.manager.apply_interface_routing(interface_info, table_id, priority) + + self.assertTrue(success) + self.manager.logger.error.assert_not_called() + + # run_command 호출 검증 + expected_calls = [ + mock.call(['ip', 'route', 'show', 'table', '101']), # This is the first call + mock.call(['ip', 'rule', 'del', 'from', '192.168.1.10/32'], ignore_errors=['No such file or directory']), # This is the second call + # mock.call(['ip', 'route', 'flush', 'table', '101'], ignore_errors=['No such file or directory']), # Removed, as table is empty + mock.call(['ip', 'route', 'add', '192.168.1.0/24', 'dev', 'eth0', 'src', '192.168.1.10', 'table', '101'], ignore_errors=['File exists']), + mock.call(['ip', 'route', 'add', 'default', 'via', '192.168.1.1', 'dev', 'eth0', 'table', '101'], ignore_errors=['File exists']), + mock.call(['ip', 'rule', 'add', 'from', '192.168.1.10/32', 'table', '101', 'pref', '30100'], ignore_errors=['File exists']), + mock.call(['ip', 'rule', 'add', 'iif', 'eth0', 'table', '101', 'pref', '30101'], ignore_errors=['File exists']), + mock.call(['ip', 'rule', 'show']), + mock.call(['ip', 'route', 'show', 'table', '101']) + ] + mock_run_command.assert_has_calls(expected_calls) + self.assertEqual(mock_run_command.call_count, len(expected_calls)) + + +if __name__ == '__main__': + unittest.main()