Kurznotiz: Aruba VC / Virtual Instant 8.10.x AP-Kabelfehler schneller finden

Im VC selbst gibt es keine Übersicht, jene die Interface-Anbindung der Accesspoints auflistet, ob sie Gigabit oder bei Aderfehlern/Überlänge nur mit 100M, ohne CRC oder mit jeder Menge CRC-Fehlern angebunden sind. Das ist suboptimal gelöst.

Wenn Du jetzt wissen willst, was da auf der AP-Seite los ist, must Du zu Fuß los und im schlimmsten Fall jeden AP einzeln auf der Konsole oder der Managementoberfläche (z.B. Airwave) fragen.

Im schlimmsten Fall ist der AP auch noch falschherum angebunden, das sorgt dann für Chrüsimüsi, wenn der Bridge-Mode nicht richtig konfiguriert ist und oder LACP fehlt:

aa:bb:cc:dd:ee:ff# sh interface
eth0 is up, line protocol is down
Hardware is 5 Gigabit Ethernet, address is aa:bb:cc:dd:ee:ff
Speed 10Mb/s, duplex half
Received packets                 0
Received bytes                   0
Receive dropped                  0
Receive errors                   0
Receive missed errors            0
Receive overrun errors           0
Receive frame errors             0
Receive CRC errors               0
Receive length errors            0
Transmitted packets              0
Transmitted bytes                0
Transmitted dropped              0
Transmission errors              0
Lost carrier                     0
eth1 is up, line protocol is up
Hardware is 5 Gigabit Ethernet, address is aa:bb:cc:dd:ee:ff
Speed 100Mb/s, duplex half
Received packets                 45014477
Received bytes                   8655039766
Receive dropped                  1
Receive errors                   0
Receive missed errors            0
Receive overrun errors           0
Receive frame errors             0
Receive CRC errors               19885255541
Receive length errors            0
Transmitted packets              2893940
Transmitted bytes                1565615128
Transmitted dropped              0
Transmission errors              0
Lost carrier                     0

^^ So etwas willst Du einfach nicht sehen!

Anbei ein Python-Script mit viel Hilfe von KI, welches vom Schwarm-Conductor (Master) die AP’s auflistet (show aps), diese dann threaded per show interface threaded durchbrömmelt, um nachzufragen, ob sie denn Gigabit angebunden sind, und oder ob CRC-Fehler auf einem oder beiden Interfaces aufgetreten sind, sprich sie schaun da mal in’s Glas, wie spät das ist.

Anwendung auf eigene Gefahr. Verbesserungsvorschläge sind willkommen.

Python

Das Script benötigt Python, falls nicht installiert, findbar unter python.org/downloads/. Die Pfadvariable muss gesetzt werden (Add Python to PATH), mit Admin-Rechten installiert.

Paramiko

Das Skript braucht die „paramiko“-Bibliothek. Damit lassen sich ssh-Verbindungen herstellen. Installieren lässt sich die Bibliothek mit

pip install paramiko

Anschließend braucht’s ein wenig Code (3 Dateien)

Einmal brauchst Du eine passwort.txt, in jener Du das Kennwort reinschreibst. Ich habe mangels Kenntnis hier noch keine sichere Speichermöglichkeit gefunden. Dazu braucht’s eine

config.json:

{
    "master_ip": "10.20.30.4",
    "username": "admin",
    "password_file": "passwort.txt",
    "timeout_seconds": 30
}

Sowie den ganzen Python-Kram:

ap_check.py

# Scriptanfang
import paramiko
import re
import csv
import sys
import time
import json
import threading
from queue import Queue

# Anzahl der gleichzeitigen SSH-Verbindungen
NUM_THREADS = 10

# Gemacht von John mit viel Liebe und Gemini

def execute_command_on_shell(client, command):
    """Führt einen Befehl in einer interaktiven Shell aus und wartet auf den Prompt."""
    channel = client.invoke_shell()
    time.sleep(1)
    channel.recv(65535) # Puffer leeren
    channel.send("no paging\n")
    time.sleep(1)
    channel.send(command + "\n")
    
    output = ""
    end_marker_found = False
    read_attempts = 0
    # Wartet auf den Prompt (#), um das Ende der Ausgabe zu erkennen
    while not end_marker_found and read_attempts < 15:
        time.sleep(1) 
        if channel.recv_ready():
            chunk = channel.recv(65535).decode('utf-8', errors='ignore')
            output += chunk
            if output.strip().endswith('#'):
                end_marker_found = True
        else:
            read_attempts += 1 # Verhindert eine Endlosschleife, wenn der blöde Prompt auf den Hercules-Huetten nicht kommt
    return output

def get_ap_list(ip, user, pw, timeout):
    """Fragt beim Master-AP die Liste aller Member-APs ab."""
    print(f"INFO: Frage AP-Liste von Master-AP {ip} ab...")
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ap_data = {}
    
    try:
        client.connect(ip, username=user, password=pw, timeout=timeout)
        output = execute_command_on_shell(client, "show aps")
        # Verbesserter Regex, der die Spalten gezielt extrahiert
        line_regex = re.compile(r"^\S+\s+([\d\.]+)\s+access.*? (CN\w+)\s+.*?\s(\d+d:\d+h:\d+m:\d+s)")
        
        for line in output.splitlines():
            match = line_regex.search(line)
            if match:
                ap_ip, serial, uptime = match.groups()
                ap_data[ap_ip] = {'serial': serial, 'uptime': uptime}
        
        print(f"INFO: {len(ap_data)} Member-APs zur Überprüfung gefunden.")
        return ap_data
    except Exception as e:
        print(f"FEHLER: Verbindung zum Master-AP {ip} fehlgeschlagen: {e}")
        return None
    finally:
        if client: client.close()

