Source code for examples.space_invaders.space_invaders

  1. import curses
  2. import logging
  3. import random
  4. import re
  5. import sys
  6. import textwrap
  7. import time
  8. from sqlalchemy import Column
  9. from sqlalchemy import create_engine
  10. from sqlalchemy import ForeignKey
  11. from sqlalchemy import func
  12. from sqlalchemy import Integer
  13. from sqlalchemy import String
  14. from sqlalchemy.ext.declarative import declarative_base
  15. from sqlalchemy.ext.hybrid import hybrid_method
  16. from sqlalchemy.ext.hybrid import hybrid_property
  17. from sqlalchemy.orm import joinedload
  18. from sqlalchemy.orm import relationship
  19. from sqlalchemy.orm import Session
# Python 2/3 compatibility: the code below calls ``xrange``; on Python 3
# that name is aliased to ``range``.
_PY3 = sys.version_info > (3, 0)
if _PY3:
    xrange = range

# Send log output to "space_invaders.log" and raise the
# "sqlalchemy.engine" logger to INFO level.
logging.basicConfig(
    filename="space_invaders.log",
    format="%(asctime)s,%(msecs)03d %(levelname)-5.5s %(message)s",
)
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Declarative base shared by Glyph and GlyphCoordinate.
Base = declarative_base()

# Geometry of the curses window created by setup_curses().
WINDOW_LEFT = 10
WINDOW_TOP = 2
WINDOW_WIDTH = 70
WINDOW_HEIGHT = 34

# Offsets added to glyph coordinates when drawing into the window.
VERT_PADDING = 2
HORIZ_PADDING = 5

# Row step between successive enemy rows in init_positions().
ENEMY_VERT_SPACING = 4

# Largest x / y glyph coordinate used by the bounds checks.
MAX_X = WINDOW_WIDTH - HORIZ_PADDING
MAX_Y = WINDOW_HEIGHT - VERT_PADDING

# Key codes compared against window.getch() in move_player().
LEFT_KEY = ord("j")
RIGHT_KEY = ord("l")
FIRE_KEY = ord(" ")
PAUSE_KEY = ord("p")

