What we have here is the outcome of the wonderful question "I wonder if that would work". By running pygame inside an asyncio event loop and bolting aiohttp on top of it, we have player creation and controls working via the web browser, while the game itself is displayed via pygame.
The idea behind this is to run a fullscreen pygame game that various people connect to with their cell phones in order to join in. And it actually works surprisingly well at that.
The actual "game" is just a differently colored square for each player that can be moved around. Not too exciting; I was just testing that it'd actually work.
Feel free to roast the code or ask questions or whatever lol
The idea behind this is to run a fullscreen pygame game that various people connect to with their cell phones in order to join in. And it actually works surprisingly well at that.
The actual "game" is just a differently colored square for each player that can be moved around. Not too exciting; I was just testing that it'd actually work.
Feel free to roast the code or ask questions or whatever lol
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
import asyncio
import html

from aiohttp import web
import pygame


class Clock:
    """Async counterpart of ``pygame.time.Clock`` for capping the frame rate.

    ``time_func`` must return the current time in milliseconds; it defaults
    to ``pygame.time.get_ticks``.
    """

    def __init__(self, time_func=pygame.time.get_ticks):
        self.time_func = time_func
        self.last_tick = time_func() or 0

    async def tick(self, fps=0):
        """Sleep just long enough that consecutive frames are 1/fps apart.

        With ``fps <= 0`` no frame cap is applied, but control is still
        handed to the event loop once so other tasks are not starved by an
        uncapped loop.
        """
        if fps <= 0:
            await asyncio.sleep(0)
            self.last_tick = self.time_func()
            return

        frame_ms = 1000.0 / fps
        elapsed_ms = self.time_func() - self.last_tick
        delay = max(0.0, (frame_ms - elapsed_ms) / 1000)
        await asyncio.sleep(delay)
        # Stamp the time *after* sleeping. Stamping before the sleep makes
        # the next call count this sleep as work time, so delays alternate
        # between ~0 and a full frame and the loop runs at about twice the
        # requested rate.
        self.last_tick = self.time_func()


class EventEngine:
    """Minimal publish/subscribe hub whose listeners are coroutine functions."""

    def __init__(self):
        self.listeners = {}
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to tasks.
        self._pending = set()

    def on(self, event):
        """Return a decorator that registers a coroutine function for ``event``."""
        handlers = self.listeners.setdefault(event, [])

        # Extra positional arguments are accepted and ignored, as before.
        def wrapper(func, *_unused):
            handlers.append(func)
            return func

        return wrapper

    def trigger(self, event, *args, **kwargs):
        """Schedule every listener of ``event`` without waiting for the result.

        This is purposefully not async: callers use it in a fire-and-forget
        manner and should not be slowed down by awaiting a result. It must be
        called while an event loop is running.
        """
        task = asyncio.create_task(self.async_trigger(event, *args, **kwargs))
        # Hold the task until it finishes so it cannot be garbage-collected
        # while still pending.
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def async_trigger(self, event, *args, **kwargs):
        """Run every listener of ``event`` concurrently and return their results.

        Returns a list with one entry per listener, in registration order;
        the list is empty when nothing is registered for ``event``.
        """
        handlers = self.listeners.get(event)
        if not handlers:
            return []
        return await asyncio.gather(*(func(*args, **kwargs) for func in handlers))


events = EventEngine()


class Player:
    """A colored square that moves in response to ``input.move_*`` events."""

    player_count = 0

    def __init__(self, color):
        Player.player_count += 1
        self.player_id = Player.player_count
        self.surface = self.original_surface = self.create_surface(color)
        self.pos = [10, 10]
        self.movement_intensity = 10
        self.register_handlers()

    def create_surface(self, color):
        """Build the 25x25 square, filled with ``color``, drawn for this player."""
        surf = pygame.Surface((25, 25), pygame.SRCALPHA)
        surf.fill(color)
        return surf

    def register_handlers(self):
        """Subscribe this player's movement handlers under its own player id."""
        events.on(f"input.move_up.{self.player_id}")(self.move_up)
        events.on(f"input.move_right.{self.player_id}")(self.move_right)

    async def move_right(self, amount):
        """Shift horizontally; a negative ``amount`` moves left."""
        self.pos[0] += amount * self.movement_intensity

    async def move_up(self, amount):
        """Shift vertically; a negative ``amount`` moves down."""
        # 0 == top of screen, so 'up' is negative
        self.pos[1] -= amount * self.movement_intensity

    async def update(self, window):
        """Draw the player onto ``window`` at its current position."""
        window.blit(self.surface, self.pos)


