15 Commits
v0.1 ... v0.3

Author SHA1 Message Date
4d39fb678d README.md 및 cloud-init 스크립트의 다운로드 URL을 최신 버전(v0.3)으로 업데이트하고, 테스트 파일을 삭제했습니다. 2025-06-04 15:53:45 +09:00
8b8690ba79 네트워크 인터페이스 자동 감지 로직 수정 및 종료 스크립트 생성 기능 추가, 사용자 확인 옵션 개선 2025-06-02 22:42:41 +09:00
383f7cd2f2 NIC 설정 저장 및 로드 기능 추가, NIC 변경 감지 및 동적 규칙 적용 로직 개선 2025-06-02 21:52:27 +09:00
7839226e72 Implement code changes to enhance functionality and improve performance 2025-06-02 16:51:59 +09:00
0ae3cfb507 Netlink 상수 추가 및 인터페이스 라우팅에 메트릭 지원 추가 2025-05-30 03:00:37 +09:00
1d27875bd9 README.md에 로컬 자동 PBR 시스템 구성 섹션 추가 및 packer-openstack-ubuntu.json 파일 생성 2025-05-28 20:07:36 +09:00
607dac267e Merge branch 'main' of ssh://ssh.dmslab.xyz:12100/dmslab/policy-routing 2025-05-28 15:40:00 +09:00
4be94ade77 .gitlab-ci.yml에 Python 테스트 추가 및 cloud-init 스크립트 생성, policy_routing.py에서 서비스 재시작 명령 추가 2025-05-28 15:38:59 +09:00
77af62ed56 라이센스 추가 2025-05-28 01:50:12 +00:00
2e67e8f777 Merge branch 'set-secret-detection-config-1' into 'main'
`.gitlab-ci.yml`에서 비밀 탐지를 설정하고 이 파일이 없으면 생성합니다.

See merge request dmslab/policy-routing!3
2025-05-28 01:47:27 +00:00
c8b62a71e1 .gitlab-ci.yml에서 비밀 탐지를 설정하고 이 파일이 없으면 생성합니다. 2025-05-28 01:47:19 +00:00
38e473e336 Merge branch 'set-sast-iac-config-1' into 'main'
`.gitlab-ci.yml`에 SAST IaC를 구성하고 이 파일이 없으면 생성합니다.

See merge request dmslab/policy-routing!2
2025-05-28 01:46:52 +00:00
16a19e8da9 .gitlab-ci.yml에 SAST IaC를 구성하고 이 파일이 없으면 생성합니다. 2025-05-28 01:46:43 +00:00
3bc1a9cf33 Merge branch 'set-sast-config-1' into 'main'
`.gitlab-ci.yml`에 SAST를 구성하고, 아직 존재하지 않는 경우 이 파일을 생성합니다.

See merge request dmslab/policy-routing!1
2025-05-28 01:46:34 +00:00
1da112f914 .gitlab-ci.yml에 SAST를 구성하고, 아직 존재하지 않는 경우 이 파일을 생성합니다. 2025-05-28 01:46:18 +00:00
7 changed files with 991 additions and 1100 deletions

33
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,33 @@
# You can override the included template(s) by including variable overrides
# SAST customization: https://docs.gitlab.com/ee/user/application_security/sast/#customizing-the-sast-settings
# Secret Detection customization: https://docs.gitlab.com/ee/user/application_security/secret_detection/pipeline/#customization
# Dependency Scanning customization: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/#customizing-the-dependency-scanning-settings
# Container Scanning customization: https://docs.gitlab.com/ee/user/application_security/container_scanning/#customizing-the-container-scanning-settings
# Note that environment variables can be set in several places
# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence
stages:
- build
- test
- deploy
- review
- dast
- staging
- canary
- production
- incremental rollout 10%
- incremental rollout 25%
- incremental rollout 50%
- incremental rollout 100%
- performance
- cleanup
sast:
stage: test
include:
- template: Auto-DevOps.gitlab-ci.yml
python_tests:
stage: test
image: python:3.9-slim-buster
script:
- pip install pytest
- pytest test_policy_routing.py

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 봉정근
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

36
README.md Normal file
View File