# One-letter colour codes (as written after "#" in glyph images) mapped
# to curses colour constants; setup_curses() builds colour pairs from it.
COLOR_MAP = {
    "K": curses.COLOR_BLACK,
    "B": curses.COLOR_BLUE,
    "C": curses.COLOR_CYAN,
    "G": curses.COLOR_GREEN,
    "M": curses.COLOR_MAGENTA,
    "R": curses.COLOR_RED,
    "W": curses.COLOR_WHITE,
    "Y": curses.COLOR_YELLOW,
}
  52. class Glyph(Base):
  53. """Describe a "glyph", a graphical element
  54. to be painted on the screen.
  55. """
  56. __tablename__ = "glyph"
  57. id = Column(Integer, primary_key=True)
  58. name = Column(String)
  59. type = Column(String)
  60. width = Column(Integer)
  61. height = Column(Integer)
  62. data = Column(String)
  63. alt_data = Column(String)
  64. __mapper_args__ = {"polymorphic_on": type}
  65. def __init__(self, name, img, alt=None):
  66. self.name = name
  67. self.data, self.width, self.height = self._encode_glyph(img)
  68. if alt is not None:
  69. self.alt_data, alt_w, alt_h = self._encode_glyph(alt)
  70. def _encode_glyph(self, img):
  71. """Receive a textual description of the glyph and
  72. encode into a format understood by
  73. GlyphCoordinate.render().
  74. """
  75. img = re.sub(r"^\n", "", textwrap.dedent(img))
  76. color = "W"
  77. lines = [line.rstrip() for line in img.split("\n")]
  78. data = []
  79. for line in lines:
  80. render_line = []
  81. line = list(line)
  82. while line:
  83. char = line.pop(0)
  84. if char == "#":
  85. color = line.pop(0)
  86. continue
  87. render_line.append((color, char))
  88. data.append(render_line)
  89. width = max([len(rl) for rl in data])
  90. data = "".join(
  91. "".join("%s%s" % (color, char) for color, char in render_line)
  92. + ("W " * (width - len(render_line)))
  93. for render_line in data
  94. )
  95. return data, width, len(lines)
  96. def glyph_for_state(self, coord, state):
  97. """Return the appropriate data representation
  98. for this Glyph, based on the current coordinates
  99. and state.
  100. Subclasses may override this to provide animations.
  101. """
  102. return self.data
  103. class GlyphCoordinate(Base):
  104. """Describe a glyph rendered at a certain x, y coordinate.
  105. The GlyphCoordinate may also include optional values
  106. such as the tick at time of render, a label, and a
  107. score value.
  108. """
  109. __tablename__ = "glyph_coordinate"
  110. id = Column(Integer, primary_key=True)
  111. glyph_id = Column(Integer, ForeignKey("glyph.id"))
  112. x = Column(Integer)
  113. y = Column(Integer)
  114. tick = Column(Integer)
  115. label = Column(String)
  116. score = Column(Integer)
  117. glyph = relationship(Glyph, innerjoin=True)
  118. def __init__(
  119. self, session, glyph_name, x, y, tick=None, label=None, score=None
  120. ):
  121. self.glyph = session.query(Glyph).filter_by(name=glyph_name).one()
  122. self.x = x
  123. self.y = y
  124. self.tick = tick
  125. self.label = label
  126. self.score = score
  127. session.add(self)
  128. def render(self, window, state):
  129. """Render the Glyph at this position."""
  130. col = 0
  131. row = 0
  132. glyph = self.glyph
  133. data = glyph.glyph_for_state(self, state)
  134. for color, char in [
  135. (data[i], data[i + 1]) for i in xrange(0, len(data), 2)
  136. ]:
  137. x = self.x + col
  138. y = self.y + row
  139. if 0 <= x <= MAX_X and 0 <= y <= MAX_Y:
  140. window.addstr(
  141. y + VERT_PADDING,
  142. x + HORIZ_PADDING,
  143. char,
  144. _COLOR_PAIRS[color],
  145. )
  146. col += 1
  147. if col == glyph.width:
  148. col = 0
  149. row += 1
  150. if self.label:
  151. self._render_label(window, False)
  152. def _render_label(self, window, blank):
  153. label = self.label if not blank else " " * len(self.label)
  154. if self.x + self.width + len(self.label) < MAX_X:
  155. window.addstr(self.y, self.x + self.width, label)
  156. else:
  157. window.addstr(self.y, self.x - len(self.label), label)
  158. def blank(self, window):
  159. """Render a blank box for this glyph's position and size."""
  160. glyph = self.glyph
  161. x = min(max(self.x, 0), MAX_X)
  162. width = min(glyph.width, MAX_X - x) or 1
  163. for y_a in xrange(self.y, self.y + glyph.height):
  164. y = y_a
  165. window.addstr(y + VERT_PADDING, x + HORIZ_PADDING, " " * width)
  166. if self.label:
  167. self._render_label(window, True)
  168. @hybrid_property
  169. def width(self):
  170. return self.glyph.width
  171. @width.expression
  172. def width(cls):
  173. return Glyph.width
  174. @hybrid_property
  175. def height(self):
  176. return self.glyph.height
  177. @height.expression
  178. def height(cls):
  179. return Glyph.height
  180. @hybrid_property
  181. def bottom_bound(self):
  182. return self.y + self.height >= MAX_Y
  183. @hybrid_property
  184. def top_bound(self):
  185. return self.y <= 0
  186. @hybrid_property
  187. def left_bound(self):
  188. return self.x <= 0
  189. @hybrid_property
  190. def right_bound(self):
  191. return self.x + self.width >= MAX_X
  192. @hybrid_property
  193. def right_edge_bound(self):
  194. return self.x > MAX_X
  195. @hybrid_method
  196. def intersects(self, other):
  197. """Return True if this GlyphCoordinate intersects with
  198. the given GlyphCoordinate."""
  199. return ~(
  200. (self.x + self.width < other.x) | (self.x > other.x + other.width)
  201. ) & ~(
  202. (self.y + self.height < other.y)
  203. | (self.y > other.y + other.height)
  204. )
