diff --git a/policy-routing.py b/policy-routing.py index 65142f1..e3480c3 100644 --- a/policy-routing.py +++ b/policy-routing.py @@ -16,7 +16,7 @@ import socket import struct import select from pathlib import Path -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional, Set, Union # 설정 상수 CONFIG_FILE = "/etc/policy_routing.json" @@ -35,103 +35,107 @@ RTM_DELADDR = 21 # 기본 설정 DEFAULT_CONFIG = { "enabled": True, - "log_level": "INFO", + "log_level": "INFO", # DEBUG, INFO, WARNING, ERROR "check_interval": 5, # 더 빠른 체크 "interfaces": {}, - "global_settings": { - "base_table_id": 100, - "base_priority": 30000 - }, - "monitoring": { - "use_netlink": True, - "use_udev": True, - "use_polling": True - } + "global_settings": {"base_table_id": 100, "base_priority": 30000}, + "monitoring": {"use_netlink": True, "use_udev": True, "use_polling": True}, } + class NetlinkMonitor: """Netlink 소켓을 통한 실시간 네트워크 변화 감지""" - + def __init__(self, callback): self.callback = callback self.running = False self.sock = None - self.logger = logging.getLogger('netlink') - + self.logger = logging.getLogger("netlink") + def start(self): """Netlink 모니터링 시작""" try: self.sock = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, NETLINK_ROUTE) self.sock.bind((os.getpid(), 0)) - + # 관심 있는 그룹에 가입 - groups = (1 << (25-1)) | (1 << (26-1)) # RTNLGRP_LINK, RTNLGRP_IPV4_IFADDR + groups = (1 << (25 - 1)) | ( + 1 << (26 - 1) + ) # RTNLGRP_LINK, RTNLGRP_IPV4_IFADDR self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - self.sock.setsockopt(socket.SOL_NETLINK, socket.NETLINK_ADD_MEMBERSHIP, 1) # RTNLGRP_LINK - self.sock.setsockopt(socket.SOL_NETLINK, socket.NETLINK_ADD_MEMBERSHIP, 5) # RTNLGRP_IPV4_IFADDR - + self.sock.setsockopt( + socket.SOL_NETLINK, socket.NETLINK_ADD_MEMBERSHIP, 1 + ) # RTNLGRP_LINK + self.sock.setsockopt( + socket.SOL_NETLINK, socket.NETLINK_ADD_MEMBERSHIP, 5 + ) # RTNLGRP_IPV4_IFADDR + self.running = True self.logger.info("Netlink 모니터링 시작됨") - + while self.running: ready, _, _ = select.select([self.sock], [], [], 1.0) if ready: data = self.sock.recv(4096) self._parse_netlink_message(data) - + except Exception as e: self.logger.error(f"Netlink 모니터링 오류: {e}") finally: if self.sock: self.sock.close() - + def stop(self): """Netlink 모니터링 중지""" self.running = False - + def _parse_netlink_message(self, data): """Netlink 메시지 파싱""" try: if len(data) < 16: return - + # Netlink 헤더 파싱 - nlmsg_len, nlmsg_type, nlmsg_flags, nlmsg_seq, nlmsg_pid = struct.unpack("IHHII", data[:16]) - + nlmsg_len, nlmsg_type, nlmsg_flags, nlmsg_seq, nlmsg_pid = struct.unpack( + "IHHII", data[:16] + ) + if nlmsg_type in [RTM_NEWLINK, RTM_DELLINK, RTM_NEWADDR, RTM_DELADDR]: action = "add" if nlmsg_type in [RTM_NEWLINK, RTM_NEWADDR] else "remove" self.logger.info(f"Netlink 이벤트 감지: {action} (type: {nlmsg_type})") self.callback("netlink", action) - + except Exception as e: self.logger.error(f"Netlink 메시지 파싱 오류: {e}") + class PolicyRoutingManager: - def __init__(self): + def __init__(self, debug: bool = False): self.config = {} self.running = False self.interfaces_state = {} self.managed_tables = set() + self.debug = debug self.logger = self._setup_logging() self.netlink_monitor = None self.last_interface_check = {} - + def _setup_logging(self): """로깅 설정""" - logger = logging.getLogger('policy_routing') - logger.setLevel(logging.INFO) - + logger = logging.getLogger("policy_routing") + logger.setLevel(logging.DEBUG if self.debug else logging.INFO) + # 콘솔 핸들러 console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) - + console_handler.setLevel(logging.DEBUG if self.debug else logging.INFO) + # 포맷터 formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) console_handler.setFormatter(formatter) logger.addHandler(console_handler) - + return logger def network_change_callback(self, source: str, action: str): @@ -139,7 +143,7 @@ class PolicyRoutingManager: self.logger.info(f"네트워크 변화 감지됨 (source: {source}, action: {action})") # 즉시 인터페이스 체크 수행 threading.Thread(target=self._immediate_interface_check, daemon=True).start() - + def _immediate_interface_check(self): """즉시 인터페이스 체크""" try: @@ -152,9 +156,8 @@ class PolicyRoutingManager: """설정 파일 로드""" try: if os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, 'r') as f: + with open(CONFIG_FILE, "r") as f: config = json.load(f) - # 기본값과 병합 for key, value in DEFAULT_CONFIG.items(): if key not in config: config[key] = value @@ -168,142 +171,314 @@ class PolicyRoutingManager: def save_config(self, config: Dict): """설정 파일 저장""" try: - with open(CONFIG_FILE, 'w') as f: + with open(CONFIG_FILE, "w") as f: json.dump(config, f, indent=2) self.logger.info(f"설정 파일 저장됨: {CONFIG_FILE}") except Exception as e: self.logger.error(f"설정 파일 저장 실패: {e}") - def run_command(self, cmd: List[str], ignore_errors: List[str] = None) -> bool: - """명령어 실행 (오류 처리 개선)""" + def run_command( + self, cmd: List[str], ignore_errors: Union[List[str], None] = None + ) -> tuple: + """명령어 실행 (디버그 강화)""" if ignore_errors is None: ignore_errors = [] - + + cmd_str = " ".join(cmd) + self.logger.debug(f"실행: {cmd_str}") + try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) - return True + self.logger.debug(f"성공: {cmd_str}") + if result.stdout: + self.logger.debug(f"출력: {result.stdout.strip()}") + return True, result.stdout except subprocess.CalledProcessError as e: error_msg = e.stderr.strip() if e.stderr else str(e) - + # 특정 오류는 무시 for ignore_pattern in ignore_errors: if ignore_pattern in error_msg: - return True - - self.logger.error(f"명령어 실행 실패 {' '.join(cmd)}: {error_msg}") - return False + self.logger.debug(f"무시된 오류: {cmd_str} - {error_msg}") + return True, "" + + self.logger.error(f"명령어 실행 실패: {cmd_str} - {error_msg}") + return False, error_msg def get_network_interfaces(self) -> List[Dict]: - """네트워크 인터페이스 정보 수집""" + """네트워크 인터페이스 정보 수집 (디버그 강화)""" interfaces = [] + self.logger.debug("네트워크 인터페이스 정보 수집 시작") + try: - # ip addr show 명령어로 인터페이스 정보 수집 - result = subprocess.run(['ip', 'addr', 'show'], - capture_output=True, text=True, check=True) - + success, output = self.run_command(["ip", "addr", "show"]) + if not success: + return interfaces + current_iface = None - for line in result.stdout.split('\n'): - if line and not line.startswith(' '): + for line in output.split("\n"): + if line and not line.startswith(" "): # 새 인터페이스 시작 - parts = line.split(':') + parts = line.split(":") if len(parts) >= 2: iface_name = parts[1].strip() - if iface_name not in ['lo', 'docker0'] and not iface_name.startswith(('veth', 'br-', 'virbr')): + if iface_name not in [ + "lo", + "docker0", + ] and not iface_name.startswith(("veth", "br-", "virbr")): current_iface = { - 'name': iface_name, - 'ip': None, - 'gateway': None, - 'state': 'DOWN' + "name": iface_name, + "ip": None, + "gateway": None, + "netmask": None, + "state": "DOWN", } # 상태 확인 - if 'UP' in line and 'LOWER_UP' in line: - current_iface['state'] = 'UP' - elif current_iface and 'inet ' in line and 'scope global' in line: + if "UP" in line and "LOWER_UP" in line: + current_iface["state"] = "UP" + + self.logger.debug( + f"인터페이스 발견: {iface_name} - {current_iface['state']}" + ) + + elif current_iface and "inet " in line and "scope global" in line: # IP 주소 추출 parts = line.strip().split() for i, part in enumerate(parts): - if part == 'inet' and i + 1 < len(parts): - current_iface['ip'] = parts[i + 1].split('/')[0] - current_iface['netmask'] = parts[i + 1].split('/')[1] if '/' in parts[i + 1] else '24' - break - - if current_iface['ip'] and current_iface not in interfaces: - # 게이트웨이 찾기 - try: - gw_result = subprocess.run( - ['ip', 'route', 'show', 'dev', current_iface['name']], - capture_output=True, text=True + if part == "inet" and i + 1 < len(parts): + ip_with_mask = parts[i + 1] + current_iface["ip"] = ip_with_mask.split("/")[0] + current_iface["netmask"] = ( + ip_with_mask.split("/")[1] + if "/" in ip_with_mask + else "24" ) - for gw_line in gw_result.stdout.split('\n'): - if 'default via' in gw_line: - current_iface['gateway'] = gw_line.split('via')[1].split()[0] - break - - except Exception: - pass - - if current_iface['gateway']: + self.logger.debug( + f"IP 주소 발견: {current_iface['name']} = {current_iface['ip']}/{current_iface['netmask']}" + ) + break + + if current_iface["ip"]: + # 게이트웨이 찾기 + current_iface["gateway"] = self._find_gateway( + current_iface["name"] + ) + + # 게이트웨이가 있는 경우만 추가 + if current_iface["gateway"]: interfaces.append(current_iface) + self.logger.debug(f"인터페이스 추가됨: {current_iface}") + else: + self.logger.debug( + f"게이트웨이 없어서 제외됨: {current_iface['name']}" + ) current_iface = None - + except Exception as e: self.logger.error(f"인터페이스 정보 수집 실패: {e}") - + + self.logger.debug(f"총 {len(interfaces)}개 인터페이스 발견됨") return interfaces + def _find_gateway(self, interface_name: str) -> Optional[str]: + """특정 인터페이스의 게이트웨이 찾기""" + try: + # 인터페이스별 라우트 확인 + success, output = self.run_command( + ["ip", "route", "show", "dev", interface_name] + ) + if success: + for line in output.split("\n"): + if "default via" in line: + gateway = line.split("via")[1].split()[0] + self.logger.debug( + f"{interface_name} 게이트웨이 발견: {gateway}" + ) + return gateway + + # 전체 라우팅 테이블에서 확인 + success, output = self.run_command(["ip", "route", "show"]) + if success: + for line in output.split("\n"): + if f"default via" in line and f"dev {interface_name}" in line: + gateway = line.split("via")[1].split()[0] + self.logger.debug( + f"{interface_name} 전체 테이블에서 게이트웨이 발견: {gateway}" + ) + return gateway + + except Exception as e: + self.logger.debug(f"게이트웨이 조회 실패 {interface_name}: {e}") + + self.logger.debug(f"{interface_name} 게이트웨이 없음") + return None + + def get_existing_rules(self) -> Dict: + """기존 라우팅 규칙 조회""" + rules = {"policy_rules": [], "routing_tables": {}} + + try: + # 정책 규칙 조회 + success, output = self.run_command(["ip", "rule", "show"]) + if success: + for line in output.strip().split("\n"): + if "lookup" in line and line.strip(): + rules["policy_rules"].append(line.strip()) + + # 각 테이블별 라우팅 규칙 조회 + for table_id in range(100, 120): + success, output = self.run_command( + ["ip", "route", "show", "table", str(table_id)] + ) + if success and output.strip(): + rules["routing_tables"][table_id] = output.strip().split("\n") + + except Exception as e: + self.logger.error(f"기존 규칙 조회 실패: {e}") + + return rules + def setup_routing_table(self, interface_name: str, table_id: int): """라우팅 테이블 설정""" try: - with open(RT_TABLES_FILE, 'r') as f: + with open(RT_TABLES_FILE, "r") as f: content = f.read() - + table_line = f"{table_id}\t{interface_name}\n" if table_line not in content: - with open(RT_TABLES_FILE, 'a') as f: + with open(RT_TABLES_FILE, "a") as f: f.write(table_line) self.logger.info(f"라우팅 테이블 {table_id} ({interface_name}) 추가됨") - + except Exception as e: self.logger.error(f"라우팅 테이블 설정 실패: {e}") - def apply_interface_routing(self, interface_info: Dict, table_id: int, priority: int) -> bool: - """인터페이스별 라우팅 규칙 적용""" - name = interface_info['name'] - ip = interface_info['ip'] - gateway = interface_info['gateway'] - netmask = interface_info.get('netmask', '24') - + def apply_interface_routing( + self, interface_info: Dict, table_id: int, priority: int + ) -> bool: + """인터페이스별 라우팅 규칙 적용 (강화된 디버깅)""" + name = interface_info["name"] + ip = interface_info["ip"] + gateway = interface_info["gateway"] + netmask = interface_info.get("netmask", "24") + + self.logger.info(f"=== {name} 인터페이스 라우팅 설정 시작 ===") + self.logger.debug( + f"IP: {ip}, Gateway: {gateway}, Table: {table_id}, Priority: {priority}" + ) + if not all([name, ip, gateway]): + self.logger.warning(f"인터페이스 {name} 정보 불완전: ip={ip}, gw={gateway}") return False - # 상태 변화가 있는지 확인 - current_state = f"{ip}:{gateway}:{table_id}" - if self.last_interface_check.get(name) == current_state: - return True - try: - network = f"{'.'.join(ip.split('.')[:-1])}.0/{netmask}" - - # 기존 규칙 정리 - subprocess.run(['ip', 'rule', 'del', 'from', f"{ip}/32"], capture_output=True) - subprocess.run(['ip', 'route', 'del', 'default', 'table', str(table_id)], capture_output=True) - - # 새 규칙 추가 - commands = [ - ['ip', 'route', 'add', network, 'dev', name, 'src', ip, 'table', str(table_id)], - ['ip', 'route', 'add', 'default', 'via', gateway, 'dev', name, 'table', str(table_id)], - ['ip', 'rule', 'add', 'from', f"{ip}/32", 'table', str(table_id), 'pref', str(priority)] + # 네트워크 계산 + ip_parts = ip.split(".") + network = f"{'.'.join(ip_parts[:-1])}.0/{netmask}" + self.logger.debug(f"네트워크: {network}") + + # 기존 규칙 정리 (테이블별) + self.logger.debug(f"기존 규칙 정리 중...") + cleanup_commands = [ + ["ip", "rule", "del", "from", f"{ip}/32"], + ["ip", "route", "del", "default", "table", str(table_id)], + ["ip", "route", "del", network, "table", str(table_id)], ] - - for cmd in commands: - if not self.run_command(cmd, ignore_errors=['File exists', 'No such file or directory']): + + for cmd in cleanup_commands: + self.run_command( + cmd, + ignore_errors=["No such file or directory", "Cannot find device"], + ) + + # 새 규칙 추가 + self.logger.debug(f"새 라우팅 규칙 추가 중...") + + # 1. 로컬 네트워크 라우트 + success, _ = self.run_command( + [ + "ip", + "route", + "add", + network, + "dev", + name, + "src", + ip, + "table", + str(table_id), + ], + ignore_errors=["File exists"], + ) + + if not success: + self.logger.error(f"로컬 네트워크 라우트 추가 실패: {network}") + return False + + # 2. 기본 게이트웨이 + success, _ = self.run_command( + [ + "ip", + "route", + "add", + "default", + "via", + gateway, + "dev", + name, + "table", + str(table_id), + ], + ignore_errors=["File exists"], + ) + + if not success: + self.logger.error(f"기본 게이트웨이 추가 실패: {gateway}") + return False + + # 3. 정책 규칙 + success, _ = self.run_command( + [ + "ip", + "rule", + "add", + "from", + f"{ip}/32", + "table", + str(table_id), + "pref", + str(priority), + ], + ignore_errors=["File exists"], + ) + + if not success: + self.logger.error(f"정책 규칙 추가 실패: from {ip}/32") + return False + + # 적용 확인 + self.logger.debug(f"적용 결과 확인 중...") + success, output = self.run_command(["ip", "rule", "show"]) + if success: + if f"from {ip}" in output: + self.logger.info(f"✅ {name} 정책 규칙 적용 확인됨") + else: + self.logger.error(f"❌ {name} 정책 규칙 적용 확인 실패") return False - - self.last_interface_check[name] = current_state + + success, output = self.run_command( + ["ip", "route", "show", "table", str(table_id)] + ) + if success and "default via" in output: + self.logger.info(f"✅ {name} 라우팅 테이블 적용 확인됨") + else: + self.logger.error(f"❌ {name} 라우팅 테이블 적용 확인 실패") + return False + self.managed_tables.add(table_id) - self.logger.info(f"인터페이스 {name} 라우팅 설정 완료 (table: {table_id})") + self.logger.info(f"=== {name} 인터페이스 라우팅 설정 완료 ===") return True - + except Exception as e: self.logger.error(f"인터페이스 {name} 라우팅 설정 실패: {e}") return False @@ -312,43 +487,144 @@ class PolicyRoutingManager: """인터페이스 체크 및 적용""" try: interfaces = self.get_network_interfaces() - current_interfaces = {iface['name']: iface for iface in interfaces} - + current_interfaces = {iface["name"]: iface for iface in interfaces} + config_changed = False - + for name, info in current_interfaces.items(): - if info['state'] == 'UP' and info['ip'] and info['gateway']: - if name not in self.config['interfaces']: - table_id = self.config['global_settings']['base_table_id'] + len(self.config['interfaces']) - self.config['interfaces'][name] = { - 'enabled': True, - 'table_id': table_id, - 'priority': 100, - 'health_check_target': '8.8.8.8' + if info["state"] == "UP" and info["ip"] and info["gateway"]: + if name not in self.config["interfaces"]: + table_id = self.config["global_settings"][ + "base_table_id" + ] + len(self.config["interfaces"]) + self.config["interfaces"][name] = { + "enabled": True, + "table_id": table_id, + "priority": 100, + "health_check_target": "8.8.8.8", } config_changed = True self.logger.info(f"새 인터페이스 {name} 자동 추가됨") - - if self.config['interfaces'][name]['enabled']: - iface_config = self.config['interfaces'][name] - table_id = iface_config['table_id'] - priority = self.config['global_settings']['base_priority'] + iface_config['priority'] - + + if self.config["interfaces"][name]["enabled"]: + iface_config = self.config["interfaces"][name] + table_id = iface_config["table_id"] + priority = ( + self.config["global_settings"]["base_priority"] + + iface_config["priority"] + ) + self.setup_routing_table(name, table_id) self.apply_interface_routing(info, table_id, priority) - + if config_changed: self.save_config(self.config) - + except Exception as e: self.logger.error(f"인터페이스 체크 오류: {e}") + def debug_interface(self, interface_name: Union[str, None] = None): + """특정 인터페이스 디버깅""" + print(f"\n🔍 Policy Routing 디버그 정보") + print("=" * 50) + + # 인터페이스 정보 + interfaces = self.get_network_interfaces() + print(f"\n📡 감지된 인터페이스: {len(interfaces)}개") + for iface in interfaces: + print( + f" - {iface['name']}: {iface['ip']} -> {iface['gateway']} ({iface['state']})" + ) + + # 설정 정보 + config = self.load_config() + print(f"\n⚙️ 설정된 인터페이스: {len(config.get('interfaces', {}))}개") + for name, conf in config.get("interfaces", {}).items(): + status = "활성화" if conf.get("enabled") else "비활성화" + print(f" - {name}: {status} (테이블 ID: {conf.get('table_id')})") + + # 현재 라우팅 규칙 + print(f"\n📋 현재 Policy 규칙:") + success, output = self.run_command(["ip", "rule", "show"]) + if success: + rules = [ + line + for line in output.split("\n") + if "lookup 1" in line and line.strip() + ] + if rules: + for rule in rules: + print(f" - {rule}") + else: + print(" ❌ Policy routing 규칙 없음") + + # 라우팅 테이블 + print(f"\n🗂️ 라우팅 테이블:") + for table_id in range(100, 110): + success, output = self.run_command( + ["ip", "route", "show", "table", str(table_id)] + ) + if success and output.strip(): + print(f" 테이블 {table_id}:") + for route in output.strip().split("\n"): + print(f" - {route}") + + def apply_single_interface(self, interface_name: str): + """단일 인터페이스에 규칙 적용""" + self.config = self.load_config() + interfaces = self.get_network_interfaces() + + target_interface = None + for iface in interfaces: + if iface["name"] == interface_name: + target_interface = iface + break + + if not target_interface: + self.logger.error(f"인터페이스 {interface_name}을 찾을 수 없습니다.") + return False + + if target_interface["state"] != "UP" or not target_interface["ip"]: + self.logger.error( + f"인터페이스 {interface_name}이 활성화되지 않았거나 IP가 없습니다." + ) + return False + + # 설정에 추가 (없으면) + if interface_name not in self.config["interfaces"]: + table_id = self.config["global_settings"]["base_table_id"] + len( + self.config["interfaces"] + ) + self.config["interfaces"][interface_name] = { + "enabled": True, + "table_id": table_id, + "priority": 100, + "health_check_target": "8.8.8.8", + } + self.save_config(self.config) + + iface_config = self.config["interfaces"][interface_name] + table_id = iface_config["table_id"] + priority = ( + self.config["global_settings"]["base_priority"] + iface_config["priority"] + ) + + self.setup_routing_table(interface_name, table_id) + success = self.apply_interface_routing(target_interface, table_id, priority) + + if success: + print(f"✅ {interface_name} 인터페이스 규칙 적용 완료") + else: + print(f"❌ {interface_name} 인터페이스 규칙 적용 실패") + + return success + def monitor_interfaces(self): """인터페이스 모니터링 (폴링 방식)""" while self.running: try: self._check_and_apply_interfaces() - time.sleep(self.config['check_interval']) + time.sleep(self.config["check_interval"]) except Exception as e: self.logger.error(f"모니터링 오류: {e}") time.sleep(5) @@ -357,28 +633,32 @@ class PolicyRoutingManager: """데몬 모드 시작""" self.config = self.load_config() self.running = True - + # 신호 핸들러 등록 signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGHUP, self._reload_config) - + self.logger.info("Policy Routing Manager 시작됨") - + # Netlink 모니터링 시작 (설정에 따라) - if self.config.get('monitoring', {}).get('use_netlink', True): + if self.config.get("monitoring", {}).get("use_netlink", True): self.netlink_monitor = NetlinkMonitor(self.network_change_callback) - netlink_thread = threading.Thread(target=self.netlink_monitor.start, daemon=True) + netlink_thread = threading.Thread( + target=self.netlink_monitor.start, daemon=True + ) netlink_thread.start() - + # 폴링 모니터링 시작 - if self.config.get('monitoring', {}).get('use_polling', True): - monitor_thread = threading.Thread(target=self.monitor_interfaces, daemon=True) + if self.config.get("monitoring", {}).get("use_polling", True): + monitor_thread = threading.Thread( + target=self.monitor_interfaces, daemon=True + ) monitor_thread.start() - + # 초기 설정 적용 self._check_and_apply_interfaces() - + try: while self.running: time.sleep(1) @@ -413,20 +693,21 @@ class PolicyRoutingManager: self.logger.info("외부 트리거로 새로고침 요청됨") self.network_change_callback("external", "refresh") + # [PolicyRoutingInstaller 클래스는 이전과 동일] class PolicyRoutingInstaller: def __init__(self): - self.logger = logging.getLogger('installer') - + self.logger = logging.getLogger("installer") + def check_requirements(self) -> bool: if os.geteuid() != 0: print("오류: 관리자 권한이 필요합니다.") return False - - required_commands = ['ip', 'systemctl'] + + required_commands = ["ip", "systemctl"] for cmd in required_commands: try: - subprocess.run(['which', cmd], check=True, capture_output=True) + subprocess.run(["which", cmd], check=True, capture_output=True) except subprocess.CalledProcessError: print(f"오류: {cmd} 명령어를 찾을 수 없습니다.") return False @@ -435,13 +716,13 @@ class PolicyRoutingInstaller: def install(self): if not self.check_requirements(): return False - + try: - script_content = open(__file__, 'r').read() - with open(SCRIPT_PATH, 'w') as f: + script_content = open(__file__, "r").read() + with open(SCRIPT_PATH, "w") as f: f.write(script_content) os.chmod(SCRIPT_PATH, 0o755) - + service_content = f"""[Unit] Description=Policy-Based Routing Manager After=network.target @@ -458,9 +739,9 @@ User=root [Install] WantedBy=multi-user.target """ - with open(SERVICE_FILE, 'w') as f: + with open(SERVICE_FILE, "w") as f: f.write(service_content) - + # 개선된 udev 규칙 udev_content = f"""# Policy Routing udev rules SUBSYSTEM=="net", ACTION=="add", RUN+="{SCRIPT_PATH} refresh" @@ -468,69 +749,103 @@ SUBSYSTEM=="net", ACTION=="remove", RUN+="{SCRIPT_PATH} refresh" SUBSYSTEM=="net", ACTION=="change", RUN+="{SCRIPT_PATH} refresh" SUBSYSTEM=="net", ACTION=="move", RUN+="{SCRIPT_PATH} refresh" """ - with open(UDEV_RULE_FILE, 'w') as f: + with open(UDEV_RULE_FILE, "w") as f: f.write(udev_content) - + # udev 규칙 재로드 - subprocess.run(['udevadm', 'control', '--reload-rules'], check=True) - subprocess.run(['udevadm', 'trigger', '--subsystem-match=net'], check=True) - - subprocess.run(['systemctl', 'daemon-reload'], check=True) - subprocess.run(['systemctl', 'enable', 'policy-routing'], check=True) - + subprocess.run(["udevadm", "control", "--reload-rules"], check=True) + subprocess.run(["udevadm", "trigger", "--subsystem-match=net"], check=True) + + subprocess.run(["systemctl", "daemon-reload"], check=True) + subprocess.run(["systemctl", "enable", "policy-routing"], check=True) + if not os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, 'w') as f: + with open(CONFIG_FILE, "w") as f: json.dump(DEFAULT_CONFIG, f, indent=2) - + print("✅ Policy Routing Manager 설치 완료!") print(" - Netlink 실시간 모니터링 지원") print(" - 개선된 udev 규칙") print(" - 폴링 백업 모니터링") - + return True - + except Exception as e: print(f"❌ 설치 실패: {e}") return False + def main(): - parser = argparse.ArgumentParser(description='Policy-Based Routing Manager') - parser.add_argument('action', choices=[ - 'install', 'uninstall', 'daemon', 'status', 'refresh', 'config', 'clean', 'test-udev' - ], help='실행할 작업') - + parser = argparse.ArgumentParser(description="Policy-Based Routing Manager") + parser.add_argument( + "action", + choices=[ + "install", + "uninstall", + "daemon", + "status", + "refresh", + "config", + "clean", + "debug", + "apply", + "test-udev", + ], + help="실행할 작업", + ) + parser.add_argument("--interface", help="특정 인터페이스 지정") + parser.add_argument("--debug", action="store_true", help="디버그 모드") + args = parser.parse_args() - - if args.action == 'install': + + if args.action == "debug": + manager = PolicyRoutingManager(debug=True) + manager.debug_interface(args.interface) + + elif args.action == "apply": + if not args.interface: + print("❌ --interface 옵션이 필요합니다.") + sys.exit(1) + manager = PolicyRoutingManager(debug=True) + manager.apply_single_interface(args.interface) + + elif args.action == "daemon": + manager = PolicyRoutingManager(debug=args.debug) + manager.start_daemon() + + elif args.action == "install": installer = PolicyRoutingInstaller() installer.install() - - elif args.action == 'daemon': + + elif args.action == "daemon": manager = PolicyRoutingManager() manager.start_daemon() - - elif args.action == 'refresh': + + elif args.action == "refresh": manager = PolicyRoutingManager() manager.refresh_from_external() - - elif args.action == 'test-udev': + + elif args.action == "test-udev": # udev 규칙 테스트 print("🔍 udev 이벤트 모니터링 중... (Ctrl+C로 중지)") print("새 네트워크 인터페이스를 연결해보세요.") os.system("udevadm monitor --environment --udev --subsystem-match=net") - - elif args.action == 'status': + + elif args.action == "status": manager = PolicyRoutingManager() interfaces = manager.get_network_interfaces() print("📡 네트워크 인터페이스 상태:") for iface in interfaces: - print(f" - {iface['name']}: {iface['ip']} -> {iface['gateway']} ({iface['state']})") - + print( + f" - {iface['name']}: {iface['ip']} -> {iface['gateway']} ({iface['state']})" + ) + # udev 규칙 상태 확인 if os.path.exists(UDEV_RULE_FILE): print(f"\n✅ udev 규칙 설치됨: {UDEV_RULE_FILE}") else: print(f"\n❌ udev 규칙 없음: {UDEV_RULE_FILE}") -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main()