@@ -0,0 +1,36 @@
# Policy Routing
이 프로젝트는 정책 기반 라우팅을 구현하기 위한 Python 스크립트입니다. 이 스크립트는 특정 IP 주소에 대해 지정된 게이트웨이를 사용하여 패킷을 라우팅합니다.
사전 조건으로는 `iproute2` 패키지가 설치되어 있어야 하며, 이 패키지는 Linux 시스템에서 네트워크 인터페이스와 라우팅 테이블을 관리하는 데 사용됩니다.
NIC 의 ip 설정이 미리 되어 있어야 합니다.
## 기능
- 특정 IP 주소에 대해 지정된 게이트웨이를 사용하여 패킷 라우팅
- 라우팅 테이블을 생성하고, 해당 테이블에 규칙을 추가하여 정책 기반 라우팅을 구현
- 자동으로 NIC를 검색하고, 해당 NIC에 대한 라우팅 테이블을 설정
# 사용 방법
## 로컬에 자동 PBR 시스템 구성
스크립트는 아래 명령어로 다운로드 받을 수 있습니다
```bash
wget -O policy_routing.py https://git.dmslab.xyz/dmslab/policy-routing/-/raw/v0.3/policy_routing.py
# or
curl -o policy_routing.py https://git.dmslab.xyz/dmslab/policy-routing/-/raw/v0.3/policy_routing.py
```
다운로드한 스크립트를 setup 옵션으로 시스템 데몬으로 설치할 수 있습니다
```bash
sudo python3 policy_routing.py setup
```
ip rule 을 확인하여 정책 기반 라우팅이 설정되었는지 확인할 수 있습니다.
```bash
ip rule ls
```

View File

@@ -0,0 +1,65 @@
{
"variables": {
"openstack_auth_url": "{{env `OS_AUTH_URL`}}",
"openstack_username": "{{env `OS_USERNAME`}}",
"openstack_password": "{{env `OS_PASSWORD`}}",
"openstack_tenant_name": "{{env `OS_TENANT_NAME`}}",
"openstack_domain_name": "{{env `OS_USER_DOMAIN_NAME`}}",
"openstack_region": "{{env `OS_REGION_NAME`}}",
"source_image_id": "{{env `OS_SOURCE_IMAGE_ID`}}",
"flavor_name": "cpu.2c_2g",
"network_name": "{{env `OS_NETWORK_NAME`}}",
"image_name": "ubuntu 22.04-{{timestamp}} server",
"floating_ip_pool": "{{env `OS_FLOATING_IP_POOL`}}",
"ssh_username": "ubuntu"
},
"builders": [
{
"type": "openstack",
"identity_endpoint": "{{user `openstack_auth_url`}}",
"username": "{{user `openstack_username`}}",
"password": "{{user `openstack_password`}}",
"tenant_name": "{{user `openstack_tenant_name`}}",
"domain_name": "{{user `openstack_domain_name`}}",
"region": "{{user `openstack_region`}}",
"image_name": "{{user `image_name`}}",
"source_image": "{{user `source_image_id`}}",
"flavor": "{{user `flavor_name`}}",
"networks": [
"{{user `network_name`}}"
],
"ssh_username": "{{user `ssh_username`}}",
"security_groups": [
"default"
],
"floating_ip_pool": "private_provider",
"use_floating_ip": true,
"ssh_timeout": "10m",
"image_disk_format": "raw",
"use_blockstorage_volume": true
}
],
"provisioners": [
{
"type": "shell",
"inline": [
"sudo apt-get update",
"sudo apt-get upgrade -y",
"sudo apt-get autoremove -y",
"echo 'Initial system updates and cleanup complete.'"
]
},
{
"type": "file",
"source": "pbr-script-cloud-init.yaml",
"destination": "/tmp/pbr-script-cloud-init.yaml"
},
{
"type": "shell",
"inline": [
"sudo mv /tmp/pbr-script-cloud-init.yaml /etc/cloud/cloud.cfg.d/99-custom-pbr-script.cfg",
"echo 'Cloud-init configuration moved to /etc/cloud/cloud.cfg.d/'"
]
}
]
}

View File

