#!/usr/bin/python3
""" slippi-net-test

Naive, very hacky example code for connecting to the net thread listener PoC
and streaming some Slippi data into your terminal on stdout. Only implementing
some very basic parsing of Slippi messages here. This will probably break as
the networking code is updated.

A beautiful, safe, performant, Windows-compatible implementation has been left
as an exercise to the reader :^)
"""

from struct import pack, unpack
from sys import argv
import socket
import select
import errno
import sys
import os

# Command line: argv[1] is the address to connect to, argv[2] the TCP port.
# A missing or non-numeric argument is a usage error, reported on stderr with
# a non-zero exit status so that calling scripts can detect it.
_USAGE = "usage: slippi-net-test <addr> <port>"

if len(argv) < 3:
    print(_USAGE, file=sys.stderr)
    sys.exit(1)

ADDR = str(argv[1])
try:
    PORT = int(argv[2])
except ValueError:
    print(_USAGE, file=sys.stderr)
    sys.exit(1)

# These are the message types and lengths supported by this particular example.
# In reality, a Slippi client should self identify commands by emitting the
# 0x35 command to us [of length ((3*num_commands)+1)].

EVENT_PAYLOADS = 0x35
GAME_START = 0x36
PRE_FRAME = 0x37
POST_FRAME = 0x38
GAME_END = 0x39

# Payload length in bytes for each command, NOT counting the command byte
# itself (the parser advances by 1 + cmd[command] per message).
cmd = {
    GAME_START: 0x160,
    PRE_FRAME: 0x3b,
    POST_FRAME: 0x25,
    GAME_END: 0x1,
}

# State shared between the receive loop and process():
glob_off = 0        # offset of the next unparsed byte in glob_buf
glob_frame = 0      # frame number from the most recent pre/post-frame message
glob_state = {0x0: 0, 0x1: 0, 0x2: 0, 0x3: 0}  # player index -> last state value
glob_buf = bytes()  # bytes received from the socket

# Naive parsing of Slippi messages. Does some work on a global buffer. We're
# basically just relying on a lot of shared state here and the fact that
# there'll probably be some time in-between the file descriptor having some
# data that we can read off the wire.
def process():
    """Parse every complete Slippi message currently held in ``glob_buf``.

    Operates purely on module-level state:

    * reads ``cmd`` for the payload length of each known command;
    * updates ``glob_frame`` and ``glob_state`` from pre-frame (0x37) and
      post-frame (0x38) messages;
    * drops the consumed bytes from ``glob_buf`` and resets ``glob_off`` to 0
      before returning, so the buffer only ever holds the unparsed tail.

    A message is consumed only once all of its bytes have arrived. If the
    buffer ends in the middle of a message, the function returns and leaves
    that partial message at the front of ``glob_buf`` for the next call.

    Raises:
        SystemExit: (status -1) if the byte at the current offset is not a
            command this example knows how to frame. The offset printed is
            relative to the current (already trimmed) buffer.
    """
    global glob_off
    global glob_frame
    global glob_state
    global glob_buf

    while glob_off < len(glob_buf):
        command = glob_buf[glob_off]
        available = len(glob_buf) - glob_off

        # Work out the total message length (command byte + payload).
        if command == 0x35:
            # EVENT_PAYLOADS: the byte after the command is the payload size
            # (unsigned, counting itself). The per-command sizes inside the
            # payload are not used; this example relies on the `cmd` table.
            if available < 2:
                break
            # max(..., 1) guarantees the size byte itself is always consumed,
            # even if a malformed message claims a payload size of 0.
            msg_len = 1 + max(glob_buf[glob_off + 1], 1)
        elif command in cmd:
            msg_len = 1 + cmd[command]
        else:
            # Otherwise, we're SOL
            print("stuck in buffer at glob_off={}".format(hex(glob_off)))
            sys.exit(-1)

        # Not enough data yet: let the main loop recv() more and call again.
        if available < msg_len:
            break

        # Field offsets below are relative to the command byte (blk[0]).
        blk = glob_buf[glob_off:glob_off + msg_len]

        if command == 0x37:
            # PRE_FRAME: frame is a signed 32-bit big-endian int at 0x1, the
            # player index a single byte at 0x5, the state a 16-bit value at
            # 0xb.
            glob_frame = unpack(">i", blk[0x1:0x5])[0]
            glob_state[blk[0x5]] = unpack(">H", blk[0xb:0xd])[0]
        elif command == 0x38:
            # POST_FRAME: same frame/player layout, state at 0x8.
            glob_frame = unpack(">i", blk[0x1:0x5])[0]
            glob_state[blk[0x5]] = unpack(">H", blk[0x8:0xa])[0]
        # EVENT_PAYLOADS (0x35), GAME_START (0x36) and GAME_END (0x39) carry
        # nothing this example displays, so they are simply skipped.

        glob_off += msg_len

    # Discard what has been parsed so the buffer does not grow for the whole
    # lifetime of the connection.
    if glob_off:
        glob_buf = glob_buf[glob_off:]
        glob_off = 0


# Set up a socket and connect to it. This seems sufficient for a hacky PoC.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# The `with` block closes the socket however we leave it: remote disconnect,
# an unexpected socket error, or Ctrl-C.
with server:
    server.connect((ADDR, PORT))
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setblocking(0)

    inputs = [server]
    outputs = []

    # select() blocks until the socket has something for us, so the loop does
    # not spin while the stream is idle.
    connected = True
    while connected:
        print("frame={}, state={}".format(glob_frame, glob_state), end='\r')
        readable, _, _ = select.select(inputs, outputs, inputs)

        for s in readable:
            try:
                # Read up to 0x5b4 * 3 (4380) bytes per call.
                buf = s.recv(0x5b4 * 3)
            except BlockingIOError:
                # Nothing to read after all (EWOULDBLOCK/EAGAIN); go back to
                # select(). Any other socket error is left to propagate
                # rather than carrying on with a stale or undefined `buf`.
                continue

            if not buf:
                # An empty read means the remote end closed the connection.
                # Without this check select() would report the socket as
                # readable forever and the loop would spin at full speed.
                print("\nconnection closed by remote host")
                connected = False
                break

            glob_buf += buf
            process()