class EnemyGlyph(Glyph):
    """Describe an enemy.

    Stored with the polymorphic identity ``"enemy"``.  move_missile()
    queries this type to find what a missile has hit.

    """

    __mapper_args__ = {"polymorphic_identity": "enemy"}
  208. class ArmyGlyph(EnemyGlyph):
  209. """Describe an enemy that's part of the "army". """
  210. __mapper_args__ = {"polymorphic_identity": "army"}
  211. def glyph_for_state(self, coord, state):
  212. if state["flip"]:
  213. return self.alt_data
  214. else:
  215. return self.data
  216. class SaucerGlyph(EnemyGlyph):
  217. """Describe the enemy saucer flying overhead."""
  218. __mapper_args__ = {"polymorphic_identity": "saucer"}
  219. def glyph_for_state(self, coord, state):
  220. if state["flip"] == 0:
  221. return self.alt_data
  222. else:
  223. return self.data
class MessageGlyph(Glyph):
    """Describe a glyph for displaying a message.

    Stored with the polymorphic identity ``"message"``.

    """

    __mapper_args__ = {"polymorphic_identity": "message"}
class PlayerGlyph(Glyph):
    """Describe a glyph representing the player.

    Stored with the polymorphic identity ``"player"``.  start() queries
    this type to locate the player's coordinate.

    """

    __mapper_args__ = {"polymorphic_identity": "player"}
class MissileGlyph(Glyph):
    """Describe a glyph representing a missile.

    Stored with the polymorphic identity ``"missile"``.

    """

    __mapper_args__ = {"polymorphic_identity": "missile"}
  233. class SplatGlyph(Glyph):
  234. """Describe a glyph representing a "splat"."""
  235. __mapper_args__ = {"polymorphic_identity": "splat"}
  236. def glyph_for_state(self, coord, state):
  237. age = state["tick"] - coord.tick
  238. if age > 5:
  239. return self.alt_data
  240. else:
  241. return self.data
