strategy.py 11 KB


  1. import io
  2. import time
  3. from collections import deque
  4. from contextlib import redirect_stdout
  5. from itertools import combinations, islice
  6. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  7. detect_held, print_board, is_basic, is_bomb
  8. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  9. MOVE_DELAYS = (
  10. # in milliseconds
  11. 50, # GRAB
  12. 50, # DROP
  13. 50, # SWAP
  14. 30, # LEFT
  15. 30, # RIGHT
  16. 30, # SPEED
  17. )
  18. GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
  19. PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
  20. MIN_BASIC_GROUP_SIZE = 4
  21. MIN_BOMB_GROUP_SIZE = 2
  22. POINTS_DEPTH = 3
  23. FRAG_DEPTH = 4
  24. DEFRAG_PRIO = 4
  25. COLSIZE_PRIO = 5
  26. COLSIZE_PANIC = 8
  27. COLSIZE_MAX = 9
  28. BOMB_POINTS = 5
  29. class State:
  30. def __init__(self, blocks, exa, held, colskip=None):
  31. self.blocks = blocks
  32. self.exa = exa
  33. self.held = held
  34. self.moves = ()
  35. self.score = ()
  36. self.nrows = len(self.blocks) // COLUMNS
  37. if colskip is None:
  38. colskip = []
  39. for col in range(COLUMNS):
  40. for row in range(self.nrows):
  41. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  42. colskip.append(row)
  43. break
  44. else:
  45. colskip.append(self.nrows)
  46. self.colskip = colskip
  47. def grabbing_or_dropping(self):
  48. skip = self.colskip[self.exa]
  49. i = (skip + 1) * COLUMNS + self.exa
  50. return i < len(self.blocks) and self.blocks[i] == NOBLOCK
  51. def iter_columns(self):
  52. def gen_col(col):
  53. for row in range(self.nrows):
  54. i = row * COLUMNS + col
  55. if self.blocks[i] != NOBLOCK:
  56. yield i
  57. for col in range(COLUMNS):
  58. yield gen_col(col)
  59. @classmethod
  60. def detect(cls, board, pad=2):
  61. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  62. exa = detect_exa(board)
  63. held = detect_held(board, exa)
  64. return cls(blocks, exa, held)
  65. def copy(self):
  66. return self.__class__(list(self.blocks), self.exa, self.held,
  67. list(self.colskip))
  68. def causes_panic(self):
  69. return self.max_colsize() >= COLSIZE_PANIC
  70. def max_colsize(self):
  71. return self.nrows - self.empty_rows()
  72. def empty_rows(self):
  73. for i, block in enumerate(self.blocks):
  74. if block != NOBLOCK:
  75. return i // COLUMNS
  76. return 0
  77. def holes(self):
  78. start_row = self.empty_rows()
  79. score = 0
  80. for col in range(COLUMNS):
  81. for row in range(start_row, self.nrows):
  82. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  83. break
  84. score += row - start_row + 1
  85. return score
  86. def move(self, moves):
  87. s = self.copy() if moves else self
  88. s.moves = moves
  89. s.placed = set()
  90. s.grabbed = {}
  91. for move in moves:
  92. if move == LEFT:
  93. assert s.exa > 0
  94. s.exa -= 1
  95. elif move == RIGHT:
  96. assert s.exa < COLUMNS - 1
  97. s.exa += 1
  98. elif move == GRAB:
  99. assert s.held == NOBLOCK
  100. row = s.colskip[s.exa]
  101. assert row < s.nrows
  102. i = row * COLUMNS + s.exa
  103. s.held = s.blocks[i]
  104. s.blocks[i] = NOBLOCK
  105. s.grabbed[i] = s.held
  106. s.colskip[s.exa] += 1
  107. elif move == DROP:
  108. assert s.held != NOBLOCK
  109. row = s.colskip[s.exa]
  110. assert row > 0
  111. i = (row - 1) * COLUMNS + s.exa
  112. s.blocks[i] = s.held
  113. s.held = NOBLOCK
  114. s.placed.add(i)
  115. s.colskip[s.exa] -= 1
  116. elif move == SWAP:
  117. row = s.colskip[s.exa]
  118. i = row * COLUMNS + s.exa
  119. j = i + COLUMNS
  120. assert j < len(s.blocks)
  121. bi = s.blocks[i]
  122. bj = s.blocks[j]
  123. if bi != bj:
  124. s.blocks[i] = bj
  125. s.blocks[j] = bi
  126. s.grabbed[i] = bi
  127. s.grabbed[j] = bj
  128. s.placed.add(i)
  129. s.placed.add(j)
  130. if moves and self.max_colsize() < COLSIZE_MAX:
  131. assert s.max_colsize() <= COLSIZE_MAX
  132. return s
  133. def find_groups(self, depth=POINTS_DEPTH, minsize=2):
  134. def follow_group(i, block, group):
  135. if self.blocks[i] == block and i not in visited:
  136. group.append(i)
  137. visited.add(i)
  138. for nb in self.neighbors(i):
  139. follow_group(nb, block, group)
  140. visited = set()
  141. for col in self.iter_columns():
  142. for i in islice(col, depth):
  143. block = self.blocks[i]
  144. group = []
  145. follow_group(i, block, group)
  146. if len(group) >= minsize:
  147. yield block, group
  148. def neighbors(self, i):
  149. row, col = divmod(i, COLUMNS)
  150. if col > 0 and self.blocks[i - 1] != NOBLOCK:
  151. yield i - 1
  152. if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
  153. yield i + 1
  154. if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
  155. yield i - COLUMNS
  156. if row < self.nrows - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
  157. yield i + COLUMNS
  158. def fragmentation(self, depth=FRAG_DEPTH):
  159. """
  160. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  161. Magnify vertical distances to avoid column stacking.
  162. """
  163. def dist(i, j):
  164. yi, xi = divmod(i, COLUMNS)
  165. yj, xj = divmod(j, COLUMNS)
  166. # for blocks in the same group, only count vertical distance so
  167. # that groups are spread out horizontally
  168. if groups[i] == groups[j]:
  169. return abs(yj - yi)
  170. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  171. colors = {}
  172. groups = {}
  173. groupsizes = {}
  174. for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
  175. colors.setdefault(block, []).extend(group)
  176. for i in group:
  177. groups[i] = groupid
  178. groupsizes[i] = len(group)
  179. return sum(dist(i, j)
  180. for block, color in colors.items()
  181. for i, j in combinations(color, 2))
  182. def points(self):
  183. def group_size(start):
  184. work = [start]
  185. visited.add(start)
  186. size = 0
  187. block = self.blocks[start]
  188. while work:
  189. i = work.pop()
  190. # avoid giving points to moving a block within the same group
  191. if self.grabbed.get(i, None) == block:
  192. return 0
  193. if self.blocks[i] == block:
  194. size += 1
  195. for nb in self.neighbors(i):
  196. if nb not in visited:
  197. visited.add(nb)
  198. work.append(nb)
  199. return size
  200. points = 0
  201. visited = set()
  202. for i in self.placed:
  203. if i not in visited:
  204. block = self.blocks[i]
  205. size = group_size(i)
  206. if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
  207. points += size
  208. elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
  209. points += BOMB_POINTS
  210. return -points
  211. def gen_moves(self):
  212. yield ()
  213. def shift_exa(diff):
  214. direction = RIGHT if diff > 0 else LEFT
  215. return abs(diff) * (direction,)
  216. ignore_exa_column = self.grabbing_or_dropping()
  217. for src in range(COLUMNS):
  218. mov1 = shift_exa(src - self.exa)
  219. if mov1 or not ignore_exa_column:
  220. yield mov1 + (SWAP,)
  221. yield mov1 + (GRAB, SWAP, DROP)
  222. yield mov1 + (SWAP, GRAB, SWAP, DROP)
  223. yield mov1 + (GRAB, SWAP, DROP, SWAP)
  224. yield mov1 + (SWAP, GRAB, SWAP, DROP, SWAP)
  225. for dst in range(COLUMNS):
  226. if dst != src:
  227. mov2 = shift_exa(dst - src)
  228. for get in GET:
  229. for put in PUT:
  230. yield mov1 + get + mov2 + put
  231. def gen_valid_moves(self):
  232. for moves in self.gen_moves():
  233. try:
  234. yield self.move(moves)
  235. except AssertionError:
  236. pass
  237. def solve(self):
  238. assert self.exa is not None
  239. if self.held != NOBLOCK:
  240. return self.move((DROP,))
  241. valid = deque(self.gen_valid_moves())
  242. if len(valid) == 0:
  243. return self.move(())
  244. best_score = ()
  245. for key in self.score_keys():
  246. if len(valid) == 1:
  247. break
  248. for state in valid:
  249. state.score = key(state)
  250. best = min(state.score for state in valid)
  251. best_score += (best,)
  252. for i in range(len(valid)):
  253. state = valid.popleft()
  254. if state.score == best:
  255. valid.append(state)
  256. best = valid.popleft()
  257. best.score = best_score
  258. return best
  259. def score_keys(self):
  260. cls = self.__class__
  261. colsize = self.nrows - 2
  262. if colsize >= COLSIZE_PANIC:
  263. return cls.holes, cls.nmoves, cls.points, cls.fragmentation
  264. if colsize >= COLSIZE_PRIO:
  265. return cls.causes_panic, cls.points, cls.holes, \
  266. cls.fragmentation, cls.nmoves
  267. return cls.points, cls.fragmentation, cls.holes, cls.nmoves
  268. def print(self):
  269. print_board(self.blocks, self.exa, self.held)
  270. def tostring(self):
  271. stream = io.StringIO()
  272. with redirect_stdout(stream):
  273. self.print()
  274. return stream.getvalue()
  275. def has_same_exa(self, state):
  276. return self.exa == state.exa and self.held == state.held
  277. def nmoves(self):
  278. return len(self.moves)
  279. def delay(self):
  280. return moves_delay(self.moves)
  281. def keys(self):
  282. return moves_to_keys(self.moves)
  283. def __lt__(self, other):
  284. return self.score < other.score
  285. def loops(self, prev):
  286. return self.moves and \
  287. self.exa == prev.exa and \
  288. self.moves == prev.moves and \
  289. self.score == prev.score
  290. def move_to_key(move):
  291. return 'jjkadl'[move]
  292. def moves_to_keys(moves):
  293. return ''.join(move_to_key(move) for move in moves)
  294. def moves_delay(moves):
  295. return sum(MOVE_DELAYS[m] for m in moves)
  296. if __name__ == '__main__':
  297. import sys
  298. from PIL import Image
  299. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  300. state = State.detect(board)
  301. print('parsed:')
  302. state.print()
  303. print()
  304. start = time.time()
  305. newstate = state.solve()
  306. end = time.time()
  307. print('best move:', newstate.keys())
  308. print('score:', newstate.score)
  309. print('elapsed:', round((end - start) * 1000, 1), 'ms')
  310. print()
  311. print('target after move:')
  312. newstate.print()