From 7839226e72f1c5976aa65cf7bd2d3be41a0c3d44 Mon Sep 17 00:00:00 2001 From: jung-geun Date: Mon, 2 Jun 2025 16:51:59 +0900 Subject: [PATCH] Implement code changes to enhance functionality and improve performance --- policy_routing.py | 1634 +++++++++++++++++++++------------------------ 1 file changed, 757 insertions(+), 877 deletions(-) diff --git a/policy_routing.py b/policy_routing.py index 62f9c56..9801f7d 100644 --- a/policy_routing.py +++ b/policy_routing.py @@ -1,957 +1,837 @@ #!/usr/bin/env python3 """ -Policy-Based Routing Manager - 실시간 네트워크 변화 감지 개선 버전 +Ubuntu 22.04 Multi-NIC Policy Based Routing Setup Script +Python Implementation """ +import subprocess +import logging import os import sys import json -import time -import subprocess -import argparse -import logging -import signal -import threading -import socket -import struct -import select +import re import ipaddress +import time +import threading +from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Set, Union - -# 설정 상수 -CONFIG_FILE = "/etc/policy_routing.json" -SERVICE_FILE = "/etc/systemd/system/policy-routing.service" -UDEV_RULE_FILE = "/etc/udev/rules.d/99-policy-routing.rules" -SCRIPT_PATH = "/usr/local/bin/policy_routing.py" -RT_TABLES_FILE = "/etc/iproute2/rt_tables" - -# Netlink 상수 -NETLINK_ROUTE = 0 -RTM_NEWLINK = 16 -RTM_DELLINK = 17 -RTM_NEWADDR = 20 -RTM_DELADDR = 21 - -# Pylance 오류 해결을 위한 추가 Netlink 상수 정의 -# 실제 시스템의 socket 모듈에서 이 값들이 노출되지 않을 경우를 대비 -try: - _AF_NETLINK = socket.AF_NETLINK - _SOL_NETLINK = socket.SOL_NETLINK - _NETLINK_ADD_MEMBERSHIP = socket.NETLINK_ADD_MEMBERSHIP -except AttributeError: - # Fallback for environments where these are not directly exposed - _AF_NETLINK = 16 # Common value for AF_NETLINK - _SOL_NETLINK = 270 # Common value for SOL_NETLINK - _NETLINK_ADD_MEMBERSHIP = 1 # Common value for NETLINK_ADD_MEMBERSHIP - -# 기본 설정 -DEFAULT_CONFIG = { - "enabled": True, - "log_level": "INFO", # DEBUG, INFO, WARNING, ERROR - "check_interval": 5, # 더 빠른 체크 - "interfaces": {}, - "global_settings": {"base_table_id": 100, "base_priority": 30000, "default_metric": 1000}, - "monitoring": {"use_netlink": True, "use_udev": True, "use_polling": True}, -} -class NetlinkMonitor: - """Netlink 소켓을 통한 실시간 네트워크 변화 감지""" - - def __init__(self, callback): - self.callback = callback - self.running = False - self.sock = None - self.logger = logging.getLogger("netlink") - - def start(self): - """Netlink 모니터링 시작""" - try: - self.sock = socket.socket(_AF_NETLINK, socket.SOCK_RAW, NETLINK_ROUTE) - self.sock.bind((os.getpid(), 0)) - - # 관심 있는 그룹에 가입 - groups = (1 << (25 - 1)) | ( - 1 << (26 - 1) - ) # RTNLGRP_LINK, RTNLGRP_IPV4_IFADDR - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - self.sock.setsockopt( - _SOL_NETLINK, _NETLINK_ADD_MEMBERSHIP, 1 - ) # RTNLGRP_LINK - self.sock.setsockopt( - _SOL_NETLINK, _NETLINK_ADD_MEMBERSHIP, 5 - ) # RTNLGRP_IPV4_IFADDR - - self.running = True - self.logger.info("Netlink 모니터링 시작됨") - - while self.running: - ready, _, _ = select.select([self.sock], [], [], 1.0) - if ready: - data = self.sock.recv(4096) - self._parse_netlink_message(data) - - except Exception as e: - self.logger.error(f"Netlink 모니터링 오류: {e}") - finally: - if self.sock: - self.sock.close() - - def stop(self): - """Netlink 모니터링 중지""" - self.running = False - - def _parse_netlink_message(self, data): - """Netlink 메시지 파싱""" - try: - if len(data) < 16: - return - - # Netlink 헤더 파싱 - nlmsg_len, nlmsg_type, nlmsg_flags, nlmsg_seq, nlmsg_pid = struct.unpack( - "IHHII", data[:16] - ) - - if nlmsg_type in [RTM_NEWLINK, RTM_DELLINK, RTM_NEWADDR, RTM_DELADDR]: - action = "add" if nlmsg_type in [RTM_NEWLINK, RTM_NEWADDR] else "remove" - self.logger.info(f"Netlink 이벤트 감지: {action} (type: {nlmsg_type})") - self.callback("netlink", action) - - except Exception as e: - self.logger.error(f"Netlink 메시지 파싱 오류: {e}") - - -class PolicyRoutingManager: - def __init__(self, debug: bool = False): - self.config = {} - self.running = False - self.interfaces_state = {} - self.managed_tables = set() - self.debug = debug - self.logger = self._setup_logging() - self.netlink_monitor = None - self.last_interface_check = {} - - def _setup_logging(self): - """로깅 설정""" - logger = logging.getLogger("policy_routing") - logger.setLevel(logging.DEBUG if self.debug else logging.INFO) - - # 콘솔 핸들러 - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.DEBUG if self.debug else logging.INFO) - - # 포맷터 - formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +class PolicyBasedRoutingManager: + def __init__(self): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) + self.logger = logging.getLogger(__name__) - return logger + # 권한 확인 + if os.geteuid() != 0: + self.logger.error("이 스크립트는 root 권한으로 실행해야 합니다.") + sys.exit(1) - def calculate_network(self, ip: str, netmask: str) -> str: - """올바른 네트워크 주소 계산""" + # 네트워크 인터페이스 자동 감지 + self.config = self.auto_detect_network_config() + + def run_command(self, cmd, ignore_error=False): + """시스템 명령어 실행""" try: - # ipaddress 모듈을 사용해서 정확한 네트워크 계산 - interface = ipaddress.IPv4Interface(f"{ip}/{netmask}") - network = interface.network - self.logger.debug(f"네트워크 계산: {ip}/{netmask} -> {network}") - return str(network) + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if result.returncode != 0 and not ignore_error: + self.logger.warning(f"명령어 실행 경고: {cmd}") + self.logger.warning(f"오류: {result.stderr}") + return result except Exception as e: - self.logger.error(f"네트워크 계산 실패: {ip}/{netmask} - {e}") - # 폴백: 유효하지 않은 IP/넷마스크의 경우 0.0.0.0/넷마스크 반환 - return f"0.0.0.0/{netmask}" + self.logger.error(f"명령어 실행 실패: {cmd} - {e}") + return None - def network_change_callback(self, source: str, action: str): - """네트워크 변화 콜백""" - self.logger.info(f"네트워크 변화 감지됨 (source: {source}, action: {action})") - # 즉시 인터페이스 체크 수행 - threading.Thread(target=self._immediate_interface_check, daemon=True).start() + def get_network_interfaces(self): + """활성화된 네트워크 인터페이스 목록 가져오기""" + interfaces = {} - def _immediate_interface_check(self): - """즉시 인터페이스 체크""" - try: - time.sleep(1) # 짧은 딜레이로 설정이 안정화되길 기다림 - self._check_and_apply_interfaces() - except Exception as e: - self.logger.error(f"즉시 인터페이스 체크 오류: {e}") + # ip link show로 인터페이스 목록 가져오기 + result = self.run_command("ip link show") + if not result or result.returncode != 0: + self.logger.error("네트워크 인터페이스를 가져올 수 없습니다") + return interfaces - def load_config(self) -> Dict: - """설정 파일 로드""" - try: - if os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, "r") as f: - config = json.load(f) - for key, value in DEFAULT_CONFIG.items(): - if key not in config: - config[key] = value - return config - else: - return DEFAULT_CONFIG.copy() - except Exception as e: - self.logger.error(f"설정 파일 로드 실패: {e}") - return DEFAULT_CONFIG.copy() + # 루프백과 가상 인터페이스 제외하고 물리적 인터페이스만 선택 + for line in result.stdout.split("\n"): + match = re.match(r"^\d+:\s+(\w+):", line) + if match: + interface = match.group(1) + # 루프백, docker, 가상 인터페이스 제외 + if ( + interface != "lo" + and not interface.startswith("docker") + and not interface.startswith("veth") + and not interface.startswith("br-") + and "state UP" in line + ): + interfaces[interface] = {} - def save_config(self, config: Dict): - """설정 파일 저장""" - try: - with open(CONFIG_FILE, "w") as f: - json.dump(config, f, indent=2) - self.logger.info(f"설정 파일 저장됨: {CONFIG_FILE}") - except Exception as e: - self.logger.error(f"설정 파일 저장 실패: {e}") - - def run_command( - self, cmd: List[str], ignore_errors: Union[List[str], None] = None - ) -> tuple: - """명령어 실행 (디버그 강화)""" - if ignore_errors is None: - ignore_errors = [] - - cmd_str = " ".join(cmd) - self.logger.debug(f"실행: {cmd_str}") - - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - self.logger.debug(f"성공: {cmd_str}") - if result.stdout: - self.logger.debug(f"출력: {result.stdout.strip()}") - return True, result.stdout - except subprocess.CalledProcessError as e: - error_msg = e.stderr.strip() if e.stderr else str(e) - - # 특정 오류는 무시 - for ignore_pattern in ignore_errors: - if ignore_pattern in error_msg: - self.logger.debug(f"무시된 오류: {cmd_str} - {error_msg}") - return True, "" - - self.logger.error(f"명령어 실행 실패: {cmd_str} - {error_msg}") - return False, error_msg - - def get_network_interfaces(self) -> List[Dict]: - """네트워크 인터페이스 정보 수집""" - interfaces = [] - self.logger.debug("네트워크 인터페이스 정보 수집 시작") - - try: - success, output = self.run_command(["ip", "addr", "show"]) - if not success: - return interfaces - - current_iface = None - for line in output.split("\n"): - if line and not line.startswith(" "): - # 새 인터페이스 시작 - parts = line.split(":") - if len(parts) >= 2: - iface_name = parts[1].strip() - if iface_name not in [ - "lo", - "docker0", - ] and not iface_name.startswith(("veth", "br-", "virbr")): - current_iface = { - "name": iface_name, - "ip": None, - "gateway": None, - "netmask": None, - "state": "DOWN", - } - # 상태 확인 - if "UP" in line and "LOWER_UP" in line: - current_iface["state"] = "UP" - - self.logger.debug( - f"인터페이스 발견: {iface_name} - {current_iface['state']}" - ) - - elif current_iface and "inet " in line and "scope global" in line: - # IP 주소 추출 - parts = line.strip().split() - for i, part in enumerate(parts): - if part == "inet" and i + 1 < len(parts): - ip_with_mask = parts[i + 1] - current_iface["ip"] = ip_with_mask.split("/")[0] - current_iface["netmask"] = ( - ip_with_mask.split("/")[1] - if "/" in ip_with_mask - else "24" - ) - self.logger.debug( - f"IP 주소 발견: {current_iface['name']} = {current_iface['ip']}/{current_iface['netmask']}" - ) - break - - if current_iface["ip"]: - # 게이트웨이 찾기 - current_iface["gateway"] = self._find_gateway( - current_iface["name"] - ) - - # 게이트웨이가 있는 경우만 추가 - if current_iface["gateway"]: - interfaces.append(current_iface) - self.logger.debug(f"인터페이스 추가됨: {current_iface}") - else: - self.logger.debug( - f"게이트웨이 없어서 제외됨: {current_iface['name']}" - ) - current_iface = None - - except Exception as e: - self.logger.error(f"인터페이스 정보 수집 실패: {e}") - - self.logger.debug(f"총 {len(interfaces)}개 인터페이스 발견됨") return interfaces - def _find_gateway(self, interface_name: str) -> Optional[str]: - """특정 인터페이스의 게이트웨이 찾기""" - try: - # 인터페이스별 라우트 확인 - success, output = self.run_command( - ["ip", "route", "show", "dev", interface_name] - ) - if success: - for line in output.split("\n"): - if "default via" in line: - gateway = line.split("via")[1].split()[0] - self.logger.debug( - f"{interface_name} 게이트웨이 발견: {gateway}" - ) - return gateway + def get_interface_ip_info(self, interface): + """특정 인터페이스의 IP 정보 가져오기""" + result = self.run_command(f"ip addr show {interface}") + if not result or result.returncode != 0: + return None - # 전체 라우팅 테이블에서 확인 - success, output = self.run_command(["ip", "route", "show"]) - if success: - for line in output.split("\n"): - if f"default via" in line and f"dev {interface_name}" in line: - gateway = line.split("via")[1].split()[0] - self.logger.debug( - f"{interface_name} 전체 테이블에서 게이트웨이 발견: {gateway}" - ) - return gateway + ip_info = {} + for line in result.stdout.split("\n"): + # IPv4 주소 찾기 + match = re.search(r"inet (\d+\.\d+\.\d+\.\d+)/(\d+)", line) + if match: + ip_addr = match.group(1) + prefix = int(match.group(2)) - except Exception as e: - self.logger.debug(f"게이트웨이 조회 실패 {interface_name}: {e}") + # 네트워크 주소 계산 + network = ipaddress.IPv4Network(f"{ip_addr}/{prefix}", strict=False) - self.logger.debug(f"{interface_name} 게이트웨이 없음") - return None - - def get_existing_rules(self) -> Dict: - """기존 라우팅 규칙 조회""" - rules = {"policy_rules": [], "routing_tables": {}} - - try: - # 정책 규칙 조회 - success, output = self.run_command(["ip", "rule", "show"]) - if success: - for line in output.strip().split("\n"): - if "lookup" in line and line.strip(): - rules["policy_rules"].append(line.strip()) - - # 각 테이블별 라우팅 규칙 조회 - for table_id in range(100, 120): - success, output = self.run_command( - ["ip", "route", "show", "table", str(table_id)] - ) - if success and output.strip(): - rules["routing_tables"][table_id] = output.strip().split("\n") - - except Exception as e: - self.logger.error(f"기존 규칙 조회 실패: {e}") - - return rules - - def setup_routing_table(self, interface_name: str, table_id: int): - """라우팅 테이블 설정""" - try: - with open(RT_TABLES_FILE, "r") as f: - content = f.read() - - table_line = f"{table_id}\t{interface_name}\n" - if table_line not in content: - with open(RT_TABLES_FILE, "a") as f: - f.write(table_line) - self.logger.info(f"라우팅 테이블 {table_id} ({interface_name}) 추가됨") - - except Exception as e: - self.logger.error(f"라우팅 테이블 설정 실패: {e}") - - def apply_interface_routing( - self, interface_info: Dict, table_id: int, priority: int, metric: int - ) -> bool: - """인터페이스별 라우팅 규칙 적용 (네트워크 계산 수정)""" - name = interface_info["name"] - ip = interface_info["ip"] - gateway = interface_info["gateway"] - netmask = interface_info.get("netmask", "24") - - self.logger.info(f"=== {name} 인터페이스 라우팅 설정 시작 ===") - self.logger.debug( - f"IP: {ip}, Gateway: {gateway}, Table: {table_id}, Priority: {priority}, Metric: {metric}" - ) - - if not all([name, ip, gateway]): - self.logger.warning(f"인터페이스 {name} 정보 불완전: ip={ip}, gw={gateway}") - return False - - try: - # 올바른 네트워크 계산 - network = self.calculate_network(ip, netmask) - self.logger.debug(f"계산된 네트워크: {network}") - - # 기존 규칙 정리 (더 안전한 방법) - self.logger.debug(f"기존 규칙 정리 중...") - - # 1. 정책 규칙 제거 - cleanup_commands = [["ip", "rule", "del", "from", f"{ip}/32"]] - - # 2. 해당 테이블의 모든 라우트 제거 - success, output = self.run_command( - ["ip", "route", "show", "table", str(table_id)] - ) - if success and output.strip(): - # 테이블을 완전히 비우기 - self.run_command( - ["ip", "route", "flush", "table", str(table_id)], - ignore_errors=["No such file or directory"], - ) - - for cmd in cleanup_commands: - self.run_command(cmd, ignore_errors=["No such file or directory"]) - - # 새 규칙 추가 - self.logger.debug(f"새 라우팅 규칙 추가 중...") - - # 1. 로컬 네트워크 라우트 - success, _ = self.run_command( - [ - "ip", - "route", - "add", - network, - "dev", - name, - "src", - ip, - "table", - str(table_id), - ], - ignore_errors=["File exists"], - ) - - if not success: - self.logger.error(f"로컬 네트워크 라우트 추가 실패: {network}") - return False - - # 2. 기본 게이트웨이 - success, _ = self.run_command( - [ - "ip", - "route", - "add", - "default", - "via", - gateway, - "dev", - name, - "table", - str(table_id), - "metric", - str(metric), - ], - ignore_errors=["File exists"], - ) - - if not success: - self.logger.error(f"기본 게이트웨이 추가 실패: {gateway}") - return False - - # 3. 정책 규칙 - success, _ = self.run_command( - [ - "ip", - "rule", - "add", - "from", - f"{ip}/32", - "table", - str(table_id), - "pref", - str(priority), - ], - ignore_errors=["File exists"], - ) - - if not success: - self.logger.error(f"정책 규칙 추가 실패: from {ip}/32") - return False - - # 4. 인바운드 트래픽을 위한 추가 규칙 (선택사항) - # 해당 인터페이스로 들어오는 패킷이 같은 인터페이스로 나가도록 - success, _ = self.run_command( - [ - "ip", - "rule", - "add", - "iif", - name, - "table", - str(table_id), - "pref", - str(priority + 1), - ], - ignore_errors=["File exists"], - ) - - # 적용 확인 - self.logger.debug(f"적용 결과 확인 중...") - success, output = self.run_command(["ip", "rule", "show"]) - if success: - if f"from {ip}" in output.strip(): # Add .strip() - self.logger.info(f"✅ {name} 정책 규칙 적용 확인됨") - else: - self.logger.error(f"❌ {name} 정책 규칙 적용 확인 실패") - return False - - success, output = self.run_command( - ["ip", "route", "show", "table", str(table_id)] - ) - if success and "default via" in output.strip(): # Add .strip() - self.logger.info(f"✅ {name} 라우팅 테이블 적용 확인됨") - self.logger.debug(f"테이블 {table_id} 내용:\n{output}") - else: - self.logger.error(f"❌ {name} 라우팅 테이블 적용 확인 실패") - return False - - self.managed_tables.add(table_id) - self.logger.info(f"=== {name} 인터페이스 라우팅 설정 완료 ===") - return True - - except Exception as e: - self.logger.error(f"인터페이스 {name} 라우팅 설정 실패: {e}") - return False - - def test_routing(self, interface_name: str): - """라우팅 테스트""" - print(f"\n🧪 {interface_name} 라우팅 테스트") - print("=" * 30) - - try: - # ping 테스트 - result = subprocess.run( - ["ping", "-c", "3", "-I", interface_name, "8.8.8.8"], - capture_output=True, - text=True, - timeout=15, - ) - - if result.returncode == 0: - print(f"✅ {interface_name}을 통한 ping 성공") - # 응답 시간 표시 - for line in result.stdout.split("\n"): - if "time=" in line: - print(f" {line.strip()}") - else: - print(f"❌ {interface_name}을 통한 ping 실패") - print(f" 오류: {result.stderr.strip()}") - - except subprocess.TimeoutExpired: - print(f"⏰ {interface_name} ping 타임아웃") - except Exception as e: - print(f"❌ 테스트 오류: {e}") - - def _check_and_apply_interfaces(self): - """인터페이스 체크 및 적용""" - try: - interfaces = self.get_network_interfaces() - current_interfaces = {iface["name"]: iface for iface in interfaces} - - config_changed = False - - for name, info in current_interfaces.items(): - if info["state"] == "UP" and info["ip"] and info["gateway"]: - if name not in self.config["interfaces"]: - table_id = self.config["global_settings"][ - "base_table_id" - ] + len(self.config["interfaces"]) - self.config["interfaces"][name] = { - "enabled": True, - "table_id": table_id, - "priority": 100, - "health_check_target": "8.8.8.8", - } - config_changed = True - self.logger.info(f"새 인터페이스 {name} 자동 추가됨") - - if self.config["interfaces"][name]["enabled"]: - iface_config = self.config["interfaces"][name] - table_id = iface_config["table_id"] - priority = ( - self.config["global_settings"]["base_priority"] - + iface_config["priority"] - ) - metric = iface_config.get("metric", self.config["global_settings"]["default_metric"]) - - self.setup_routing_table(name, table_id) - self.apply_interface_routing(info, table_id, priority, metric) - - if config_changed: - self.save_config(self.config) - - except Exception as e: - self.logger.error(f"인터페이스 체크 오류: {e}") - - def debug_interface(self, interface_name: Union[str, None] = None): - """특정 인터페이스 디버깅""" - print(f"\n🔍 Policy Routing 디버그 정보") - print("=" * 50) - - # 인터페이스 정보 - interfaces = self.get_network_interfaces() - print(f"\n📡 감지된 인터페이스: {len(interfaces)}개") - for iface in interfaces: - network = self.calculate_network(iface["ip"], iface["netmask"]) - print( - f" - {iface['name']}: {iface['ip']}/{iface['netmask']} -> {iface['gateway']} ({iface['state']})" - ) - print(f" 네트워크: {network}") - - # 설정 정보 - config = self.load_config() - print(f"\n⚙️ 설정된 인터페이스: {len(config.get('interfaces', {}))}개") - for name, conf in config.get("interfaces", {}).items(): - status = "활성화" if conf.get("enabled") else "비활성화" - print(f" - {name}: {status} (테이블 ID: {conf.get('table_id')})") - - # 현재 라우팅 규칙 - print(f"\n📋 현재 Policy 규칙:") - success, output = self.run_command(["ip", "rule", "show"]) - if success: - for line in output.strip().split("\n"): - if any(str(i) in line for i in range(100, 120)) and "lookup" in line: - print(f" - {line}") - - # 라우팅 테이블 - print(f"\n🗂️ 라우팅 테이블:") - for table_id in range(100, 110): - success, output = self.run_command( - ["ip", "route", "show", "table", str(table_id)] - ) - if success and output.strip(): - print(f" 테이블 {table_id}:") - for route in output.strip().split("\n"): - print(f" - {route}") - - def apply_single_interface(self, interface_name: str): - """단일 인터페이스에 규칙 적용""" - self.config = self.load_config() - interfaces = self.get_network_interfaces() - - target_interface = None - for iface in interfaces: - if iface["name"] == interface_name: - target_interface = iface + ip_info = { + "ip": ip_addr, + "prefix": prefix, + "network": str(network), + "netmask": str(network.netmask), + } break - if not target_interface: - self.logger.error(f"인터페이스 {interface_name}을 찾을 수 없습니다.") - return False + return ip_info - if target_interface["state"] != "UP" or not target_interface["ip"]: - self.logger.error( - f"인터페이스 {interface_name}이 활성화되지 않았거나 IP가 없습니다." - ) - return False + def get_default_gateway(self, interface): + """특정 인터페이스의 기본 게이트웨이 찾기""" + # 현재 라우팅 테이블에서 해당 인터페이스의 기본 라우트 찾기 + result = self.run_command(f"ip route show dev {interface}") + if not result or result.returncode != 0: + return None - # 설정에 추가 (없으면) - if interface_name not in self.config["interfaces"]: - table_id = self.config["global_settings"]["base_table_id"] + len( - self.config["interfaces"] - ) - self.config["interfaces"][interface_name] = { - "enabled": True, - "table_id": table_id, - "priority": 100, - "health_check_target": "8.8.8.8", - "metric": self.config["global_settings"]["default_metric"], + for line in result.stdout.split("\n"): + if "default via" in line: + match = re.search(r"default via (\d+\.\d+\.\d+\.\d+)", line) + if match: + return match.group(1) + + # 기본 라우트가 없으면 네트워크의 첫 번째 주소(.1)를 게이트웨이로 추정 + ip_info = self.get_interface_ip_info(interface) + if ip_info: + network = ipaddress.IPv4Network(ip_info["network"]) + # 일반적으로 .1이 게이트웨이 + gateway = str(network.network_address + 1) + return gateway + + return None + + def auto_detect_network_config(self): + """네트워크 설정 자동 감지""" + self.logger.info("네트워크 인터페이스 자동 감지 중...") + + config = {"nics": {}} + interfaces = self.get_network_interfaces() + + if not interfaces: + self.logger.error("활성화된 네트워크 인터페이스를 찾을 수 없습니다") + sys.exit(1) + + table_id = 100 + metric_base = 100 + + for i, interface in enumerate(interfaces.keys()): + self.logger.info(f"인터페이스 {interface} 정보 수집 중...") + + ip_info = self.get_interface_ip_info(interface) + if not ip_info: + self.logger.warning( + f"인터페이스 {interface}의 IP 정보를 가져올 수 없습니다" + ) + continue + + gateway = self.get_default_gateway(interface) + if not gateway: + self.logger.warning( + f"인터페이스 {interface}의 게이트웨이를 찾을 수 없습니다" + ) + continue + + nic_name = f"nic{i+1}" + config["nics"][nic_name] = { + "interface": interface, + "ip": ip_info["ip"], + "network": ip_info["network"], + "gateway": gateway, + "metric": metric_base + (i * 100), + "table_id": table_id + i, } - self.save_config(self.config) - iface_config = self.config["interfaces"][interface_name] - table_id = iface_config["table_id"] - priority = ( - self.config["global_settings"]["base_priority"] + iface_config["priority"] + self.logger.info( + f"감지된 설정 - {nic_name}: {interface} ({ip_info['ip']}) -> {gateway}" + ) + + if not config["nics"]: + self.logger.error("유효한 네트워크 인터페이스를 찾을 수 없습니다") + sys.exit(1) + + self.logger.info( + f"총 {len(config['nics'])}개의 네트워크 인터페이스가 감지되었습니다" ) - metric = iface_config.get("metric", self.config["global_settings"]["default_metric"]) + return config - self.setup_routing_table(interface_name, table_id) - success = self.apply_interface_routing(target_interface, table_id, priority, metric) + def print_detected_config(self): + """감지된 설정 출력""" + print("\n=== 감지된 네트워크 설정 ===") + for nic_name, nic_config in self.config["nics"].items(): + print(f"{nic_name}:") + print(f" 인터페이스: {nic_config['interface']}") + print(f" IP 주소: {nic_config['ip']}") + print(f" 네트워크: {nic_config['network']}") + print(f" 게이트웨이: {nic_config['gateway']}") + print(f" 메트릭: {nic_config['metric']}") + print(f" 테이블 ID: {nic_config['table_id']}") + print() - if success: - print(f"✅ {interface_name} 인터페이스 규칙 적용 완료") - # 테스트도 수행 - self.test_routing(interface_name) - else: - print(f"❌ {interface_name} 인터페이스 규칙 적용 실패") + def create_udev_rules(self): + """udev 규칙 생성 - 네트워크 인터페이스 변경 감지""" + self.logger.info("udev 규칙 생성 중...") - return success + udev_rule_path = Path("/etc/udev/rules.d/99-pbr-network.rules") - def monitor_interfaces(self): - """인터페이스 모니터링""" - last_check_time = 0 + # udev 규칙 내용 + udev_rule_content = """# Policy Based Routing - Network Interface Detection +# NIC가 추가되거나 IP가 변경될 때 자동으로 PBR 재설정 - while self.running: - try: - current_time = time.time() +# 네트워크 인터페이스 UP 이벤트 +SUBSYSTEM=="net", ACTION=="add", RUN+="/usr/local/bin/pbr-udev-handler.py add %k" +SUBSYSTEM=="net", ACTION=="change", KERNEL!="lo", RUN+="/usr/local/bin/pbr-udev-handler.py change %k" - if current_time - last_check_time < self.config["check_interval"]: - time.sleep(1) - continue +# IP 주소 변경 감지를 위한 추가 규칙 +SUBSYSTEM=="net", ACTION=="change", ATTR{operstate}=="up", RUN+="/usr/local/bin/pbr-udev-handler.py ip-change %k" +""" - last_check_time = current_time - interfaces = self.get_network_interfaces() - current_interfaces = {iface["name"]: iface for iface in interfaces} - - config_changed = False - - for name, info in current_interfaces.items(): - if info["state"] == "UP" and info["ip"] and info["gateway"]: - if name not in self.config["interfaces"]: - table_id = self.config["global_settings"][ - "base_table_id" - ] + len(self.config["interfaces"]) - self.config["interfaces"][name] = { - "enabled": True, - "table_id": table_id, - "priority": 100, - "health_check_target": "8.8.8.8", - "metric": self.config["global_settings"]["default_metric"], - } - config_changed = True - self.logger.info(f"새 인터페이스 {name} 자동 추가됨") - - if self.config["interfaces"][name]["enabled"]: - iface_config = self.config["interfaces"][name] - table_id = iface_config["table_id"] - priority = ( - self.config["global_settings"]["base_priority"] - + iface_config["priority"] - ) - metric = iface_config.get("metric", self.config["global_settings"]["default_metric"]) - - self.setup_routing_table(name, table_id) - self.apply_interface_routing(info, table_id, priority, metric) - - if config_changed: - self.save_config(self.config) - - except Exception as e: - self.logger.error(f"모니터링 오류: {e}") - time.sleep(5) - - def start_daemon(self): - """데몬 모드 시작""" - self.config = self.load_config() - self.running = True - - signal.signal(signal.SIGTERM, self._signal_handler) - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGHUP, self._reload_config) - - self.logger.info("Policy Routing Manager 시작됨") - - monitor_thread = threading.Thread(target=self.monitor_interfaces, daemon=True) - monitor_thread.start() - - # 초기 설정 적용 try: - interfaces = self.get_network_interfaces() - for iface in interfaces: - if iface["state"] == "UP" and iface["ip"] and iface["gateway"]: - self.apply_single_interface(iface["name"]) + udev_rule_path.write_text(udev_rule_content) + self.logger.info(f"udev 규칙 생성 완료: {udev_rule_path}") + + # udev 규칙 다시 로드 + self.run_command("udevadm control --reload-rules") + except Exception as e: - self.logger.error(f"초기 설정 적용 실패: {e}") + self.logger.error(f"udev 규칙 생성 실패: {e}") - try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - pass - finally: - self.stop_daemon() + def create_udev_handler_script(self): + """udev 이벤트 처리 스크립트 생성""" + self.logger.info("udev 핸들러 스크립트 생성 중...") - def stop_daemon(self): - """데몬 중지""" - self.running = False - self.logger.info("Policy Routing Manager 중지됨") + handler_script_path = Path("/usr/local/bin/pbr-udev-handler.py") - def _signal_handler(self, signum, frame): - """신호 처리""" - self.logger.info(f"신호 {signum} 수신됨") - if signum == signal.SIGHUP: - self._reload_config(signum, frame) + handler_script_content = f'''#!/usr/bin/env python3 +""" +PBR udev 이벤트 핸들러 +네트워크 인터페이스 변경 시 Policy Based Routing 자동 재설정 +""" + +import sys +import subprocess +import time +import logging +from pathlib import Path + +# 로깅 설정 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/var/log/pbr-udev.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +def run_command(cmd, ignore_error=False): + """시스템 명령어 실행""" + try: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if result.returncode != 0 and not ignore_error: + logger.warning(f"명령어 실행 경고: {{cmd}}") + logger.warning(f"오류: {{result.stderr}}") + return result + except Exception as e: + logger.error(f"명령어 실행 실패: {{cmd}} - {{e}}") + return None + +def wait_for_ip_assignment(interface, max_wait=30): + """인터페이스에 IP가 할당될 때까지 대기""" + logger.info(f"인터페이스 {{interface}}의 IP 할당 대기 중...") + + for i in range(max_wait): + result = run_command(f"ip addr show {{interface}}") + if result and result.returncode == 0: + # IPv4 주소가 있는지 확인 + if "inet " in result.stdout and not result.stdout.count("inet ") == result.stdout.count("inet 127."): + logger.info(f"인터페이스 {{interface}}에 IP가 할당됨") + return True + time.sleep(1) + + logger.warning(f"인터페이스 {{interface}}의 IP 할당을 {{max_wait}}초간 기다렸지만 할당되지 않음") + return False + +def trigger_pbr_reconfiguration(): + """PBR 재설정 트리거""" + logger.info("PBR 재설정 트리거 중...") + + # 잠시 대기 후 PBR 재설정 실행 + time.sleep(5) # 시스템이 안정화될 시간을 줌 + + pbr_script = Path("{os.path.abspath(__file__)}") + if pbr_script.exists(): + # 기존 설정 제거 후 재설정 + run_command(f"python3 {{pbr_script}} remove", ignore_error=True) + time.sleep(2) + result = run_command(f"echo 'y' | python3 {{pbr_script}} setup") + + if result and result.returncode == 0: + logger.info("PBR 재설정 완료") else: - self.stop_daemon() + logger.error("PBR 재설정 실패") + else: + logger.error(f"PBR 스크립트를 찾을 수 없음: {{pbr_script}}") - def _reload_config(self, signum, frame): - """설정 재로드""" - self.logger.info("설정 재로드 중...") - self.config = self.load_config() +def main(): + if len(sys.argv) != 3: + logger.error("사용법: pbr-udev-handler.py ") + sys.exit(1) + + action = sys.argv[1] + interface = sys.argv[2] + + logger.info(f"udev 이벤트 수신: {{action}} {{interface}}") + + # 루프백과 가상 인터페이스 제외 + if (interface == "lo" or + interface.startswith("docker") or + interface.startswith("veth") or + interface.startswith("br-")): + logger.info(f"인터페이스 {{interface}} 무시됨 (가상 인터페이스)") + return + + if action == "add": + logger.info(f"새 네트워크 인터페이스 감지: {{interface}}") + # IP 할당 대기 후 PBR 재설정 + if wait_for_ip_assignment(interface): + trigger_pbr_reconfiguration() + + elif action == "change": + logger.info(f"네트워크 인터페이스 변경 감지: {{interface}}") + # 인터페이스 상태 확인 후 필요시 재설정 + result = run_command(f"ip link show {{interface}}") + if result and "state UP" in result.stdout: + if wait_for_ip_assignment(interface, max_wait=10): + trigger_pbr_reconfiguration() + + elif action == "ip-change": + logger.info(f"네트워크 인터페이스 IP 변경 감지: {{interface}}") + # IP 변경 시 PBR 재설정 + trigger_pbr_reconfiguration() - def refresh_from_external(self): - """외부(udev 등)에서 호출되는 새로고침""" - self.logger.info("외부 트리거로 새로고침 요청됨") - self.network_change_callback("external", "refresh") - - -# [PolicyRoutingInstaller 클래스는 이전과 동일] -class PolicyRoutingInstaller: - def __init__(self): - self.logger = logging.getLogger("installer") - - def check_requirements(self) -> bool: - if os.geteuid() != 0: - print("오류: 관리자 권한이 필요합니다.") - return False - - required_commands = ["ip", "systemctl"] - for cmd in required_commands: - try: - subprocess.run(["which", cmd], check=True, capture_output=True) - except subprocess.CalledProcessError: - print(f"오류: {cmd} 명령어를 찾을 수 없습니다.") - return False - return True - - def install(self): - if not self.check_requirements(): - return False +if __name__ == "__main__": + main() +''' try: - script_content = open(__file__, "r").read() - with open(SCRIPT_PATH, "w") as f: - f.write(script_content) - os.chmod(SCRIPT_PATH, 0o755) + handler_script_path.write_text(handler_script_content) + handler_script_path.chmod(0o755) + self.logger.info(f"udev 핸들러 스크립트 생성 완료: {handler_script_path}") - service_content = f"""[Unit] -Description=Policy-Based Routing Manager + except Exception as e: + self.logger.error(f"udev 핸들러 스크립트 생성 실패: {e}") + + def create_systemd_service(self): + """systemd 서비스 생성 - 부팅 시 PBR 자동 설정""" + self.logger.info("systemd 서비스 생성 중...") + + service_path = Path("/etc/systemd/system/pbr-auto-setup.service") + + service_content = f"""[Unit] +Description=Policy Based Routing Auto Setup After=network.target Wants=network.target [Service] -Type=simple -ExecStart={SCRIPT_PATH} daemon -ExecReload=/bin/kill -HUP $MAINPID -Restart=on-failure -RestartSec=5 -User=root +Type=oneshot +ExecStart=/bin/bash -c "sleep 10 && echo 'y' | python3 {os.path.abspath(__file__)} setup" +RemainAfterExit=yes +StandardOutput=journal +StandardError=journal [Install] WantedBy=multi-user.target """ - with open(SERVICE_FILE, "w") as f: - f.write(service_content) - udev_content = f"""SUBSYSTEM=="net", ACTION=="add", RUN+="{SCRIPT_PATH} refresh" -SUBSYSTEM=="net", ACTION=="remove", RUN+="{SCRIPT_PATH} refresh" -SUBSYSTEM=="net", ACTION=="change", RUN+="{SCRIPT_PATH} refresh" -SUBSYSTEM=="net", ACTION=="move", RUN+="{SCRIPT_PATH} refresh" + try: + service_path.write_text(service_content) + self.logger.info(f"systemd 서비스 생성 완료: {service_path}") + + # 서비스 활성화 + self.run_command("systemctl daemon-reload") + self.run_command("systemctl enable pbr-auto-setup.service") + + except Exception as e: + self.logger.error(f"systemd 서비스 생성 실패: {e}") + + def setup_udev_monitoring(self): + """udev 모니터링 시스템 설정""" + self.logger.info("udev 모니터링 시스템 설정 중...") + + try: + # udev 규칙 생성 + self.create_udev_rules() + + # udev 핸들러 스크립트 생성 + self.create_udev_handler_script() + + # systemd 서비스 생성 + self.create_systemd_service() + + # 로그 디렉토리 확인 + log_dir = Path("/var/log") + if not log_dir.exists(): + log_dir.mkdir(exist_ok=True) + + self.logger.info("udev 모니터링 시스템 설정 완료") + print("\n=== udev 모니터링 설정 완료 ===") + print("1. 새로운 NIC가 추가되면 자동으로 PBR 재설정") + print("2. 기존 NIC의 IP가 변경되면 자동으로 PBR 재설정") + print("3. 시스템 부팅 시 자동으로 PBR 설정") + print("4. 로그 파일: /var/log/pbr-udev.log") + + except Exception as e: + self.logger.error(f"udev 모니터링 시스템 설정 실패: {e}") + + def remove_udev_monitoring(self): + """udev 모니터링 시스템 제거""" + self.logger.info("udev 모니터링 시스템 제거 중...") + + try: + # udev 규칙 제거 + udev_rule_path = Path("/etc/udev/rules.d/99-pbr-network.rules") + if udev_rule_path.exists(): + udev_rule_path.unlink() + self.logger.info("udev 규칙 제거됨") + + # udev 핸들러 스크립트 제거 + handler_script_path = Path("/usr/local/bin/pbr-udev-handler.py") + if handler_script_path.exists(): + handler_script_path.unlink() + self.logger.info("udev 핸들러 스크립트 제거됨") + + # systemd 서비스 제거 + service_path = Path("/etc/systemd/system/pbr-auto-setup.service") + if service_path.exists(): + self.run_command( + "systemctl disable pbr-auto-setup.service", ignore_error=True + ) + self.run_command( + "systemctl stop pbr-auto-setup.service", ignore_error=True + ) + service_path.unlink() + self.run_command("systemctl daemon-reload") + self.logger.info("systemd 서비스 제거됨") + + # udev 규칙 다시 로드 + self.run_command("udevadm control --reload-rules") + + self.logger.info("udev 모니터링 시스템 제거 완료") + + except Exception as e: + self.logger.error(f"udev 모니터링 시스템 제거 실패: {e}") + + def check_interface_changes(self): + """인터페이스 변경 사항 실시간 모니터링 (테스트용)""" + self.logger.info("네트워크 인터페이스 변경 모니터링 시작...") + print("네트워크 인터페이스 변경을 모니터링 중입니다...") + print("Ctrl+C로 중지할 수 있습니다.") + + last_config = self.config.copy() + + try: + while True: + time.sleep(5) # 5초마다 확인 + + # 현재 설정 다시 감지 + current_config = self.auto_detect_network_config() + + # 변경 사항 확인 + if current_config != last_config: + self.logger.info("네트워크 인터페이스 변경 감지됨!") + print("\n=== 네트워크 변경 감지 ===") + + # 새로 추가된 인터페이스 + new_nics = set(current_config["nics"].keys()) - set( + last_config["nics"].keys() + ) + if new_nics: + print(f"새로 추가된 인터페이스: {', '.join(new_nics)}") + + # 제거된 인터페이스 + removed_nics = set(last_config["nics"].keys()) - set( + current_config["nics"].keys() + ) + if removed_nics: + print(f"제거된 인터페이스: {', '.join(removed_nics)}") + + # 설정 업데이트 및 PBR 재설정 + self.config = current_config + print("PBR 자동 재설정 중...") + self.cleanup_existing() + self.setup_routing_tables() + self.configure_nic_routes() + self.setup_policy_rules() + self.setup_main_routing() + print("PBR 재설정 완료!") + + last_config = current_config.copy() + + except KeyboardInterrupt: + print("\n모니터링이 중지되었습니다.") + self.logger.info("네트워크 인터페이스 모니터링 중지됨") + + def create_backup(self): + """기존 설정 백업""" + self.logger.info("기존 설정 백업 중...") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_dir = Path("/tmp/routing_backup") + backup_dir.mkdir(exist_ok=True) + + # 라우팅 테이블 백업 + if Path("/etc/iproute2/rt_tables").exists(): + self.run_command( + f"cp /etc/iproute2/rt_tables {backup_dir}/rt_tables_{timestamp}" + ) + + # 현재 라우팅 정보 백업 + self.run_command(f"ip route show > {backup_dir}/routes_{timestamp}.txt") + self.run_command(f"ip rule show > {backup_dir}/rules_{timestamp}.txt") + + self.logger.info(f"백업 완료: {backup_dir}") + + def cleanup_existing(self): + """기존 설정 정리""" + self.logger.info("기존 policy routing 설정 정리 중...") + + for nic_name, nic_config in self.config["nics"].items(): + ip_addr = nic_config["ip"] + + # 기존 policy rules 제거 + self.run_command( + f"ip rule del from {ip_addr}/32 table {nic_name}", ignore_error=True + ) + self.run_command( + f"ip rule del to {ip_addr}/32 table {nic_name}", ignore_error=True + ) + + # 기존 라우팅 테이블 내용 정리 + self.run_command(f"ip route flush table {nic_name}", ignore_error=True) + + def setup_routing_tables(self): + """라우팅 테이블 설정""" + self.logger.info("라우팅 테이블 설정 중...") + + rt_tables_path = Path("/etc/iproute2/rt_tables") + + # 기존 내용 읽기 + existing_content = "" + if rt_tables_path.exists(): + existing_content = rt_tables_path.read_text() + + # 새로운 테이블 추가 + for nic_name, nic_config in self.config["nics"].items(): + table_entry = f"{nic_config['table_id']} {nic_name}" + + if table_entry not in existing_content: + with open(rt_tables_path, "a") as f: + f.write(f"\n{table_entry}\n") + self.logger.info(f"라우팅 테이블 '{nic_name}' 추가됨") + + def configure_nic_routes(self): + """각 NIC별 라우팅 테이블 구성""" + self.logger.info("각 NIC별 라우팅 테이블 구성 중...") + + for nic_name, nic_config in self.config["nics"].items(): + interface = nic_config["interface"] + gateway = nic_config["gateway"] + ip_addr = nic_config["ip"] + network = nic_config["network"] + + self.logger.info(f"NIC {nic_name} ({interface}) 라우팅 설정 중...") + + # 로컬 네트워크 라우트 + self.run_command( + f"ip route add {network} dev {interface} src {ip_addr} table {nic_name}" + ) + + # 기본 게이트웨이 + self.run_command( + f"ip route add default via {gateway} dev {interface} table {nic_name}" + ) + + def setup_policy_rules(self): + """Policy Rules 설정""" + self.logger.info("Policy Rules 설정 중...") + + for nic_name, nic_config in self.config["nics"].items(): + ip_addr = nic_config["ip"] + + # Source IP 기반 정책 + self.run_command( + f"ip rule add from {ip_addr}/32 table {nic_name} priority 100" + ) + + # Destination IP 기반 정책 + self.run_command( + f"ip rule add to {ip_addr}/32 table {nic_name} priority 101" + ) + + self.logger.info(f"NIC {nic_name} (IP: {ip_addr}) Policy Rule 설정 완료") + + def setup_main_routing(self): + """메인 라우팅 테이블 설정 (metric 기반 우선순위)""" + self.logger.info("메인 라우팅 테이블 설정 중...") + + # 기존 모든 default 라우트 제거 (더 강력한 방법) + result = self.run_command("ip route show default") + if result and result.stdout.strip(): + for line in result.stdout.strip().split("\n"): + if line.strip() and "default" in line: + self.run_command(f"ip route del {line.strip()}", ignore_error=True) + + # metric 순으로 정렬하여 default 라우트 추가 + sorted_nics = sorted(self.config["nics"].items(), key=lambda x: x[1]["metric"]) + + for nic_name, nic_config in sorted_nics: + interface = nic_config["interface"] + gateway = nic_config["gateway"] + metric = nic_config["metric"] + + # 라우트 추가 전에 동일한 라우트가 있는지 확인 + check_result = self.run_command( + f"ip route show default via {gateway} dev {interface}" + ) + if not check_result or not check_result.stdout.strip(): + self.run_command( + f"ip route add default via {gateway} dev {interface} metric {metric}" + ) + self.logger.info( + f"Default 라우트 추가: {gateway} via {interface} (metric: {metric})" + ) + else: + self.logger.info( + f"Default 라우트 이미 존재: {gateway} via {interface} (metric: {metric})" + ) + + def check_interfaces(self): + """네트워크 인터페이스 상태 확인""" + self.logger.info("네트워크 인터페이스 상태 확인 중...") + + for nic_name, nic_config in self.config["nics"].items(): + interface = nic_config["interface"] + + result = self.run_command(f"ip link show {interface}") + if result and result.returncode == 0: + output = result.stdout + if "state UP" in output: + self.logger.info(f"인터페이스 {interface}: UP") + else: + self.logger.warning(f"인터페이스 {interface}: DOWN 또는 상태 불명") + else: + self.logger.error(f"인터페이스 {interface}를 찾을 수 없습니다") + return False + return True + + def verify_configuration(self): + """설정 검증""" + self.logger.info("설정 검증 중...") + + print("\n=== 라우팅 테이블 ===") + result = self.run_command( + "cat /etc/iproute2/rt_tables | grep -E '^[0-9]+.*nic[0-9]+'" + ) + if result: + print(result.stdout) + + print("\n=== Policy Rules ===") + result = self.run_command("ip rule show") + if result: + print(result.stdout) + + print("\n=== 메인 라우팅 테이블의 Default 라우트 ===") + result = self.run_command("ip route show | grep default") + if result: + print(result.stdout) + + for nic_name in self.config["nics"].keys(): + print(f"\n=== NIC {nic_name} 라우팅 테이블 ===") + result = self.run_command(f"ip route show table {nic_name}") + if result: + print(result.stdout) + + def create_startup_script(self): + """시스템 시작시 자동 적용을 위한 스크립트 생성""" + self.logger.info("시작시 자동 적용 스크립트 생성 중...") + + startup_script = Path("/etc/network/if-up.d/policy-routing-python") + + script_content = f"""#!/usr/bin/env python3 +import subprocess +import json + +config = {json.dumps(self.config, indent=2)} + +def run_cmd(cmd): + try: + subprocess.run(cmd, shell=True, check=False) + except: + pass + +# Policy rules 재설정 +for nic_name, nic_config in config['nics'].items(): + ip_addr = nic_config['ip'] + run_cmd(f"ip rule add from {{ip_addr}}/32 table {{nic_name}} priority 100") + run_cmd(f"ip rule add to {{ip_addr}}/32 table {{nic_name}} priority 101") """ - with open(UDEV_RULE_FILE, "w") as f: - f.write(udev_content) - subprocess.run(["udevadm", "control", "--reload-rules"], check=True) - subprocess.run(["systemctl", "daemon-reload"], check=True) - subprocess.run(["systemctl", "enable", "policy-routing"], check=True) - subprocess.run(["systemctl", "restart", "policy-routing"], check=True) + startup_script.write_text(script_content) + startup_script.chmod(0o755) - if not os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, "w") as f: - json.dump(DEFAULT_CONFIG, f, indent=2) + self.logger.info(f"시작시 자동 적용 스크립트 생성 완료: {startup_script}") + + def run_connectivity_test(self): + """연결성 테스트""" + self.logger.info("연결성 테스트 실행 중...") + + for nic_name, nic_config in self.config["nics"].items(): + interface = nic_config["interface"] + ip_addr = nic_config["ip"] + gateway = nic_config["gateway"] + + print(f"\n=== NIC {nic_name} ({interface}) 테스트 ===") + + # 게이트웨이 ping 테스트 + result = self.run_command(f"ping -c 1 -W 2 -I {ip_addr} {gateway}") + if result and result.returncode == 0: + self.logger.info(f"게이트웨이 {gateway} 연결 성공") + else: + self.logger.warning(f"게이트웨이 {gateway} 연결 실패") + + # 외부 DNS 테스트 + result = self.run_command(f"ping -c 1 -W 2 -I {ip_addr} 8.8.8.8") + if result and result.returncode == 0: + self.logger.info("외부 연결 (8.8.8.8) 성공") + else: + self.logger.warning("외부 연결 실패") + + def setup(self): + """전체 설정 실행""" + print("=" * 50) + print(" Ubuntu 22.04 Multi-NIC Policy Based Routing") + print(" Python Implementation with Auto-Detection") + print("=" * 50) + + # 감지된 설정 출력 + self.print_detected_config() + + # 사용자 확인 + response = input("위 설정으로 진행하시겠습니까? (y/N): ") + if response.lower() != "y": + print("설정이 취소되었습니다.") + return False + + try: + self.create_backup() + + if not self.check_interfaces(): + self.logger.error("인터페이스 확인 실패") + return False + + self.cleanup_existing() + self.setup_routing_tables() + self.configure_nic_routes() + self.setup_policy_rules() + self.setup_main_routing() + self.verify_configuration() + self.create_startup_script() + + # udev 모니터링 시스템 설정 추가 + self.setup_udev_monitoring() + + self.run_connectivity_test() + + print("\n" + "=" * 50) + print(" Policy Based Routing 설정이 완료되었습니다!") + print("=" * 50) + print("주요 설정:") + print("1. 외부에서 들어온 패킷은 동일한 NIC로 응답") + print("2. 내부 → 외부 패킷은 metric 우선순위에 따라 라우팅") + print("3. 시스템 재시작시 자동 적용됨") + print("4. 새로운 NIC 추가/변경시 자동 재설정") - print("✅ Policy Routing Manager 설치 완료!") return True except Exception as e: - print(f"❌ 설치 실패: {e}") + self.logger.error(f"설정 중 오류 발생: {e}") return False + def remove_configuration(self): + """설정 제거""" + self.logger.info("Policy routing 설정 제거 중...") + + # Policy rules 제거 + for nic_name, nic_config in self.config["nics"].items(): + ip_addr = nic_config["ip"] + self.run_command( + f"ip rule del from {ip_addr}/32 table {nic_name}", ignore_error=True + ) + self.run_command( + f"ip rule del to {ip_addr}/32 table {nic_name}", ignore_error=True + ) + self.run_command(f"ip route flush table {nic_name}", ignore_error=True) + + # 시작 스크립트 제거 + startup_script = Path("/etc/network/if-up.d/policy-routing-python") + if startup_script.exists(): + startup_script.unlink() + + # udev 모니터링 시스템 제거 + self.remove_udev_monitoring() + + self.logger.info("설정 제거 완료") + def main(): - parser = argparse.ArgumentParser(description="Policy-Based Routing Manager") + import argparse + + parser = argparse.ArgumentParser( + description="Ubuntu 22.04 Policy Based Routing Manager" + ) parser.add_argument( "action", - choices=[ - "install", - "daemon", - "status", - "refresh", - "clean", - "debug", - "apply", - "test", - "test-udev", - ], - help="실행할 작업", + choices=["setup", "remove", "verify", "detect", "monitor"], + help="수행할 작업", ) - parser.add_argument("--interface", help="특정 인터페이스 지정") - parser.add_argument("--debug", action="store_true", help="디버그 모드") args = parser.parse_args() - if args.action == "debug": - manager = PolicyRoutingManager(debug=True) - manager.debug_interface(args.interface) + manager = PolicyBasedRoutingManager() - elif args.action == "apply": - if not args.interface: - print("❌ --interface 옵션이 필요합니다.") - sys.exit(1) - manager = PolicyRoutingManager(debug=True) - manager.apply_single_interface(args.interface) - - elif args.action == "test": - if not args.interface: - print("❌ --interface 옵션이 필요합니다.") - sys.exit(1) - manager = PolicyRoutingManager(debug=True) - manager.test_routing(args.interface) - - elif args.action == "daemon": - manager = PolicyRoutingManager(debug=args.debug) - manager.start_daemon() - - elif args.action == "install": - installer = PolicyRoutingInstaller() - installer.install() - - elif args.action == "refresh": - manager = PolicyRoutingManager() - manager.refresh_from_external() - - elif args.action == "test-udev": - # udev 규칙 테스트 - print("🔍 udev 이벤트 모니터링 중... (Ctrl+C로 중지)") - print("새 네트워크 인터페이스를 연결해보세요.") - os.system("udevadm monitor --environment --udev --subsystem-match=net") - - elif args.action == "status": - manager = PolicyRoutingManager() - interfaces = manager.get_network_interfaces() - print("📡 네트워크 인터페이스 상태:") - for iface in interfaces: - print( - f" - {iface['name']}: {iface['ip']} -> {iface['gateway']} ({iface['state']})" - ) - - # udev 규칙 상태 확인 - if os.path.exists(UDEV_RULE_FILE): - print(f"\n✅ udev 규칙 설치됨: {UDEV_RULE_FILE}") - else: - print(f"\n❌ udev 규칙 없음: {UDEV_RULE_FILE}") + if args.action == "setup": + manager.setup() + elif args.action == "remove": + manager.remove_configuration() + elif args.action == "verify": + manager.verify_configuration() + elif args.action == "detect": + manager.print_detected_config() + elif args.action == "monitor": + manager.check_interface_changes() if __name__ == "__main__":