strategy.py 11 KB


  1. import io
  2. import time
  3. from contextlib import redirect_stdout
  4. from itertools import combinations, islice
  5. from parse import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  6. detect_held, print_board, is_basic, is_bomb, bomb_to_basic
  7. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  8. GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
  9. PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
  10. MIN_BASIC_GROUP_SIZE = 4
  11. MIN_BOMB_GROUP_SIZE = 2
  12. FIND_GROUPS_DEPTH = 4
  13. FRAG_DEPTH = 4
  14. COLSIZE_PRIO = 5
  15. COLSIZE_PRIO_HIGH = 7
  16. COLSIZE_MAX = 8
  17. BOMB_POINTS = 2
  18. MIN_ROWS = 2
  19. class State:
  20. def __init__(self, blocks, exa, held):
  21. assert exa is not None
  22. self.blocks = blocks
  23. self.exa = exa
  24. self.held = held
  25. def grabbing_of_dropping(self):
  26. skip = self.colskip(self.exa)
  27. i = (skip + 1) * COLUMNS + self.exa
  28. return i < len(self.blocks) and self.blocks[i] == NOBLOCK
  29. #return any(len(col) > 1 and col[1] == NOBLOCK
  30. # for col in map(tuple, self.iter_columns()))
  31. def iter_columns(self):
  32. nrows = self.nrows()
  33. def gen_col(col):
  34. for row in range(nrows):
  35. i = row * COLUMNS + col
  36. if self.blocks[i] != NOBLOCK:
  37. yield i
  38. for col in range(COLUMNS):
  39. yield gen_col(col)
  40. @classmethod
  41. def detect(cls, board, pad=2):
  42. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  43. exa = detect_exa(board)
  44. held = detect_held(board, exa)
  45. return cls(blocks, exa, held)
  46. def copy(self):
  47. return State(list(self.blocks), self.exa, self.held)
  48. def colsizes(self):
  49. for col in range(COLUMNS):
  50. yield self.nrows() - self.colskip(col)
  51. #def highest_column(self):
  52. # for i, block in enumerate(self.blocks):
  53. # if block != NOBLOCK:
  54. # return self.nrows() - i // COLUMNS
  55. def empty_column_score(self):
  56. skip = 0
  57. for i, block in enumerate(self.blocks):
  58. if block != NOBLOCK:
  59. skip = i // COLUMNS
  60. break
  61. nrows = self.nrows()
  62. score = 0
  63. for col in range(COLUMNS):
  64. for row in range(skip, nrows):
  65. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  66. break
  67. score += row - skip + 1
  68. return score
  69. def score(self, points, moves, prev):
  70. #colsizes = list(self.colsizes())
  71. #mincol = min(colsizes)
  72. #maxcol = max(colsizes)
  73. #colsize_score = maxcol, colsizes.count(maxcol) #, -mincol
  74. #colsize_score = tuple(sorted(colsizes, reverse=True))
  75. #if prev.nrows() >= 6:
  76. # return colsize_score, -points, frag, len(moves)
  77. #colsize_score = maxcol, self.empty_column_score()
  78. frag = self.fragmentation()
  79. colsize_score = self.empty_column_score()
  80. #return -points, frag + colsize_score, len(moves)
  81. frag += colsize_score
  82. prev_colsize = max(prev.colsizes())
  83. if prev_colsize >= COLSIZE_PRIO_HIGH:
  84. return colsize_score, len(moves), -points, frag
  85. elif prev_colsize >= COLSIZE_PRIO:
  86. return -points, colsize_score, frag, len(moves)
  87. else:
  88. return -points, frag, colsize_score, len(moves)
  89. def score_moves(self):
  90. # clear exploding blocks before computing colsize
  91. #prev = self.copy()
  92. #prev.score_points()
  93. for moves in self.gen_moves():
  94. try:
  95. points, newstate = self.simulate(moves)
  96. yield newstate.score(points, moves, self), moves
  97. except AssertionError:
  98. pass
  99. def colskip(self, col):
  100. nrows = self.nrows()
  101. for row in range(nrows):
  102. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  103. return row
  104. return nrows
  105. def find_unmovable_blocks(self):
  106. unmoveable = set()
  107. bombed = set()
  108. for block, group in self.find_groups():
  109. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  110. for i in group:
  111. unmoveable.add(i)
  112. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  113. bombed.add(bomb_to_basic(block))
  114. for i in group:
  115. unmoveable.add(i)
  116. for i, block in enumerate(self.blocks):
  117. if block in bombed:
  118. unmoveable.add(i)
  119. return unmoveable
  120. def simulate(self, moves):
  121. s = self.copy()
  122. points = 0
  123. #if not moves:
  124. # return s.score_points(), s
  125. # avoid swapping/grabbing currently exploding items
  126. #unmoveable = s.find_unmovable_blocks()
  127. for move in moves:
  128. if move == LEFT:
  129. assert s.exa > 0
  130. s.exa -= 1
  131. elif move == RIGHT:
  132. assert s.exa < COLUMNS - 1
  133. s.exa += 1
  134. elif move == GRAB:
  135. assert s.held == NOBLOCK
  136. row = s.colskip(s.exa)
  137. assert row < s.nrows()
  138. i = row * COLUMNS + s.exa
  139. #assert i not in unmoveable
  140. s.held = s.blocks[i]
  141. s.blocks[i] = NOBLOCK
  142. elif move == DROP:
  143. assert s.held != NOBLOCK
  144. row = s.colskip(s.exa)
  145. assert row > 0
  146. i = row * COLUMNS + s.exa
  147. s.blocks[i - COLUMNS] = s.held
  148. s.held = NOBLOCK
  149. #points += s.score_points()
  150. elif move == SWAP:
  151. row = s.colskip(s.exa)
  152. assert row < s.nrows() - 2
  153. i = row * COLUMNS + s.exa
  154. j = i + COLUMNS
  155. #assert i not in unmoveable
  156. #assert j not in unmoveable
  157. s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
  158. #points += s.score_points()
  159. if moves and max(self.colsizes()) < COLSIZE_MAX:
  160. assert max(s.colsizes()) <= COLSIZE_MAX
  161. points += s.score_points()
  162. return points, s
  163. def find_groups(self, depth=FIND_GROUPS_DEPTH, minsize=2):
  164. def follow_group(i, block, group):
  165. if self.blocks[i] == block and i not in visited:
  166. group.append(i)
  167. visited.add(i)
  168. for nb in self.neighbors(i):
  169. follow_group(nb, block, group)
  170. visited = set()
  171. for col in self.iter_columns():
  172. for i in islice(col, depth):
  173. block = self.blocks[i]
  174. group = []
  175. follow_group(i, block, group)
  176. if len(group) >= minsize:
  177. yield block, group
  178. def neighbors(self, i):
  179. def gen_indices():
  180. row, col = divmod(i, COLUMNS)
  181. if col > 0:
  182. yield i - 1
  183. if col < COLUMNS - 1:
  184. yield i + 1
  185. if row > 0:
  186. yield i - COLUMNS
  187. if row < self.nrows() - 1:
  188. yield i + COLUMNS
  189. for j in gen_indices():
  190. if self.blocks[j] != NOBLOCK:
  191. yield j
  192. def fragmentation(self, depth=FRAG_DEPTH):
  193. """
  194. Minimize the sum of dist(i,j) for all blocks i,j of the same color.
  195. Prioritize horitontal distance to avoid column stacking.
  196. """
  197. def dist(i, j):
  198. yi, xi = divmod(i, COLUMNS)
  199. yj, xj = divmod(j, COLUMNS)
  200. if groups[i] == groups[j]:
  201. return abs(yj - yi)
  202. #return abs(xj - xi) * 2 + abs(yj - yi) - 1
  203. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  204. colors = {}
  205. groups = {}
  206. groupsizes = {}
  207. for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
  208. colors.setdefault(block, []).extend(group)
  209. for i in group:
  210. groups[i] = groupid
  211. groupsizes[i] = len(group)
  212. return sum(dist(i, j) # * (1 + 2 * is_bomb(block))
  213. for block, color in colors.items()
  214. for i, j in combinations(color, 2))
  215. def score_points(self, multiplier=1):
  216. remove = []
  217. points = 0
  218. for block, group in self.find_groups():
  219. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  220. remove.extend(group)
  221. points += len(group) * multiplier
  222. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  223. points += BOMB_POINTS
  224. remove.extend(group)
  225. for i, other in enumerate(self.blocks):
  226. if other == bomb_to_basic(block):
  227. remove.append(i)
  228. remove.sort()
  229. prev = None
  230. for i in remove:
  231. if i != prev:
  232. while self.blocks[i] != NOBLOCK:
  233. self.blocks[i] = self.blocks[i - COLUMNS]
  234. i -= COLUMNS
  235. prev = i
  236. if points:
  237. points += self.score_points(min(2, multiplier * 2))
  238. return points
  239. def has_explosion(self):
  240. return any(is_bomb(block) and
  241. any(self.blocks[j] == block for j in self.neighbors(i))
  242. for i, block in enumerate(self.blocks))
  243. def gen_moves(self):
  244. yield ()
  245. def make_move(diff):
  246. direction = RIGHT if diff > 0 else LEFT
  247. return abs(diff) * (direction,)
  248. for src in range(COLUMNS):
  249. mov1 = make_move(src - self.exa)
  250. yield mov1 + (SWAP,)
  251. yield mov1 + (GRAB, SWAP, DROP)
  252. yield mov1 + (SWAP, GRAB, SWAP, DROP)
  253. for dst in range(COLUMNS):
  254. if dst != src:
  255. mov2 = make_move(dst - src)
  256. for get in GET:
  257. for put in PUT:
  258. yield mov1 + get + mov2 + put
  259. def solve(self):
  260. if self.held != NOBLOCK:
  261. return (DROP,)
  262. if self.nrows() < MIN_ROWS:
  263. return ()
  264. if self.grabbing_of_dropping():
  265. return ()
  266. if self.has_explosion():
  267. return ()
  268. score, moves = min(self.score_moves())
  269. if not moves:
  270. return (SPEED,)
  271. return moves
  272. def print(self):
  273. print_board(self.blocks, self.exa, self.held)
  274. def tostring(self):
  275. stream = io.StringIO()
  276. with redirect_stdout(stream):
  277. self.print()
  278. return stream.getvalue()
  279. def nrows(self):
  280. return len(self.blocks) // COLUMNS
  281. def moves_to_keys(moves):
  282. return ''.join('jjkadl'[move] for move in moves)
  283. if __name__ == '__main__':
  284. import sys
  285. from PIL import Image
  286. #from pprint import pprint
  287. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  288. state = State.detect(board)
  289. print('parsed:')
  290. state.print()
  291. print()
  292. print('empty cols:', state.empty_column_score())
  293. print()
  294. start = time.time()
  295. moves = state.solve()
  296. end = time.time()
  297. print('moves:', moves_to_keys(moves))
  298. print('elapsed:', end - start)
  299. print()
  300. print('target after moves:')
  301. points, newstate = state.simulate(moves)
  302. newstate.print()
  303. print()
  304. for score, moves in sorted(state.score_moves()):
  305. print('move %18s:' % moves_to_keys(moves), score)
  306. #print('moves:', moves_to_keys(moves), moves)
  307. #print('score:', score)
  308. #print('\nmoves:', moves_to_keys(state.solve()))