def init_glyph(session):
    """Create the glyphs used during play.

    Builds one Glyph per game element and adds them all to ``session``;
    committing is left to the caller.  In the image text, ``#X`` selects
    color code ``X`` for the characters that follow (see
    Glyph._encode_glyph).  Where two images are given, the second is the
    alternate frame stored in ``alt_data``.

    """
    # the three army enemy types, each with an alternate animation frame
    enemy1 = ArmyGlyph(
        "enemy1",
        """
        #W-#B^#R-#B^#W-
        #G| |
    """,
        """
        #W>#B^#R-#B^#W<
        #G^ ^
    """,
    )
    enemy2 = ArmyGlyph(
        "enemy2",
        """
        #W***
        #R<#C~~~#R>
    """,
        """
        #W@@@
        #R<#C---#R>
    """,
    )
    enemy3 = ArmyGlyph(
        "enemy3",
        """
        #Y((--))
        #M-~-~-~
    """,
        """
        #Y[[--]]
        #M~-~-~-
    """,
    )
    saucer = SaucerGlyph(
        "saucer",
        """#R~#Y^#R~#G<<((=#WOO#G=))>>""",
        """#Y^#R~#Y^#G<<((=#WOO#G=))>>""",
    )
    # shown where an enemy was hit; the alternate frame is used once the
    # splat is older than five ticks (see SplatGlyph.glyph_for_state)
    splat1 = SplatGlyph(
        "splat1",
        """
        #WVVVVV
        #W> #R*** #W<
        #W^^^^^
    """,
        """
        #M|
        #M- #Y+++ #M-
        #M|
    """,
    )
    ship = PlayerGlyph(
        "ship",
        """
        #Y^
        #G=====
    """,
    )
    missile = MissileGlyph(
        "missile",
        """
        |
    """,
    )
    # message boxes, looked up by name in render_message()
    start = MessageGlyph(
        "start_message",
        "J = move left; L = move right; SPACE = fire\n"
        " #GPress any key to start",
    )
    lose = MessageGlyph("lose_message", "#YY O U L O S E ! ! !")
    win = MessageGlyph("win_message", "#RL E V E L C L E A R E D ! ! !")
    paused = MessageGlyph(
        "pause_message", "#WP A U S E D\n#GPress P to continue"
    )
    session.add_all(
        [
            enemy1,
            enemy2,
            enemy3,
            ship,
            saucer,
            missile,
            start,
            lose,
            win,
            paused,
            splat1,
        ]
    )
  333. def setup_curses():
  334. """Setup terminal/curses state."""
  335. window = curses.initscr()
  336. curses.noecho()
  337. window = curses.newwin(
  338. WINDOW_HEIGHT + (VERT_PADDING * 2),
  339. WINDOW_WIDTH + (HORIZ_PADDING * 2),
  340. WINDOW_TOP - VERT_PADDING,
  341. WINDOW_LEFT - HORIZ_PADDING,
  342. )
  343. curses.start_color()
  344. global _COLOR_PAIRS
  345. _COLOR_PAIRS = {}
  346. for i, (k, v) in enumerate(COLOR_MAP.items(), 1):
  347. curses.init_pair(i, v, curses.COLOR_BLACK)
  348. _COLOR_PAIRS[k] = curses.color_pair(i)
  349. return window
  350. def init_positions(session):
  351. """Establish a new field of play.
  352. This generates GlyphCoordinate objects
  353. and persists them to the database.
  354. """
  355. # delete all existing coordinates
  356. session.query(GlyphCoordinate).delete()
  357. session.add(
  358. GlyphCoordinate(
  359. session, "ship", WINDOW_WIDTH // 2 - 2, WINDOW_HEIGHT - 4
  360. )
  361. )
  362. arrangement = (
  363. ("enemy3", 50),
  364. ("enemy2", 25),
  365. ("enemy1", 10),
  366. ("enemy2", 25),
  367. ("enemy1", 10),
  368. )
  369. for (ship_vert, (etype, score)) in zip(
  370. xrange(5, 30, ENEMY_VERT_SPACING), arrangement
  371. ):
  372. for ship_horiz in xrange(0, 50, 10):
  373. session.add(
  374. GlyphCoordinate(
  375. session, etype, ship_horiz, ship_vert, score=score
  376. )
  377. )
  378. def draw(session, window, state):
  379. """Load all current GlyphCoordinate objects from the
  380. database and render.
  381. """
  382. for gcoord in session.query(GlyphCoordinate).options(joinedload("glyph")):
  383. gcoord.render(window, state)
  384. window.addstr(1, WINDOW_WIDTH - 5, "Score: %.4d" % state["score"])
  385. window.move(0, 0)
  386. window.refresh()
  387. def check_win(session, state):
  388. """Return the number of army glyphs remaining -
  389. the player wins if this is zero."""
  390. return (
  391. session.query(func.count(GlyphCoordinate.id))
  392. .join(GlyphCoordinate.glyph.of_type(ArmyGlyph))
  393. .scalar()
  394. )
  395. def check_lose(session, state):
  396. """Return the number of army glyphs either colliding
  397. with the player or hitting the bottom of the screen.
  398. The player loses if this is non-zero."""
  399. player = state["player"]
  400. return (
  401. session.query(GlyphCoordinate)
  402. .join(GlyphCoordinate.glyph.of_type(ArmyGlyph))
  403. .filter(
  404. GlyphCoordinate.intersects(player) | GlyphCoordinate.bottom_bound
  405. )
  406. .count()
  407. )
  408. def render_message(session, window, msg, x, y):
  409. """Render a message glyph.
  410. Clears the area beneath the message first
  411. and assumes the display will be paused
  412. afterwards.
  413. """
  414. # create message box
  415. msg = GlyphCoordinate(session, msg, x, y)
  416. # clear existing glyphs which intersect
  417. for gly in (
  418. session.query(GlyphCoordinate)
  419. .join(GlyphCoordinate.glyph)
  420. .filter(GlyphCoordinate.intersects(msg))
  421. ):
  422. gly.blank(window)
  423. # render
  424. msg.render(window, {})
  425. window.refresh()
  426. return msg
  427. def win(session, window, state):
  428. """Handle the win case."""
  429. render_message(session, window, "win_message", 15, 15)
  430. time.sleep(2)
  431. start(session, window, state, True)
  432. def lose(session, window, state):
  433. """Handle the lose case."""
  434. render_message(session, window, "lose_message", 15, 15)
  435. time.sleep(2)
  436. start(session, window, state)
  437. def pause(session, window, state):
  438. """Pause the game."""
  439. msg = render_message(session, window, "pause_message", 15, 15)
  440. prompt(window)
  441. msg.blank(window)
  442. session.delete(msg)
  443. def prompt(window):
  444. """Display a prompt, quashing any keystrokes
  445. which might have remained."""
  446. window.move(0, 0)
  447. window.nodelay(1)
  448. window.getch()
  449. window.nodelay(0)
  450. window.getch()
  451. window.nodelay(1)
  452. def move_army(session, window, state):
  453. """Update the army position based on the current
  454. size of the field."""
  455. speed = 30 // 25 * state["num_enemies"]
  456. flip = (state["tick"] % speed) == 0
  457. if not flip:
  458. return
  459. else:
  460. state["flip"] = not state["flip"]
  461. x_slide = 1
  462. # get the lower/upper boundaries of the army
  463. # along the X axis.
  464. min_x, max_x = (
  465. session.query(
  466. func.min(GlyphCoordinate.x),
  467. func.max(GlyphCoordinate.x + GlyphCoordinate.width),
  468. )
  469. .join(GlyphCoordinate.glyph.of_type(ArmyGlyph))
  470. .first()
  471. )
  472. if min_x is None or max_x is None:
  473. # no enemies
  474. return
  475. direction = state["army_direction"]
  476. move_y = False
  477. if direction == 0 and max_x + x_slide >= MAX_X:
  478. direction = state["army_direction"] = 1
  479. move_y = True
  480. elif direction == 1 and min_x - x_slide <= 0:
  481. direction = state["army_direction"] = 0
  482. move_y = True
  483. for enemy_g in session.query(GlyphCoordinate).join(
  484. GlyphCoordinate.glyph.of_type(ArmyGlyph)
  485. ):
  486. enemy_g.blank(window)
  487. if move_y:
  488. enemy_g.y += 1
  489. elif direction == 0:
  490. enemy_g.x += x_slide
  491. elif direction == 1:
  492. enemy_g.x -= x_slide
  493. def move_player(session, window, state):
  494. """Receive player input and adjust state."""
  495. ch = window.getch()
  496. if ch not in (LEFT_KEY, RIGHT_KEY, FIRE_KEY, PAUSE_KEY):
  497. return
  498. elif ch == PAUSE_KEY:
  499. pause(session, window, state)
  500. return
  501. player = state["player"]
  502. if ch == RIGHT_KEY and not player.right_bound:
  503. player.blank(window)
  504. player.x += 1
  505. elif ch == LEFT_KEY and not player.left_bound:
  506. player.blank(window)
  507. player.x -= 1
  508. elif ch == FIRE_KEY and state["missile"] is None:
  509. state["missile"] = GlyphCoordinate(
  510. session, "missile", player.x + 3, player.y - 1
  511. )
  512. def move_missile(session, window, state):
  513. """Update the status of the current missile, if any."""
  514. if state["missile"] is None or state["tick"] % 2 != 0:
  515. return
  516. missile = state["missile"]
  517. # locate enemy glyphs which intersect with the
  518. # missile's current position; i.e. a hit
  519. glyph = (
  520. session.query(GlyphCoordinate)
  521. .join(GlyphCoordinate.glyph.of_type(EnemyGlyph))
  522. .filter(GlyphCoordinate.intersects(missile))
  523. .first()
  524. )
  525. missile.blank(window)
  526. if glyph or missile.top_bound:
  527. # missile is done
  528. session.delete(missile)
  529. state["missile"] = None
  530. if glyph:
  531. # score!
  532. score(session, window, state, glyph)
  533. else:
  534. # move missile up one character.
  535. missile.y -= 1
  536. def move_saucer(session, window, state):
  537. """Update the status of the saucer."""
  538. saucer_interval = 500
  539. saucer_speed_interval = 4
  540. if state["saucer"] is None and state["tick"] % saucer_interval != 0:
  541. return
  542. if state["saucer"] is None:
  543. state["saucer"] = saucer = GlyphCoordinate(
  544. session, "saucer", -6, 1, score=random.randrange(100, 600, 100)
  545. )
  546. elif state["tick"] % saucer_speed_interval == 0:
  547. saucer = state["saucer"]
  548. saucer.blank(window)
  549. saucer.x += 1
  550. if saucer.right_edge_bound:
  551. session.delete(saucer)
  552. state["saucer"] = None
  553. def update_splat(session, window, state):
  554. """Render splat animations."""
  555. for splat in session.query(GlyphCoordinate).join(
  556. GlyphCoordinate.glyph.of_type(SplatGlyph)
  557. ):
  558. age = state["tick"] - splat.tick
  559. if age > 10:
  560. splat.blank(window)
  561. session.delete(splat)
  562. else:
  563. splat.render(window, state)
  564. def score(session, window, state, glyph):
  565. """Process a glyph intersecting with a missile."""
  566. glyph.blank(window)
  567. session.delete(glyph)
  568. if state["saucer"] is glyph:
  569. state["saucer"] = None
  570. state["score"] += glyph.score
  571. # render a splat !
  572. GlyphCoordinate(
  573. session,
  574. "splat1",
  575. glyph.x,
  576. glyph.y,
  577. tick=state["tick"],
  578. label=str(glyph.score),
  579. )
  580. def update_state(session, window, state):
  581. """Update all state for each game tick."""
  582. num_enemies = state["num_enemies"] = check_win(session, state)
  583. if num_enemies == 0:
  584. win(session, window, state)
  585. elif check_lose(session, state):
  586. lose(session, window, state)
  587. else:
  588. # update the tick counter.
  589. state["tick"] += 1
  590. move_player(session, window, state)
  591. move_missile(session, window, state)
  592. move_army(session, window, state)
  593. move_saucer(session, window, state)
  594. update_splat(session, window, state)
  595. def start(session, window, state, continue_=False):
  596. """Start a new field of play."""
  597. render_message(session, window, "start_message", 15, 20)
  598. prompt(window)
  599. init_positions(session)
  600. player = (
  601. session.query(GlyphCoordinate)
  602. .join(GlyphCoordinate.glyph.of_type(PlayerGlyph))
  603. .one()
  604. )
  605. state.update(
  606. {
  607. "field_pos": 0,
  608. "alt": False,
  609. "tick": 0,
  610. "missile": None,
  611. "saucer": None,
  612. "player": player,
  613. "army_direction": 0,
  614. "flip": False,
  615. }
  616. )
  617. if not continue_:
  618. state["score"] = 0
  619. window.clear()
  620. window.box()
  621. draw(session, window, state)
  622. def main():
  623. """Initialize the database and establish the game loop."""
  624. e = create_engine("sqlite://")
  625. Base.metadata.create_all(e)
  626. session = Session(e)
  627. init_glyph(session)
  628. session.commit()
  629. window = setup_curses()
  630. state = {}
  631. start(session, window, state)
  632. while True:
  633. update_state(session, window, state)
  634. draw(session, window, state)
  635. time.sleep(0.01)
# Run the game only when executed as a script, not when imported.
if __name__ == "__main__":
    main()