127 lines
6.3 KiB
Python
127 lines
6.3 KiB
Python
import re
|
|
import aiohttp
|
|
import time
|
|
import py_mini_racer
|
|
import base64
|
|
import uvicorn
|
|
import websockets
|
|
import json
|
|
import asyncio
|
|
import random
|
|
import itertools
|
|
|
|
from fastapi import FastAPI, Form
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
class App(FastAPI):
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(title="Kahoot API", openapi_url=None)
|
|
self.add_middleware(CORSMiddleware, allow_origins=["*"])
|
|
self.semaphore = asyncio.Semaphore(10)
|
|
|
|
async def connect(self, pin: int, name: str, number: int, namerator: bool = False) -> None:
|
|
try:
|
|
async with self.semaphore:
|
|
async with aiohttp.ClientSession() as session:
|
|
if namerator:
|
|
async with session.get("https://apis.kahoot.it/") as response:
|
|
name = (await response.json(content_type=None))["name"]
|
|
async with session.get(f"https://play.kahoot.it/reserve/session/{pin}/?{int(time.time() * 1000)}") as response:
|
|
sessionToken = response.headers["x-kahoot-session-token"]
|
|
challenge = (await response.json())["challenge"]
|
|
text = re.split("[{};]", challenge.replace("\t", "").encode("ascii", "ignore").decode())
|
|
solChars = [ord(s) for s in py_mini_racer.MiniRacer().eval("".join([text[1] + "{", text[2] + ";", "return message.replace(/./g, function(char, position) {", text[7] + ";})};", text[0]]))]
|
|
sessChars = [ord(s) for s in base64.b64decode(sessionToken).decode()]
|
|
sessionID = "".join([chr(sessChars[i] ^ solChars[i % len(solChars)]) for i in range(len(sessChars))])
|
|
ws = await websockets.connect(f"wss://play.kahoot.it/cometd/{pin}/{sessionID}", open_timeout=None, ping_timeout=None, close_timeout=None)
|
|
await ws.send(json.dumps([{"channel": "/meta/handshake"}]))
|
|
clientID = json.loads(await ws.recv())[0]["clientId"]
|
|
await ws.send(json.dumps([
|
|
{
|
|
"channel": "/service/controller",
|
|
"data": {
|
|
"type": "login",
|
|
"gameid": pin,
|
|
"name": name,
|
|
},
|
|
"clientId": clientID,
|
|
}
|
|
]))
|
|
await ws.send(json.dumps([
|
|
{
|
|
"channel": "/service/controller",
|
|
"data": {
|
|
"type": "message",
|
|
"gameid": pin,
|
|
"id": 16,
|
|
},
|
|
"clientId": clientID,
|
|
}
|
|
]))
|
|
while True:
|
|
message = json.loads(await ws.recv())[0]
|
|
if message.get("data", {}).get("id", None) == 2:
|
|
question = json.loads(message["data"]["content"])
|
|
if question["type"] == "quiz":
|
|
await asyncio.sleep(number / 100)
|
|
await ws.send(json.dumps([
|
|
{
|
|
"channel": "/service/controller",
|
|
"data": {
|
|
"type": "message",
|
|
"gameid": pin,
|
|
"id": 45,
|
|
"content": json.dumps({"type": "quiz", "choice": random.randint(0, question["numberOfChoices"] - 1), "questionIndex": question["gameBlockIndex"]})
|
|
},
|
|
"clientId": clientID,
|
|
}
|
|
]))
|
|
elif question["type"] == "multiple_select_quiz":
|
|
await asyncio.sleep(number / 100)
|
|
await ws.send(json.dumps([
|
|
{
|
|
"channel": "/service/controller",
|
|
"data": {
|
|
"type": "message",
|
|
"gameid": pin,
|
|
"id": 45,
|
|
"content": json.dumps({"type": "multiple_select_quiz", "choice": [random.randint(0, question["numberOfChoices"] - 1)], "questionIndex": question["gameBlockIndex"]})
|
|
},
|
|
"clientId": clientID,
|
|
}
|
|
]))
|
|
elif message.get("data", {}).get("id", None) in [10, 13] or message.get("data", {}).get("reason", None) == "disconnect":
|
|
await ws.close()
|
|
except websockets.ConnectionClosed:
|
|
return
|
|
except:
|
|
if "ws" in locals().keys():
|
|
await ws.close()
|
|
|
|
app = App()
|
|
|
|
@app.post("/flood", response_class=JSONResponse)
|
|
async def flood(pin: int = Form(0), naming: str = Form(""), name: str = Form(""), amount: int = Form(0)) -> JSONResponse:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"https://play.kahoot.it/reserve/session/{pin}/?{int(time.time() * 1000)}") as response:
|
|
if "x-kahoot-session-token" not in response.headers:
|
|
return JSONResponse({"message": "Invalid PIN specified.", "type": "error"}, 400)
|
|
if len(name) < 2 or len(name) > 15:
|
|
return JSONResponse({"message": "Invalid name specified.", "type": "error"}, 400)
|
|
if amount < 1 or amount > 2000:
|
|
return JSONResponse({"message": "Invalid amount specified.", "type": "error"}, 400)
|
|
if naming == "enumerated":
|
|
await asyncio.gather(*[app.connect(pin, f"{name[:15-len(str(i))]}{i}", i) for i in range(amount)])
|
|
elif naming == "capitalized":
|
|
names = list(map(''.join, itertools.product(*zip(name.upper(), name.lower()))))
|
|
await asyncio.gather(*[app.connect(pin, names[i], i) for i in range(amount)])
|
|
elif naming == "random":
|
|
await asyncio.gather(*[app.connect(pin, str(i), i, True) for i in range(amount)])
|
|
else:
|
|
return JSONResponse({"message": "Invalid naming method specified.", "type": "error"}, 400)
|
|
return {"message": "Game finished.", "type": "success"}
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run("app:app", host="0.0.0.0", port=80) |