class Game:
    """Owns the player list and creates players on ``player.add`` events."""

    player_colors = [(155, 155, 0), (0, 155, 155), (155, 0, 155)]

    def __init__(self):
        self.players = []
        events.on("player.add")(self.create_player)

    async def create_player(self):
        """Create a player with the next color in the cycle and return it."""
        color = self.player_colors[len(self.players) % len(self.player_colors)]
        new_player = Player(color)
        self.players.append(new_player)
        return new_player

    async def update(self, window):
        """Draw every player onto ``window``."""
        for player in self.players:
            await player.update(window)


# Controls page served to each browser; {player_id} is filled in per request.
html_page = """
<html><body><table>
<tr><td></td><td><a href="/move/{player_id}/up"><button>UP</button></a></td><td></td></tr>
<tr><td><a href="/move/{player_id}/left"><button>LEFT</button></a></td>
<td></td>
<td><a href="/move/{player_id}/right"><button>RIGHT</button></a></td></tr>
<tr><td></td><td><a href="/move/{player_id}/down"><button>DOWN</button></a></td><td></td></tr>
</table></body></html>
"""


class WebFrontend:
    """aiohttp application that lets browsers create and steer players.

    ``host`` defaults to "localhost", which only accepts connections from the
    same machine; pass e.g. "0.0.0.0" to listen on all interfaces.
    """

    # URL direction -> (event axis, signed amount)
    _moves = {
        "up": ("up", 1),
        "down": ("up", -1),
        "right": ("right", 1),
        "left": ("right", -1),
    }

    def __init__(self, port=8080, host="localhost"):
        self.port = port
        self.host = host
        self.runner = None
        self.app = web.Application()
        self.app.add_routes(
            [
                web.get("/", self.register_new_user),
                web.get("/controls/{player_id}", self.player_controls),
                web.get("/move/{player_id}/{direction}", self.move_player),
            ]
        )

    async def register_new_user(self, request):
        """Create a player and redirect the browser to that player's controls."""
        created = await events.async_trigger("player.add")
        if not created:
            # Nothing is listening for "player.add", so no player was made.
            raise web.HTTPServiceUnavailable(text="game is not running")
        player = created[0]
        raise web.HTTPFound(location=f"/controls/{player.player_id}")

    async def player_controls(self, request):
        """Serve the controls page for the player id given in the URL."""
        # The id comes straight from the request path, so escape it before
        # placing it into the HTML.
        player_id = html.escape(request.match_info["player_id"], quote=True)
        return web.Response(
            content_type="text/html",
            body=html_page.format(player_id=player_id),
        )

    async def move_player(self, request):
        """Fire the matching move event, then serve the controls page again.

        Responds with 404 when the direction is not up/down/left/right.
        """
        player_id = request.match_info["player_id"]
        move = self._moves.get(request.match_info["direction"])
        if move is None:
            raise web.HTTPNotFound()
        direction, power = move
        events.trigger(f"input.move_{direction}.{player_id}", power)
        return await self.player_controls(request)

    async def startup(self):
        """Start serving on the configured host and port."""
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.host, self.port)
        await site.start()

    async def shutdown(self):
        """Stop the server if it was started."""
        if self.runner:
            await self.runner.cleanup()


async def main():
    """Run the pygame loop and the web frontend in one asyncio event loop."""
    window = pygame.display.set_mode((500, 500))
    web_server = WebFrontend()
    try:
        await web_server.startup()
        game = Game()

        # add a local player
        local_player = (await events.async_trigger("player.add"))[0]
        local_player_id = local_player.player_id

        # handlers for a local player using keyboard: key -> (axis, amount)
        key_bindings = {
            pygame.K_LEFT: ("right", -1),
            pygame.K_RIGHT: ("right", +1),
            pygame.K_UP: ("up", +1),
            pygame.K_DOWN: ("up", -1),
        }

        clock = Clock()
        while True:
            for ev in pygame.event.get():
                if ev.type == pygame.QUIT:
                    return
                if ev.type == pygame.KEYDOWN and ev.key in key_bindings:
                    axis, amount = key_bindings[ev.key]
                    events.trigger(f"input.move_{axis}.{local_player_id}", amount)

            window.fill((0, 0, 0))
            await game.update(window)
            pygame.display.flip()
            await clock.tick(30)
    finally:
        # Release the listening socket on window close and on errors alike.
        await web_server.shutdown()


if __name__ == "__main__":
    pygame.init()
    try:
        asyncio.run(main())
    finally:
        pygame.quit()