strategy.py 14 KB


  1. import io
  2. import time
  3. from collections import deque
  4. from contextlib import redirect_stdout
  5. from copy import copy
  6. from itertools import combinations
  7. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  8. detect_held, print_board, is_basic, is_bomb
  9. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  10. MOVE_DELAYS = (
  11. # in milliseconds
  12. 50, # GRAB
  13. 50, # DROP
  14. 50, # SWAP
  15. 30, # LEFT
  16. 30, # RIGHT
  17. 30, # SPEED
  18. )
  19. MIN_BASIC_GROUP_SIZE = 4
  20. MIN_BOMB_GROUP_SIZE = 2
  21. COLSIZE_PRIO = 5
  22. COLSIZE_PANIC = 8
  23. BOMB_POINTS = 5
  24. class State:
  25. def __init__(self, blocks, exa, held, colskip, busy, moves, groups, groupsizes, maxgroup):
  26. self.blocks = blocks
  27. self.exa = exa
  28. self.held = held
  29. self.colskip = colskip
  30. self.busy = busy
  31. self.moves = moves
  32. self.groups = groups
  33. self.groupsizes = groupsizes
  34. self.maxgroup = maxgroup
  35. self.nrows = len(blocks) // COLUMNS
  36. @classmethod
  37. def detect(cls, board, pad=2):
  38. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  39. exa = detect_exa(board)
  40. held = detect_held(board, exa)
  41. colskip = get_colskip(blocks)
  42. busy = get_busy(blocks, colskip)
  43. groups, groupsizes, maxgroup = get_groups(blocks)
  44. return cls(bytearray(blocks), exa, held, bytearray(colskip), busy, (),
  45. bytearray(groups), bytearray(groupsizes), maxgroup)
  46. def copy(self, deep):
  47. mcopy = copy if deep else lambda x: x
  48. return self.__class__(mcopy(self.blocks),
  49. self.exa,
  50. self.held,
  51. mcopy(self.colskip),
  52. self.busy,
  53. self.moves,
  54. mcopy(self.groups),
  55. mcopy(self.groupsizes),
  56. self.maxgroup)
  57. def colbusy(self, col):
  58. return (self.busy >> col) & 1
  59. def colrows(self, col):
  60. return self.nrows - self.colskip[col]
  61. def maxrows(self):
  62. return max(map(self.colrows, range(COLUMNS)))
  63. def causes_panic(self):
  64. return self.max_colsize() >= COLSIZE_PANIC
  65. def max_colsize(self):
  66. return self.nrows - self.empty_rows()
  67. def empty_rows(self):
  68. for i, block in enumerate(self.blocks):
  69. if block != NOBLOCK:
  70. return i // COLUMNS
  71. return 0
  72. def holes(self):
  73. start_row = self.empty_rows()
  74. score = 0
  75. for col in range(COLUMNS):
  76. for row in range(start_row, self.nrows):
  77. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  78. break
  79. score += row - start_row + 1
  80. return score
  81. def locked(self, i):
  82. block = self.blocks[i]
  83. if block == NOBLOCK:
  84. return False
  85. if is_basic(block):
  86. return self.groupsizes[i] >= MIN_BASIC_GROUP_SIZE
  87. assert is_bomb(block)
  88. return self.groupsizes[i] >= MIN_BOMB_GROUP_SIZE
  89. def move(self, *moves):
  90. deep = any(move in (GRAB, DROP, SWAP) for move in moves)
  91. s = self.copy(deep)
  92. s.moves += moves
  93. for move in moves:
  94. if move == LEFT:
  95. assert s.exa > 0
  96. s.exa -= 1
  97. elif move == RIGHT:
  98. assert s.exa < COLUMNS - 1
  99. s.exa += 1
  100. elif move == GRAB:
  101. assert not s.colbusy(s.exa)
  102. assert s.held == NOBLOCK
  103. row = s.colskip[s.exa]
  104. assert row < s.nrows
  105. i = row * COLUMNS + s.exa
  106. if not s.locked(i):
  107. s.held = s.blocks[i]
  108. s.blocks[i] = NOBLOCK
  109. s.colskip[s.exa] += 1
  110. s.ungroup(i)
  111. elif move == DROP:
  112. if s.held != NOBLOCK:
  113. row = s.colskip[s.exa]
  114. assert row > 0
  115. i = (row - 1) * COLUMNS + s.exa
  116. s.blocks[i] = s.held
  117. s.held = NOBLOCK
  118. s.colskip[s.exa] -= 1
  119. s.regroup(i)
  120. elif move == SWAP:
  121. assert not s.colbusy(s.exa)
  122. row = s.colskip[s.exa]
  123. i = row * COLUMNS + s.exa
  124. j = i + COLUMNS
  125. if j < len(s.blocks) and not s.locked(i) and not s.locked(j):
  126. bi = s.blocks[i]
  127. bj = s.blocks[j]
  128. if bi != bj:
  129. s.blocks[i] = NOBLOCK
  130. s.blocks[j] = NOBLOCK
  131. s.ungroup(i)
  132. s.ungroup(j)
  133. s.blocks[j] = bi
  134. s.regroup(j)
  135. s.blocks[i] = bj
  136. s.regroup(i)
  137. return s
  138. def ungroup(self, i):
  139. assert self.blocks[i] == NOBLOCK
  140. visited = set()
  141. oldid = self.groups[i]
  142. for nb in neighbors(i, self.blocks):
  143. if self.groups[nb] == oldid:
  144. newgroup = self.scan_group(nb, visited)
  145. if newgroup:
  146. self.maxgroup = newid = self.maxgroup + 1
  147. for j in newgroup:
  148. self.groups[j] = newid
  149. self.groupsizes[j] = len(newgroup)
  150. self.groups[i] = 0
  151. self.groupsizes[i] = 0
  152. def regroup(self, i):
  153. assert self.blocks[i] != NOBLOCK
  154. self.maxgroup = newid = self.maxgroup + 1
  155. newgroup = self.scan_group(i, set())
  156. for j in newgroup:
  157. self.groups[j] = newid
  158. self.groupsizes[j] = len(newgroup)
  159. def scan_group(self, start, visited):
  160. def scan(i):
  161. if i not in visited:
  162. yield i
  163. visited.add(i)
  164. for nb in neighbors(i, self.blocks):
  165. if self.blocks[nb] == block:
  166. yield from scan(nb)
  167. block = self.blocks[start]
  168. return tuple(scan(start))
  169. def fragmentation(self):
  170. """
  171. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  172. Magnify vertical distances to avoid column stacking.
  173. """
  174. def dist(i, j):
  175. yi, xi = divmod(i, COLUMNS)
  176. yj, xj = divmod(j, COLUMNS)
  177. # for blocks in the same group, only count vertical distance so
  178. # that groups are spread out horizontally
  179. if self.groups[i] == self.groups[j]:
  180. return abs(yj - yi)
  181. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  182. colors = {}
  183. for i, block in enumerate(self.blocks):
  184. if block != NOBLOCK:
  185. colors.setdefault(block, []).append(i)
  186. return sum(dist(i, j)
  187. for blocks in colors.values()
  188. for i, j in combinations(blocks, 2))
  189. def group_leaders(self):
  190. seen = set()
  191. for i, groupid in enumerate(self.groups):
  192. if groupid > 0 and groupid not in seen:
  193. seen.add(groupid)
  194. yield i
  195. def points(self):
  196. points = 0
  197. for leader in self.group_leaders():
  198. block = self.blocks[leader]
  199. size = self.groupsizes[leader]
  200. if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
  201. points += size
  202. elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
  203. points += self.maxrows()
  204. return -points
  205. def gen_moves(self):
  206. yield self
  207. for src in self.gen_shift(not self.colbusy(self.exa)):
  208. yield from src.gen_stationary()
  209. for get in src.gen_get():
  210. for dst in get.gen_shift(False):
  211. yield from dst.gen_put()
  212. def gen_shift(self, allow_noshift):
  213. if allow_noshift:
  214. yield self
  215. left = self
  216. for i in range(self.exa):
  217. left = left.move(LEFT)
  218. yield left
  219. right = self
  220. for i in range(COLUMNS - self.exa - 1):
  221. right = right.move(RIGHT)
  222. yield right
  223. def gen_stationary(self):
  224. # SWAP
  225. # GRAB, SWAP, DROP
  226. # GRAB, SWAP, DROP, SWAP
  227. # SWAP, GRAB, SWAP, DROP
  228. # SWAP, GRAB, SWAP, DROP, SWAP
  229. if not self.colbusy(self.exa):
  230. avail = self.colrows(self.exa)
  231. if avail >= 2:
  232. swap = self.move(SWAP)
  233. yield swap
  234. if avail >= 3:
  235. grab = self.move(GRAB, SWAP, DROP)
  236. yield grab
  237. yield grab.move(SWAP)
  238. swap = swap.move(GRAB, SWAP, DROP)
  239. yield swap
  240. yield swap.move(SWAP)
  241. def gen_get(self):
  242. # GRAB
  243. # SWAP, GRAB
  244. # GRAB, SWAP, DROP, SWAP, GRAB
  245. if not self.colbusy(self.exa):
  246. avail = self.colrows(self.exa)
  247. if avail >= 1:
  248. grab = self.move(GRAB)
  249. yield grab
  250. if avail >= 2:
  251. yield self.move(SWAP, GRAB)
  252. if avail >= 3:
  253. yield grab.move(SWAP, DROP, SWAP, GRAB)
  254. def gen_put(self):
  255. # DROP
  256. # DROP, SWAP
  257. # DROP, SWAP, GRAB, SWAP, DROP
  258. if not self.colbusy(self.exa):
  259. avail = self.colrows(self.exa)
  260. drop = self.move(DROP)
  261. yield drop
  262. if avail >= 1:
  263. swap = drop.move(SWAP)
  264. yield swap
  265. if avail >= 2:
  266. yield swap.move(GRAB, SWAP, DROP)
  267. def force(self, *moves):
  268. state = self.move(*moves)
  269. state.score = ()
  270. return state
  271. def solve(self):
  272. assert self.exa is not None
  273. if self.held != NOBLOCK:
  274. return self.force(DROP)
  275. pool = deque(self.gen_moves())
  276. if len(pool) == 0:
  277. return self.force()
  278. best_score = ()
  279. for key in self.score_keys():
  280. if len(pool) == 1:
  281. break
  282. for state in pool:
  283. state.score = key(state)
  284. best = min(state.score for state in pool)
  285. best_score += (best,)
  286. for i in range(len(pool)):
  287. state = pool.popleft()
  288. if state.score == best:
  289. pool.append(state)
  290. best = pool.popleft()
  291. best.score = best_score
  292. return best
  293. def score_keys(self):
  294. cls = self.__class__
  295. colsize = self.nrows - 2
  296. if colsize >= COLSIZE_PANIC:
  297. return cls.holes, cls.points, cls.nmoves, cls.fragmentation
  298. if colsize >= COLSIZE_PRIO:
  299. return cls.causes_panic, cls.points, cls.holes, \
  300. cls.fragmentation, cls.nmoves
  301. return cls.points, cls.fragmentation, cls.holes, cls.nmoves
  302. def print_groupsizes(self):
  303. for start in range(len(self.groupsizes) - COLUMNS, -1, -COLUMNS):
  304. print(' '.join('%-2d' % g for g in self.groupsizes[start:start + COLUMNS]))
  305. def print_groups(self):
  306. for start in range(len(self.groups) - COLUMNS, -1, -COLUMNS):
  307. print(' '.join('%-2d' % g for g in self.groups[start:start + COLUMNS]))
  308. def print(self):
  309. print_board(self.blocks, self.exa, self.held)
  310. def tostring(self):
  311. stream = io.StringIO()
  312. with redirect_stdout(stream):
  313. self.print()
  314. return stream.getvalue()
  315. def has_same_exa(self, state):
  316. return self.exa == state.exa and self.held == state.held
  317. def nmoves(self):
  318. return len(self.moves)
  319. def delay(self):
  320. return moves_delay(self.moves)
  321. def keys(self):
  322. return moves_to_keys(self.moves)
  323. def loops(self, prev):
  324. return self.moves and \
  325. self.exa == prev.exa and \
  326. self.moves == prev.moves and \
  327. self.score == prev.score
  328. def get_colskip(blocks):
  329. def colskip(col):
  330. for row, block in enumerate(blocks[col::COLUMNS]):
  331. if block != NOBLOCK:
  332. return row
  333. return len(blocks) // COLUMNS
  334. return list(map(colskip, range(COLUMNS)))
  335. def get_busy(blocks, colskip):
  336. mask = 0
  337. for col, skip in enumerate(colskip):
  338. start = (skip + 1) * COLUMNS + col
  339. colbusy = NOBLOCK in blocks[start::COLUMNS]
  340. mask |= colbusy << col
  341. return mask
  342. def scan_group(blocks, i, block, visited):
  343. yield i
  344. visited.add(i)
  345. for nb in neighbors(i, blocks):
  346. if blocks[nb] == block and nb not in visited:
  347. yield from scan_group(blocks, nb, block, visited)
  348. def get_groups(blocks):
  349. groupid = 0
  350. groups = [0] * len(blocks)
  351. groupsizes = [0] * len(blocks)
  352. visited = set()
  353. for i, block in enumerate(blocks):
  354. if block != NOBLOCK and i not in visited:
  355. groupid += 1
  356. group = tuple(scan_group(blocks, i, block, visited))
  357. for j in group:
  358. groups[j] = groupid
  359. groupsizes[j] = len(group)
  360. return groups, groupsizes, groupid
  361. def neighbors(i, blocks):
  362. y, x = divmod(i, COLUMNS)
  363. if x > 0:
  364. yield i - 1
  365. if x < COLUMNS - 1:
  366. yield i + 1
  367. if y > 0:
  368. yield i - COLUMNS
  369. if y < len(blocks) // COLUMNS - 1:
  370. yield i + COLUMNS
  371. def move_to_key(move):
  372. return 'jjkadl'[move]
  373. def moves_to_keys(moves):
  374. return ''.join(move_to_key(move) for move in moves)
  375. def moves_delay(moves):
  376. return sum(MOVE_DELAYS[m] for m in moves)
  377. if __name__ == '__main__':
  378. import sys
  379. from PIL import Image
  380. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  381. state = State.detect(board)
  382. print('parsed:')
  383. state.print()
  384. print()
  385. print('groups:')
  386. state.print_groups()
  387. print()
  388. print('groupsizes:')
  389. state.print_groupsizes()
  390. print()
  391. start = time.time()
  392. newstate = state.solve()
  393. end = time.time()
  394. print('best move:', newstate.keys())
  395. print('score:', newstate.score)
  396. print('elapsed:', round((end - start) * 1000, 1), 'ms')
  397. print()
  398. print('target after move:')
  399. newstate.print()
  400. #print()
  401. #print('generated moves:')
  402. #for state in state.gen_moves():
  403. # print(state.keys())