strategy.py 12 KB


  1. import io
  2. import time
  3. from contextlib import redirect_stdout
  4. from itertools import combinations, islice
  5. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  6. detect_held, print_board, is_basic, is_bomb, \
  7. bomb_to_basic
  8. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  9. MOVE_DELAYS = (
  10. # in milliseconds
  11. 50, # GRAB
  12. 50, # DROP
  13. 50, # SWAP
  14. 30, # LEFT
  15. 30, # RIGHT
  16. 30, # SPEED
  17. )
  18. GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
  19. PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
  20. MIN_BASIC_GROUP_SIZE = 4
  21. MIN_BOMB_GROUP_SIZE = 2
  22. POINTS_DEPTH = 3
  23. FRAG_DEPTH = 5
  24. DEFRAG_PRIO = 4
  25. COLSIZE_PRIO = 5
  26. COLSIZE_PANIC = 8
  27. COLSIZE_MAX = 9
  28. BOMB_POINTS = 1
  29. MIN_ROWS = 2
  30. class State:
  31. def __init__(self, blocks, exa, held):
  32. self.blocks = blocks
  33. self.exa = exa
  34. self.held = held
  35. def grabbing_or_dropping(self):
  36. skip = self.colskip(self.exa)
  37. i = (skip + 1) * COLUMNS + self.exa
  38. return i < len(self.blocks) and self.blocks[i] == NOBLOCK
  39. def iter_columns(self):
  40. nrows = self.nrows()
  41. def gen_col(col):
  42. for row in range(nrows):
  43. i = row * COLUMNS + col
  44. if self.blocks[i] != NOBLOCK:
  45. yield i
  46. for col in range(COLUMNS):
  47. yield gen_col(col)
  48. @classmethod
  49. def detect(cls, board, pad=2):
  50. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  51. exa = detect_exa(board)
  52. held = detect_held(board, exa)
  53. return cls(blocks, exa, held)
  54. def copy(self):
  55. return State(list(self.blocks), self.exa, self.held)
  56. def causes_panic(self):
  57. return self.max_colsize() >= COLSIZE_PANIC
  58. def max_colsize(self):
  59. return self.nrows() - self.empty_rows()
  60. def empty_rows(self):
  61. for i, block in enumerate(self.blocks):
  62. if block != NOBLOCK:
  63. return i // COLUMNS
  64. return 0
  65. def holes(self):
  66. start_row = self.empty_rows()
  67. total_rows = self.nrows()
  68. score = 0
  69. for col in range(COLUMNS):
  70. for row in range(start_row, total_rows):
  71. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  72. break
  73. score += row - start_row + 1
  74. return score
  75. def score(self, points, moves, prev):
  76. prev_colsize = prev.nrows() - 2
  77. delay = moves_delay(moves)
  78. # Don't care about defragging for few rows, just score points quickly.
  79. # This saves computation time which in turn makes for nice combos when
  80. # the bot speeds around throwing blocks on top of each other.
  81. if prev_colsize < DEFRAG_PRIO:
  82. return -points, delay
  83. holes = self.holes()
  84. frag = self.fragmentation()
  85. # When rows start stacking up, start defragmenting colors to make
  86. # opportunities for scoring points.
  87. if prev_colsize < COLSIZE_PRIO:
  88. return -points, frag, holes, delay
  89. # When they stack higher, start moving blocks down into holes before
  90. # continuing to defragment.
  91. if prev_colsize < COLSIZE_PANIC:
  92. panic = int(self.causes_panic())
  93. return panic, -points, holes, frag, delay
  94. # Column heights are getting out of hand, just move shit DOWN.
  95. return holes, delay, -points, frag
  96. def solutions(self):
  97. for moves in self.gen_moves():
  98. try:
  99. yield Solution(self, moves)
  100. except AssertionError:
  101. pass
  102. def colskip(self, col):
  103. nrows = self.nrows()
  104. for row in range(nrows):
  105. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  106. return row
  107. return nrows
  108. def find_unmovable_blocks(self):
  109. unmoveable = set()
  110. bombed = set()
  111. for block, group in self.find_groups():
  112. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  113. for i in group:
  114. unmoveable.add(i)
  115. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  116. bombed.add(bomb_to_basic(block))
  117. for i in group:
  118. unmoveable.add(i)
  119. for i, block in enumerate(self.blocks):
  120. if block in bombed:
  121. unmoveable.add(i)
  122. return unmoveable
  123. def simulate(self, moves):
  124. s = self.copy()
  125. #points = 0
  126. # avoid swapping/grabbing currently exploding items
  127. #unmoveable = s.find_unmovable_blocks()
  128. for move in moves:
  129. if move == LEFT:
  130. assert s.exa > 0
  131. s.exa -= 1
  132. elif move == RIGHT:
  133. assert s.exa < COLUMNS - 1
  134. s.exa += 1
  135. elif move == GRAB:
  136. assert s.held == NOBLOCK
  137. row = s.colskip(s.exa)
  138. assert row < s.nrows()
  139. i = row * COLUMNS + s.exa
  140. #assert i not in unmoveable
  141. s.held = s.blocks[i]
  142. s.blocks[i] = NOBLOCK
  143. elif move == DROP:
  144. assert s.held != NOBLOCK
  145. row = s.colskip(s.exa)
  146. assert row > 0
  147. i = row * COLUMNS + s.exa
  148. s.blocks[i - COLUMNS] = s.held
  149. s.held = NOBLOCK
  150. #points += s.remove_blocks()
  151. elif move == SWAP:
  152. row = s.colskip(s.exa)
  153. assert row < s.nrows() - 2
  154. i = row * COLUMNS + s.exa
  155. j = i + COLUMNS
  156. #assert i not in unmoveable
  157. #assert j not in unmoveable
  158. s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
  159. #points += s.remove_blocks()
  160. if moves and self.max_colsize() < COLSIZE_MAX:
  161. assert s.max_colsize() <= COLSIZE_MAX
  162. points = s.remove_blocks()
  163. return points, s
  164. def find_groups(self, depth=POINTS_DEPTH, minsize=2):
  165. def follow_group(i, block, group):
  166. if self.blocks[i] == block and i not in visited:
  167. group.append(i)
  168. visited.add(i)
  169. for nb in self.neighbors(i):
  170. follow_group(nb, block, group)
  171. visited = set()
  172. for col in self.iter_columns():
  173. for i in islice(col, depth):
  174. block = self.blocks[i]
  175. group = []
  176. follow_group(i, block, group)
  177. if len(group) >= minsize:
  178. yield block, group
  179. def neighbors(self, i):
  180. row, col = divmod(i, COLUMNS)
  181. if col > 0 and self.blocks[i - 1] != NOBLOCK:
  182. yield i - 1
  183. if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
  184. yield i + 1
  185. if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
  186. yield i - COLUMNS
  187. if row < self.nrows() - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
  188. yield i + COLUMNS
  189. def fragmentation(self, depth=FRAG_DEPTH):
  190. """
  191. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  192. Magnify vertical distances to avoid column stacking.
  193. """
  194. def dist(i, j):
  195. yi, xi = divmod(i, COLUMNS)
  196. yj, xj = divmod(j, COLUMNS)
  197. # for blocks in the same group, only count vertical distance so
  198. # that groups are spread out horizontally
  199. if groups[i] == groups[j]:
  200. return abs(yj - yi)
  201. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  202. colors = {}
  203. groups = {}
  204. groupsizes = {}
  205. for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
  206. colors.setdefault(block, []).extend(group)
  207. for i in group:
  208. groups[i] = groupid
  209. groupsizes[i] = len(group)
  210. return sum(dist(i, j) # * (1 + 2 * is_bomb(block))
  211. for block, color in colors.items()
  212. for i, j in combinations(color, 2))
  213. def remove_blocks(self):
  214. removed = 0
  215. for block, group in self.find_groups():
  216. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  217. removed += len(group)
  218. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  219. removed += BOMB_POINTS
  220. return removed
  221. #def remove_blocks(self):
  222. # remove = []
  223. # for block, group in self.find_groups():
  224. # if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  225. # remove.extend(group)
  226. # elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  227. # remove.extend(group)
  228. # remove.extend(i for i, other in enumerate(self.blocks)
  229. # if other == bomb_to_basic(block))
  230. # remove.sort()
  231. # removed = 0
  232. # prev = None
  233. # for i in remove:
  234. # if i != prev:
  235. # while self.blocks[i] != NOBLOCK:
  236. # self.blocks[i] = self.blocks[i - COLUMNS]
  237. # i -= COLUMNS
  238. # removed += 1
  239. # prev = i
  240. # if removed:
  241. # removed += self.remove_blocks()
  242. # return removed
  243. def has_explosion(self):
  244. return any(is_bomb(block) and
  245. any(self.blocks[j] == block for j in self.neighbors(i))
  246. for i, block in enumerate(self.blocks))
  247. def gen_moves(self):
  248. yield ()
  249. def make_move(diff):
  250. direction = RIGHT if diff > 0 else LEFT
  251. return abs(diff) * (direction,)
  252. ignore_exa_column = self.grabbing_or_dropping()
  253. for src in range(COLUMNS):
  254. mov1 = make_move(src - self.exa)
  255. if mov1 or not ignore_exa_column:
  256. yield mov1 + (SWAP,)
  257. yield mov1 + (GRAB, SWAP, DROP)
  258. yield mov1 + (SWAP, GRAB, SWAP, DROP)
  259. for dst in range(COLUMNS):
  260. if dst != src:
  261. mov2 = make_move(dst - src)
  262. for get in GET:
  263. for put in PUT:
  264. yield mov1 + get + mov2 + put
  265. def solve(self):
  266. assert self.exa is not None
  267. if self.held != NOBLOCK:
  268. return Solution(self, (DROP,))
  269. if self.nrows() < MIN_ROWS:
  270. return Solution(self, ())
  271. if self.has_explosion():
  272. return Solution(self, ())
  273. return min(self.solutions())
  274. def print(self):
  275. print_board(self.blocks, self.exa, self.held)
  276. def tostring(self):
  277. stream = io.StringIO()
  278. with redirect_stdout(stream):
  279. self.print()
  280. return stream.getvalue()
  281. def nrows(self):
  282. return len(self.blocks) // COLUMNS
  283. def has_same_exa(self, state):
  284. return self.exa == state.exa and self.held == state.held
  285. class Solution:
  286. def __init__(self, state, moves):
  287. self.state = state
  288. self.moves = moves
  289. points, self.newstate = state.simulate(moves)
  290. self.score = self.newstate.score(points, moves, state)
  291. def __lt__(self, other):
  292. return self.score < other.score
  293. def loops(self, prev_prev):
  294. return self.moves and \
  295. self.state.exa == prev_prev.state.exa and \
  296. self.moves == prev_prev.moves and \
  297. self.score == prev_prev.score
  298. def delay(self):
  299. return moves_delay(self.moves)
  300. def keys(self):
  301. return moves_to_keys(self.moves)
  302. def move_to_key(move):
  303. return 'jjkadl'[move]
  304. def moves_to_keys(moves):
  305. return ''.join(move_to_key(move) for move in moves)
  306. def moves_delay(moves):
  307. return sum(MOVE_DELAYS[m] for m in moves)
  308. if __name__ == '__main__':
  309. import sys
  310. from PIL import Image
  311. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  312. state = State.detect(board)
  313. print('parsed:')
  314. state.print()
  315. print()
  316. start = time.time()
  317. solution = state.solve()
  318. end = time.time()
  319. print('best moves:', solution.keys())
  320. print('elapsed:', round((end - start) * 1000, 1), 'ms')
  321. print()
  322. print('target after moves:')
  323. solution.newstate.print()
  324. print()
  325. for solution in sorted(state.solutions()):
  326. print('move %18s:' % solution.keys(), solution.score)