@@ -0,0 +1,45 @@
#cloud-config
write_files:
- path: /tmp/pbr-script-cloud-init.sh
permissions: '0755'
owner: root:root
content: |
#!/bin/bash
# GitLab 스크립트 URL (공개 저장소 또는 접근 가능한 URL)
# 예시: GitLab Pages, Raw 파일 URL 등
# private repository인 경우 인증 관련 부분을 추가해야 합니다. (아래 설명)
SCRIPT_URL="https://git.dmslab.xyz/dmslab/policy-routing/-/raw/v0.3/policy_routing.py"
DEST_PATH="/opt/PBR/routing.py"
# 스크립트 저장될 디렉토리 생성 (필요하다면)
mkdir -p $(dirname "${DEST_PATH}")
echo "Downloading script from ${SCRIPT_URL}..."
# wget 또는 curl 사용
# wget이 일반적으로 더 많이 사용됨
if command -v wget &> /dev/null
then
wget -O "${DEST_PATH}" "${SCRIPT_URL}"
elif command -v curl &> /dev/null
then
curl -o "${DEST_PATH}" "${SCRIPT_URL}"
else
echo "Error: Neither wget nor curl found. Cannot download script."
exit 1
fi
if [ $? -eq 0 ]; then
echo "Script downloaded successfully to ${DEST_PATH}. Executing..."
chmod +x "${DEST_PATH}" # 실행 권한 부여
"${DEST_PATH}" install # 스크립트 실행
else
echo "Error: Failed to download script from ${SCRIPT_URL}."
exit 1
fi
echo "Script execution finished."
runcmd:
- /tmp/pbr-script-cloud-init.sh

File diff suppressed because it is too large Load Diff

View File

