Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /opt/prometheus-monitoring-scripts/lib/python3.10/site-packages/custom_exporters/ |
Upload File : |
import subprocess import os import re import json import sys import pwd from multiprocessing import Pool # Debug mode control DEBUG_MODE = os.environ.get('CUSTOM_EXPORTER_DEBUG', '0').lower() in ('1', 'true', 'yes') def debug_log(message): """ Log debug messages to stderr only when debug mode is enabled. Args: message: The message to log """ if DEBUG_MODE: print(f"DEBUG: {message}", file=sys.stderr) def run_command_silent(cmd, timeout=30): """ Run a command and handle errors silently unless debug mode is enabled. - If command succeeds, returns the result - If command fails, exits with the command's exit code Args: cmd: Command list to pass to subprocess.run timeout: Timeout in seconds Returns: subprocess.CompletedProcess or exits silently """ debug_log(f"Running command: {' '.join(cmd)}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) if result.returncode != 0: debug_log(f"Command failed with exit code {result.returncode}") if DEBUG_MODE and result.stderr: print(f"Command stderr: {result.stderr}", file=sys.stderr) sys.exit(result.returncode) debug_log("Command succeeded") if DEBUG_MODE and result.stdout: debug_log(f"Command stdout (first 100 chars): {result.stdout[:100]}") return result except subprocess.TimeoutExpired: debug_log(f"Command timed out after {timeout} seconds") sys.exit(1) except Exception as e: debug_log(f"Command failed with exception: {str(e)}") sys.exit(1) def parse_json_silent(json_text): """ Parse JSON and handle errors silently unless debug mode is enabled. - If parsing succeeds, returns the parsed data - If parsing fails, exits with code 1 Args: json_text: JSON string to parse Returns: Parsed JSON data or exits silently """ debug_log("Attempting to parse JSON") try: data = json.loads(json_text) debug_log("JSON parsing succeeded") return data except json.JSONDecodeError as e: debug_log(f"JSON parsing failed: {str(e)}") if DEBUG_MODE: print(f"Invalid JSON: {json_text[:100]}...", file=sys.stderr) sys.exit(1) except Exception as e: debug_log(f"Unexpected error in JSON parsing: {str(e)}") sys.exit(1) def safe_file_operation(operation, *args, **kwargs): """ Perform a file operation safely, capturing errors and exiting silently. Args: operation: Function to call (e.g., open, os.listdir) *args, **kwargs: Arguments to pass to the operation Returns: Result of the operation or exits silently """ debug_log(f"Performing file operation: {operation.__name__}") try: result = operation(*args, **kwargs) debug_log("File operation succeeded") return result except (FileNotFoundError, PermissionError) as e: debug_log(f"File operation failed: {str(e)}") sys.exit(1) except Exception as e: debug_log(f"Unexpected error in file operation: {str(e)}") sys.exit(1) def run_systemctl_command(service, timeout=5): """ Run systemctl is-active and handle the result correctly. For systemctl is-active, non-zero exit codes are expected and don't indicate errors. Args: service: Service name to check timeout: Command timeout in seconds Returns: True if active, False if inactive """ debug_log(f"Checking if {service} is active") try: result = subprocess.run( ["systemctl", "is-active", service], capture_output=True, text=True, timeout=timeout, check=False ) # systemctl is-active returns 0 for active, non-zero for inactive is_active = result.returncode == 0 debug_log(f"{service} is {'active' if is_active else 'inactive'}") return is_active except subprocess.TimeoutExpired: debug_log(f"Command timed out after {timeout} seconds") sys.exit(1) except Exception as e: debug_log(f"Command failed with exception: {str(e)}") sys.exit(1) def nsenter_single_command(pid, command, timeout): return subprocess.run(["nsenter", "-m", "-C", "-t", str(pid), "--", *command], timeout=timeout, capture_output=True, text=True) def nsenter_batch_command(pool_size=50, container_pid_map={}, command=[], command_timeout=5): result_dict = {} with Pool(pool_size) as p: jobs = {} for k, pid in container_pid_map.items(): res = p.apply_async(nsenter_single_command, (pid,command,command_timeout)) jobs[k] = res for k, job in jobs.items(): resp = job.get(timeout=(command_timeout + 5)) result_dict[k] = resp return result_dict def podman_single_command(pod, command, timeout): return subprocess.run(["podman", "exec", pod, *command], capture_output=True, text=True) def podman_batch_command(pool_size=50, command_timeout=10, pods=[], command=[]): result_dict = {} with Pool(pool_size) as p: jobs = {} for pod in pods: res = p.apply_async(podman_single_command, (pod,command, command_timeout)) jobs[pod] = res for pod, job in jobs.items(): resp = job.get(timeout=(command_timeout + 5)) result_dict[pod] = resp return result_dict def get_podman_names(pod_filter=None): data = get_podman_data(pod_filter) return [d['Names'][0] for d in data if 'Names' in d and len(d['Names'])] def get_podman_data(pod_filter=None): try: result = subprocess.run(["podman", "ps", "--format", "json"], capture_output=True, text=True) data = json.loads(result.stdout) except Exception: raise Exception("failed to get pods") if pod_filter: pod_filter_re = re.compile(pod_filter) filtered_data = [] for d in data: if 'Names' in d and len(d['Names']) > 0: if pod_filter_re.match(d['Names'][0]): filtered_data.append(d) return filtered_data return data def get_podman_disk_target(pod, mount): return os.readlink("/podman/container_data/%s/%s" % (pod, mount)) def get_user_by_name(username): try: return pwd.getpwnam(username) except Exception: return None