def check_ap_interfaces(ip, user, pw, timeout):
    """
    Liest die Daten für eth0, eth1 und bond0 aus und gibt sie als strukturiertes Wörterbuch zurück.
    """
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    interface_results = {
        'eth0_speed': 'N/A', 'eth0_errors': 0,
        'eth1_speed': 'N/A', 'eth1_errors': 0,
        'crashed': "0"
    }

    try:
        client.connect(ip, username=user, password=pw, timeout=timeout)
        output = execute_command_on_shell(client, "show interface")
        
        interface_blocks = re.split(r'\n(?=(?:eth|bond)\d is)', '\n' + output)
        
        for block in interface_blocks:
            name_match = re.search(r"^((?:eth|bond)\d)", block.strip())
            if not name_match: continue
            if_name = name_match.group(1)

            csv_if_name = "eth0" if if_name == "bond0" else if_name
            if csv_if_name not in ["eth0", "eth1"]: continue

            speed_duplex = "N/A"
            if sm := re.search(r"Speed (\S+), duplex (\w+)", block):
                speed_duplex = f"{sm.group(1)}, {sm.group(2)}"
            
            # --- KORREKTUR: Es werden nur noch CRC-Fehler gezählt ---
            errors = 0
            if crc_match := re.search(r"Receive CRC errors\s+(\d+)", block):
                errors = int(crc_match.group(1))
            
            interface_results[f'{csv_if_name}_speed'] = speed_duplex
            interface_results[f'{csv_if_name}_errors'] = errors
            
        return interface_results

    except Exception as e:
        print(f"FEHLER bei AP {ip}: {e}")
        interface_results['crashed'] = "X"
        return interface_results
    finally:
        if client: client.close()

def worker(task_queue, result_queue, user, pw, timeout):
    """Entscheidung, ob ein Ergebnis gemeldet wird."""
    while True:
        item = task_queue.get()
        if item is None: break
        
        ip, ap_info = item
        print(f"INFO: Thread startet Überprüfung von AP {ip}...")
        
        iface_data = check_ap_interfaces(ip, user, pw, timeout)
        
        report_needed = False
        if iface_data['crashed'] == "X":
            report_needed = True
        # Prüft, ob eth0 nicht 1000/full ist (und nicht N/A, was auf einen Fehler hindeutet)
        elif "1000mb/s, full" not in iface_data['eth0_speed'].lower() and iface_data['eth0_speed'] != 'N/A':
            report_needed = True
        # Prüft auf CRC-Fehler auf beiden Interfaces
        elif iface_data['eth0_errors'] > 0 or iface_data['eth1_errors'] > 0:
            report_needed = True

        if report_needed:
            full_result = ap_info.copy()
            full_result.update(iface_data)
            full_result['ip'] = ip
            result_queue.put(full_result)
            
        task_queue.task_done()

# === Hauptskript ===
if __name__ == "__main__":
    try:
        with open("config.json", 'r') as f: config = json.load(f)
        with open(config['password_file'], 'r') as f: ssh_password = f.read().strip()
    except FileNotFoundError as e:
        print(f"FEHLER: Konfigurationsdatei nicht gefunden: {e.filename}"); sys.exit(1)
    
    master_ip = config['master_ip']
    ssh_username = config['username']
    timeout = config.get('timeout_seconds', 30)

    if len(sys.argv) > 1:
        master_ip = sys.argv[1]
        print(f"INFO: Master-IP '{master_ip}' aus Befehlszeile wird verwendet.")

    all_aps = get_ap_list(master_ip, ssh_username, ssh_password, timeout)
    
    if all_aps:
        task_queue, result_queue, threads = Queue(), Queue(), []
        
        for _ in range(NUM_THREADS):
            thread = threading.Thread(target=worker, args=(task_queue, result_queue, ssh_username, ssh_password, timeout))
            thread.start()
            threads.append(thread)
            
        for ip, info in all_aps.items(): task_queue.put((ip, info))
        task_queue.join()
        for _ in range(NUM_THREADS): task_queue.put(None)
        for thread in threads: thread.join()
        
        problem_aps = []
        while not result_queue.empty(): problem_aps.append(result_queue.get())

        if problem_aps:
            print(f"\nINFO: Schreibe Bericht für {len(problem_aps)} AP(s) in 'ap_fehlerbericht.csv'...")
            
            fieldnames = ['IP Adresse', 'Seriennummer', 'Laufzeit', 'eth0_speed', 'eth0_errors', 'eth1_speed', 'eth1_errors', 'crashed']
            
            with open("ap_fehlerbericht.csv", mode='w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                
                for ap in sorted(problem_aps, key=lambda x: x['ip']):
                    writer.writerow({
                        'IP Adresse': ap.get('ip'),
                        'Seriennummer': ap.get('serial'),
                        'Laufzeit': ap.get('uptime'),
                        'eth0_speed': ap.get('eth0_speed'),
                        'eth0_errors': ap.get('eth0_errors'),
                        'eth1_speed': ap.get('eth1_speed'),
                        'eth1_errors': ap.get('eth1_errors'),
                        'crashed': ap.get('crashed')
                    })
            
            print("INFO: Bericht erfolgreich erstellt.")
        else:
            print("\nINFO: Keine APs mit Problemen gefunden. Es wird kein Bericht erstellt.")

# Scriptende

Starten kannst Du das ganze dann mit

py ap_check.py <vc-IP>

Wenn Du die IP des VC’s nicht ranschreibst, dann nimmt er die IP aus der config.json.