@@ -1,245 +0,0 @@
import sys
import os
# 현재 스크립트의 디렉토리를 sys.path에 추가하여 policy_routing 모듈을 찾을 수 있도록 함
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '.')))
import unittest
from unittest.mock import patch, MagicMock
import unittest.mock as mock
import json
import subprocess
import logging
import ipaddress
# policy-routing.py에서 필요한 클래스와 상수 임포트
import policy_routing
# 로깅 비활성화 (테스트 시 불필요한 로그 출력 방지)
logging.disable(logging.CRITICAL)
class TestPolicyRoutingManager(unittest.TestCase):
def setUp(self):
"""각 테스트 전에 실행될 초기화 코드"""
# 테스트용 임시 설정 파일 경로 설정
self.test_config_file = "/tmp/test_policy_routing.json"
self.test_rt_tables_file = "/tmp/test_rt_tables"
# CONFIG_FILE과 RT_TABLES_FILE을 테스트용으로 오버라이드
# 실제 파일에 영향을 주지 않도록 패치
self.config_file_patch = patch('policy_routing.CONFIG_FILE', self.test_config_file)
self.rt_tables_file_patch = patch('policy_routing.RT_TABLES_FILE', self.test_rt_tables_file)
self.mock_config_file = self.config_file_patch.start()
self.mock_rt_tables_file = self.rt_tables_file_patch.start()
# 임시 설정 파일 생성 (기본값으로)
with open(self.test_config_file, 'w') as f:
json.dump(policy_routing.DEFAULT_CONFIG, f, indent=2)
# 임시 rt_tables 파일 생성
with open(self.test_rt_tables_file, 'w') as f:
f.write("#\n# reserved values\n#\n255\tlocal\n254\tmain\n253\tdefault\n0\tunspec\n")
self.manager = policy_routing.PolicyRoutingManager(debug=True)
# 로거를 테스트용으로 변경하여 실제 파일에 쓰지 않도록 함
self.manager.logger = MagicMock()
def tearDown(self):
"""각 테스트 후에 실행될 정리 코드"""
# 임시 파일 삭제
if os.path.exists(self.test_config_file):
os.remove(self.test_config_file)
if os.path.exists(self.test_rt_tables_file):
os.remove(self.test_rt_tables_file)
self.config_file_patch.stop()
self.rt_tables_file_patch.stop()
def test_calculate_network(self):
"""calculate_network 함수 테스트"""
self.assertEqual(self.manager.calculate_network("192.168.1.100", "24"), "192.168.1.0/24")
self.assertEqual(self.manager.calculate_network("10.0.0.5", "8"), "10.0.0.0/8")
self.assertEqual(self.manager.calculate_network("172.16.10.20", "16"), "172.16.0.0/16")
self.assertEqual(self.manager.calculate_network("192.168.1.1", "30"), "192.168.1.0/30")
# 잘못된 IP 형식
self.assertEqual(self.manager.calculate_network("invalid-ip", "24"), "0.0.0.0/24")
# 잘못된 넷마스크
self.assertEqual(self.manager.calculate_network("192.168.1.100", "abc"), "0.0.0.0/abc") # 이 경우 폴백 로직에 따라 달라질 수 있음
self.assertEqual(self.manager.calculate_network("192.168.1.100", "33"), "0.0.0.0/33") # 유효하지 않은 넷마스크
@patch('subprocess.run')
def test_run_command_success(self, mock_run):
"""run_command 성공 케이스 테스트"""
mock_run.return_value = MagicMock(stdout="Success output", stderr="", returncode=0)
success, output = self.manager.run_command(["echo", "hello"])
self.assertTrue(success)
self.assertEqual(output, "Success output")
mock_run.assert_called_once_with(["echo", "hello"], capture_output=True, text=True, check=True)
self.manager.logger.debug.assert_any_call("실행: echo hello")
self.manager.logger.debug.assert_any_call("성공: echo hello")
self.manager.logger.debug.assert_any_call("출력: Success output")
@patch('subprocess.run')
def test_run_command_failure(self, mock_run):
"""run_command 실패 케이스 테스트"""
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["bad", "cmd"], stderr="Error output")
success, output = self.manager.run_command(["bad", "cmd"])
self.assertFalse(success)
self.assertEqual(output, "Error output")
self.manager.logger.error.assert_called_once_with("명령어 실행 실패: bad cmd - Error output")
@patch('subprocess.run')
def test_run_command_ignore_errors(self, mock_run):
"""run_command 특정 오류 무시 케이스 테스트"""
mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd=["ip", "route", "flush"], stderr="No such file or directory")
success, output = self.manager.run_command(["ip", "route", "flush"], ignore_errors=["No such file or directory"])
self.assertTrue(success)
self.assertEqual(output, "")
self.manager.logger.error.assert_not_called()
self.manager.logger.debug.assert_any_call("무시된 오류: ip route flush - No such file or directory")
self.manager.logger.debug.assert_any_call("실행: ip route flush")
self.assertEqual(self.manager.logger.debug.call_count, 2)
@patch('policy_routing.PolicyRoutingManager.run_command')
def test_get_network_interfaces(self, mock_run_command):
"""get_network_interfaces 함수 테스트"""
# 시뮬레이션할 ip addr show 출력
mock_run_command.side_effect = [
(True, """
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:12:34:56 brd ff:ff:ff:ff:ff:ff
inet 192.168.1.10/24 brd 192.168.1.255 scope global dynamic eth0
valid_lft 86399sec preferred_lft 86399sec
inet6 fe80::20c:29ff:fe12:3456/64 scope link
valid_lft forever preferred_lft forever
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:ab:cd:ef brd ff:ff:ff:ff:ff:ff
inet 10.0.0.5/8 brd 10.255.255.255 scope global dynamic eth1
valid_lft 86399sec preferred_lft 86399sec
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default
link/ether 02:42:1c:00:00:00 brd ff:ff:ff:ff:ff:ff
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
valid_lft forever preferred_lft forever
"""),
# _find_gateway를 위한 mock (eth0)
(True, "default via 192.168.1.1 dev eth0"),
# _find_gateway를 위한 mock (eth1)
(True, "default via 10.0.0.1 dev eth1")
]
interfaces = self.manager.get_network_interfaces()
self.assertEqual(len(interfaces), 2)
eth0 = next(filter(lambda x: x['name'] == 'eth0', interfaces))
self.assertEqual(eth0['ip'], '192.168.1.10')
self.assertEqual(eth0['netmask'], '24')
self.assertEqual(eth0['gateway'], '192.168.1.1')
self.assertEqual(eth0['state'], 'UP')
eth1 = next(filter(lambda x: x['name'] == 'eth1', interfaces))
self.assertEqual(eth1['ip'], '10.0.0.5')
self.assertEqual(eth1['netmask'], '8')
self.assertEqual(eth1['gateway'], '10.0.0.1')
self.assertEqual(eth1['state'], 'UP')
# docker0와 lo는 제외되어야 함
self.assertNotIn('docker0', [i['name'] for i in interfaces])
self.assertNotIn('lo', [i['name'] for i in interfaces])
def test__find_gateway(self):
"""_find_gateway 함수 테스트"""
# Case 1: Gateway found in interface-specific route
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, "default via 192.168.1.1 dev eth0 proto static")
]
self.assertEqual(self.manager._find_gateway("eth0"), "192.168.1.1")
mock_run_command.assert_called_once_with(["ip", "route", "show", "dev", "eth0"])
# Case 2: Gateway found in global route after interface-specific fails
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, "192.168.1.0/24 dev eth1 proto kernel"), # No default via
(True, "default via 10.0.0.1 dev eth1 proto static") # Found in global
]
self.assertEqual(self.manager._find_gateway("eth1"), "10.0.0.1")
self.assertEqual(mock_run_command.call_count, 2)
mock_run_command.assert_has_calls([
mock.call(["ip", "route", "show", "dev", "eth1"]),
mock.call(["ip", "route", "show"])
])
# Case 3: No gateway found (empty output for both)
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(True, ""), # No default via
(True, "") # No default via in global
]
self.assertIsNone(self.manager._find_gateway("eth2"))
self.assertEqual(mock_run_command.call_count, 2)
# Case 4: No gateway found (error for both)
with patch('policy_routing.PolicyRoutingManager.run_command') as mock_run_command:
mock_run_command.side_effect = [
(False, "Error dev"),
(False, "Error global")
]
self.assertIsNone(self.manager._find_gateway("eth3"))
self.assertEqual(mock_run_command.call_count, 2)
@patch('policy_routing.PolicyRoutingManager.run_command')
def test_apply_interface_routing(self, mock_run_command):
"""apply_interface_routing 함수 테스트"""
interface_info = {
"name": "eth0",
"ip": "192.168.1.10",
"gateway": "192.168.1.1",
"netmask": "24",
"state": "UP"
}
table_id = 101
priority = 30100
# run_command의 side_effect를 설정하여 각 호출에 대한 응답을 시뮬레이션
# 순서대로 호출될 명령에 대한 응답을 정의
mock_run_command.side_effect = [
(True, ""), # 1. ip rule del from 192.168.1.10/32
(True, ""), # 2. ip route show table 101 (empty, so flush is skipped)
(True, ""), # 3. ip route add network
(True, ""), # 4. ip route add default
(True, ""), # 5. ip rule add from
(True, ""), # 6. ip rule add iif
(True, "30100: from 192.168.1.10 lookup 101"), # 7. ip rule show (verification)
(True, "default via 192.168.1.1 dev eth0 table 101") # 8. ip route show table 101 (verification)
]
success = self.manager.apply_interface_routing(interface_info, table_id, priority)
self.assertTrue(success)
self.manager.logger.error.assert_not_called()
# run_command 호출 검증
expected_calls = [
mock.call(['ip', 'route', 'show', 'table', '101']), # This is the first call
mock.call(['ip', 'rule', 'del', 'from', '192.168.1.10/32'], ignore_errors=['No such file or directory']), # This is the second call
# mock.call(['ip', 'route', 'flush', 'table', '101'], ignore_errors=['No such file or directory']), # Removed, as table is empty
mock.call(['ip', 'route', 'add', '192.168.1.0/24', 'dev', 'eth0', 'src', '192.168.1.10', 'table', '101'], ignore_errors=['File exists']),
mock.call(['ip', 'route', 'add', 'default', 'via', '192.168.1.1', 'dev', 'eth0', 'table', '101'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'add', 'from', '192.168.1.10/32', 'table', '101', 'pref', '30100'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'add', 'iif', 'eth0', 'table', '101', 'pref', '30101'], ignore_errors=['File exists']),
mock.call(['ip', 'rule', 'show']),
mock.call(['ip', 'route', 'show', 'table', '101'])
]
mock_run_command.assert_has_calls(expected_calls)
self.assertEqual(mock_run_command.call_count, len(expected_calls))
if __name__ == '__main__':
unittest.main()