strategy.py 11 KB


  1. import io
  2. import time
  3. from contextlib import redirect_stdout
  4. from itertools import combinations, islice
  5. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  6. detect_held, print_board, is_basic, is_bomb, \
  7. bomb_to_basic
  8. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  9. GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
  10. PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
  11. MIN_BASIC_GROUP_SIZE = 4
  12. MIN_BOMB_GROUP_SIZE = 2
  13. FIND_GROUPS_DEPTH = 3
  14. FRAG_DEPTH = 3
  15. DEFRAG_PRIO = 3
  16. COLSIZE_PRIO = 5
  17. COLSIZE_PANIC = 7
  18. COLSIZE_MAX = 8
  19. BOMB_POINTS = 1
  20. MIN_ROWS = 2
  21. MAX_SPEED_ROWS = 3
  22. class State:
  23. def __init__(self, blocks, exa, held):
  24. self.blocks = blocks
  25. self.exa = exa
  26. self.held = held
  27. def grabbing_of_dropping(self):
  28. skip = self.colskip(self.exa)
  29. i = (skip + 1) * COLUMNS + self.exa
  30. return i < len(self.blocks) and self.blocks[i] == NOBLOCK
  31. def iter_columns(self):
  32. nrows = self.nrows()
  33. def gen_col(col):
  34. for row in range(nrows):
  35. i = row * COLUMNS + col
  36. if self.blocks[i] != NOBLOCK:
  37. yield i
  38. for col in range(COLUMNS):
  39. yield gen_col(col)
  40. @classmethod
  41. def detect(cls, board, pad=2):
  42. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  43. exa = detect_exa(board)
  44. held = detect_held(board, exa)
  45. return cls(blocks, exa, held)
  46. def copy(self):
  47. return State(list(self.blocks), self.exa, self.held)
  48. def colsizes(self):
  49. for col in range(COLUMNS):
  50. yield self.nrows() - self.colskip(col)
  51. def colsize_panic(self):
  52. return int(max(self.colsizes()) >= COLSIZE_PANIC)
  53. def empty_column_score(self):
  54. skip = 0
  55. for i, block in enumerate(self.blocks):
  56. if block != NOBLOCK:
  57. skip = i // COLUMNS
  58. break
  59. nrows = self.nrows()
  60. score = 0
  61. for col in range(COLUMNS):
  62. for row in range(skip, nrows):
  63. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  64. break
  65. score += row - skip + 1
  66. return score
  67. def score(self, points, moves, prev):
  68. prev_colsize = max(prev.colsizes())
  69. points = self.score_points()
  70. if prev_colsize >= COLSIZE_PANIC:
  71. colsize = self.empty_column_score()
  72. #frag = self.fragmentation()
  73. return colsize, len(moves), -points #, frag
  74. elif prev_colsize >= DEFRAG_PRIO:
  75. colsize = self.empty_column_score()
  76. frag = self.fragmentation()
  77. panic = self.colsize_panic()
  78. return -points, panic, frag, colsize, len(moves)
  79. elif prev_colsize >= COLSIZE_PRIO:
  80. colsize = self.empty_column_score()
  81. return -points, colsize, len(moves)
  82. else:
  83. return -points, len(moves)
  84. def score_moves(self):
  85. for moves in self.gen_moves():
  86. try:
  87. points, newstate = self.simulate(moves)
  88. yield newstate.score(points, moves, self), moves
  89. except AssertionError:
  90. pass
  91. def colskip(self, col):
  92. nrows = self.nrows()
  93. for row in range(nrows):
  94. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  95. return row
  96. return nrows
  97. def find_unmovable_blocks(self):
  98. unmoveable = set()
  99. bombed = set()
  100. for block, group in self.find_groups():
  101. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  102. for i in group:
  103. unmoveable.add(i)
  104. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  105. bombed.add(bomb_to_basic(block))
  106. for i in group:
  107. unmoveable.add(i)
  108. for i, block in enumerate(self.blocks):
  109. if block in bombed:
  110. unmoveable.add(i)
  111. return unmoveable
  112. def simulate(self, moves):
  113. s = self.copy()
  114. points = 0
  115. # avoid swapping/grabbing currently exploding items
  116. #unmoveable = s.find_unmovable_blocks()
  117. for move in moves:
  118. if move == LEFT:
  119. assert s.exa > 0
  120. s.exa -= 1
  121. elif move == RIGHT:
  122. assert s.exa < COLUMNS - 1
  123. s.exa += 1
  124. elif move == GRAB:
  125. assert s.held == NOBLOCK
  126. row = s.colskip(s.exa)
  127. assert row < s.nrows()
  128. i = row * COLUMNS + s.exa
  129. #assert i not in unmoveable
  130. s.held = s.blocks[i]
  131. s.blocks[i] = NOBLOCK
  132. elif move == DROP:
  133. assert s.held != NOBLOCK
  134. row = s.colskip(s.exa)
  135. assert row > 0
  136. i = row * COLUMNS + s.exa
  137. s.blocks[i - COLUMNS] = s.held
  138. s.held = NOBLOCK
  139. #points += s.score_points()
  140. elif move == SWAP:
  141. row = s.colskip(s.exa)
  142. assert row < s.nrows() - 2
  143. i = row * COLUMNS + s.exa
  144. j = i + COLUMNS
  145. #assert i not in unmoveable
  146. #assert j not in unmoveable
  147. s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
  148. #points += s.score_points()
  149. if moves and max(self.colsizes()) < COLSIZE_MAX:
  150. assert max(s.colsizes()) <= COLSIZE_MAX
  151. return points, s
  152. def find_groups(self, depth=FIND_GROUPS_DEPTH, minsize=2):
  153. def follow_group(i, block, group):
  154. if self.blocks[i] == block and i not in visited:
  155. group.append(i)
  156. visited.add(i)
  157. for nb in self.neighbors(i):
  158. follow_group(nb, block, group)
  159. visited = set()
  160. for col in self.iter_columns():
  161. for i in islice(col, depth):
  162. block = self.blocks[i]
  163. group = []
  164. follow_group(i, block, group)
  165. if len(group) >= minsize:
  166. yield block, group
  167. def neighbors(self, i):
  168. def gen_indices():
  169. row, col = divmod(i, COLUMNS)
  170. if col > 0:
  171. yield i - 1
  172. if col < COLUMNS - 1:
  173. yield i + 1
  174. if row > 0:
  175. yield i - COLUMNS
  176. if row < self.nrows() - 1:
  177. yield i + COLUMNS
  178. for j in gen_indices():
  179. if self.blocks[j] != NOBLOCK:
  180. yield j
  181. def fragmentation(self, depth=FRAG_DEPTH):
  182. """
  183. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  184. Magnify vertical distances to avoid column stacking.
  185. """
  186. def dist(i, j):
  187. yi, xi = divmod(i, COLUMNS)
  188. yj, xj = divmod(j, COLUMNS)
  189. # for blocks in the same group, only count vertical distance so that
  190. # groups are spread out horizontally
  191. if groups[i] == groups[j]:
  192. return abs(yj - yi)
  193. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  194. colors = {}
  195. groups = {}
  196. groupsizes = {}
  197. for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
  198. colors.setdefault(block, []).extend(group)
  199. for i in group:
  200. groups[i] = groupid
  201. groupsizes[i] = len(group)
  202. return sum(dist(i, j) # * (1 + 2 * is_bomb(block))
  203. for block, color in colors.items()
  204. for i, j in combinations(color, 2))
  205. def score_points(self, multiplier=1):
  206. #remove = []
  207. points = 0
  208. for block, group in self.find_groups():
  209. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  210. #remove.extend(group)
  211. points += len(group) * multiplier
  212. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  213. points += BOMB_POINTS
  214. #remove.extend(group)
  215. #for i, other in enumerate(self.blocks):
  216. # if other == bomb_to_basic(block):
  217. # remove.append(i)
  218. #remove.sort()
  219. #prev = None
  220. #for i in remove:
  221. # if i != prev:
  222. # while self.blocks[i] != NOBLOCK:
  223. # self.blocks[i] = self.blocks[i - COLUMNS]
  224. # i -= COLUMNS
  225. # prev = i
  226. #if points:
  227. # points += self.score_points(min(2, multiplier * 2))
  228. return points
  229. def has_explosion(self):
  230. return any(is_bomb(block) and
  231. any(self.blocks[j] == block for j in self.neighbors(i))
  232. for i, block in enumerate(self.blocks))
  233. def gen_moves(self):
  234. yield ()
  235. def make_move(diff):
  236. direction = RIGHT if diff > 0 else LEFT
  237. return abs(diff) * (direction,)
  238. for src in range(COLUMNS):
  239. mov1 = make_move(src - self.exa)
  240. yield mov1 + (SWAP,)
  241. yield mov1 + (GRAB, SWAP, DROP)
  242. yield mov1 + (SWAP, GRAB, SWAP, DROP)
  243. for dst in range(COLUMNS):
  244. if dst != src:
  245. mov2 = make_move(dst - src)
  246. for get in GET:
  247. for put in PUT:
  248. yield mov1 + get + mov2 + put
  249. def solve(self):
  250. assert self.exa is not None
  251. if self.held != NOBLOCK:
  252. return (DROP,)
  253. if self.nrows() < MIN_ROWS:
  254. return ()
  255. if self.grabbing_of_dropping():
  256. return ()
  257. if self.has_explosion():
  258. return ()
  259. score, moves = min(self.score_moves())
  260. if not moves and max(self.colsizes()) <= MAX_SPEED_ROWS:
  261. return (SPEED,)
  262. return moves
  263. def print(self):
  264. print_board(self.blocks, self.exa, self.held)
  265. def tostring(self):
  266. stream = io.StringIO()
  267. with redirect_stdout(stream):
  268. self.print()
  269. return stream.getvalue()
  270. def nrows(self):
  271. return len(self.blocks) // COLUMNS
  272. def moves_to_keys(moves):
  273. return ''.join('jjkadl'[move] for move in moves)
  274. if __name__ == '__main__':
  275. import sys
  276. from PIL import Image
  277. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  278. state = State.detect(board)
  279. print('parsed:')
  280. state.print()
  281. print()
  282. print('empty cols:', state.empty_column_score())
  283. print()
  284. start = time.time()
  285. moves = state.solve()
  286. end = time.time()
  287. print('moves:', moves_to_keys(moves))
  288. print('elapsed:', end - start)
  289. print()
  290. print('target after moves:')
  291. points, newstate = state.simulate(moves)
  292. newstate.print()
  293. print()
  294. for score, moves in sorted(state.score_moves()):
  295. print('move %18s:' % moves_to_keys(moves), score)