strategy.py 14 KB


  1. import io
  2. import time
  3. from collections import deque
  4. from contextlib import redirect_stdout
  5. from copy import copy
  6. from itertools import combinations
  7. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  8. detect_held, print_board, is_basic, is_bomb
  9. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  10. MOVE_DELAYS = (
  11. # in milliseconds
  12. 50, # GRAB
  13. 50, # DROP
  14. 50, # SWAP
  15. 30, # LEFT
  16. 30, # RIGHT
  17. 30, # SPEED
  18. )
  19. MIN_BASIC_GROUP_SIZE = 4
  20. MIN_BOMB_GROUP_SIZE = 2
  21. COLSIZE_PRIO = 5
  22. COLSIZE_PANIC = 8
  23. BOMB_POINTS = 5
  24. class State:
  25. def __init__(self, blocks, exa, held, colskip, busy, moves, groups, groupsizes, maxgroup):
  26. self.blocks = blocks
  27. self.exa = exa
  28. self.held = held
  29. self.colskip = colskip
  30. self.busy = busy
  31. self.moves = moves
  32. self.groups = groups
  33. self.groupsizes = groupsizes
  34. self.maxgroup = maxgroup
  35. self.nrows = len(blocks) // COLUMNS
  36. @classmethod
  37. def detect(cls, board, pad=2):
  38. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  39. exa = detect_exa(board)
  40. held = detect_held(board, exa)
  41. colskip = get_colskip(blocks)
  42. busy = get_busy(blocks, colskip)
  43. groups, groupsizes, maxgroup = get_groups(blocks)
  44. return cls(bytearray(blocks), exa, held, bytearray(colskip), busy, (),
  45. bytearray(groups), bytearray(groupsizes), maxgroup)
  46. def copy(self, deep):
  47. mcopy = copy if deep else lambda x: x
  48. return self.__class__(mcopy(self.blocks),
  49. self.exa,
  50. self.held,
  51. mcopy(self.colskip),
  52. self.busy,
  53. self.moves,
  54. mcopy(self.groups),
  55. mcopy(self.groupsizes),
  56. self.maxgroup)
  57. def colbusy(self, col):
  58. return (self.busy >> col) & 1
  59. def colrows(self, col):
  60. return self.nrows - self.colskip[col]
  61. def maxrows(self):
  62. return max(map(self.colrows, range(COLUMNS)))
  63. def causes_panic(self):
  64. return self.max_colsize() >= COLSIZE_PANIC
  65. def max_colsize(self):
  66. return self.nrows - self.empty_rows()
  67. def empty_rows(self):
  68. for i, block in enumerate(self.blocks):
  69. if block != NOBLOCK:
  70. return i // COLUMNS
  71. return 0
  72. def holes(self):
  73. start_row = self.empty_rows()
  74. score = 0
  75. for col in range(COLUMNS):
  76. for row in range(start_row, self.nrows):
  77. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  78. break
  79. score += row - start_row + 1
  80. return score
  81. def move(self, *moves):
  82. deep = any(move in (GRAB, DROP, SWAP) for move in moves)
  83. s = self.copy(deep)
  84. s.moves += moves
  85. for move in moves:
  86. if move == LEFT:
  87. assert s.exa > 0
  88. s.exa -= 1
  89. elif move == RIGHT:
  90. assert s.exa < COLUMNS - 1
  91. s.exa += 1
  92. elif move == GRAB:
  93. assert not s.colbusy(s.exa)
  94. assert s.held == NOBLOCK
  95. row = s.colskip[s.exa]
  96. assert row < s.nrows
  97. i = row * COLUMNS + s.exa
  98. s.held = s.blocks[i]
  99. s.blocks[i] = NOBLOCK
  100. s.colskip[s.exa] += 1
  101. s.ungroup(i)
  102. elif move == DROP:
  103. assert not s.colbusy(s.exa)
  104. assert s.held != NOBLOCK
  105. row = s.colskip[s.exa]
  106. assert row > 0
  107. i = (row - 1) * COLUMNS + s.exa
  108. s.blocks[i] = s.held
  109. s.held = NOBLOCK
  110. s.colskip[s.exa] -= 1
  111. s.regroup(i)
  112. elif move == SWAP:
  113. assert not s.colbusy(s.exa)
  114. row = s.colskip[s.exa]
  115. i = row * COLUMNS + s.exa
  116. j = i + COLUMNS
  117. assert j < len(s.blocks)
  118. bi = s.blocks[i]
  119. bj = s.blocks[j]
  120. if bi != bj:
  121. s.blocks[i] = NOBLOCK
  122. s.blocks[j] = NOBLOCK
  123. s.ungroup(i)
  124. s.ungroup(j)
  125. s.blocks[j] = bi
  126. s.regroup(j)
  127. s.blocks[i] = bj
  128. s.regroup(i)
  129. return s
  130. def ungroup(self, i):
  131. assert self.blocks[i] == NOBLOCK
  132. visited = set()
  133. oldid = self.groups[i]
  134. for nb in neighbors(i, self.blocks):
  135. if self.groups[nb] == oldid:
  136. newgroup = self.scan_group(nb, visited)
  137. if newgroup:
  138. self.maxgroup = newid = self.maxgroup + 1
  139. for j in newgroup:
  140. self.groups[j] = newid
  141. self.groupsizes[j] = len(newgroup)
  142. self.groups[i] = 0
  143. self.groupsizes[i] = 0
  144. def regroup(self, i):
  145. assert self.blocks[i] != NOBLOCK
  146. self.maxgroup = newid = self.maxgroup + 1
  147. newgroup = self.scan_group(i, set())
  148. for j in newgroup:
  149. self.groups[j] = newid
  150. self.groupsizes[j] = len(newgroup)
  151. def scan_group(self, start, visited):
  152. def scan(i):
  153. if i not in visited:
  154. yield i
  155. visited.add(i)
  156. for nb in neighbors(i, self.blocks):
  157. if self.blocks[nb] == block:
  158. yield from scan(nb)
  159. block = self.blocks[start]
  160. return tuple(scan(start))
  161. def fragmentation(self):
  162. """
  163. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  164. Magnify vertical distances to avoid column stacking.
  165. """
  166. def dist(i, j):
  167. yi, xi = divmod(i, COLUMNS)
  168. yj, xj = divmod(j, COLUMNS)
  169. # for blocks in the same group, only count vertical distance so
  170. # that groups are spread out horizontally
  171. if self.groups[i] == self.groups[j]:
  172. return abs(yj - yi)
  173. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  174. colors = {}
  175. for i, block in enumerate(self.blocks):
  176. if block != NOBLOCK:
  177. colors.setdefault(block, []).append(i)
  178. return sum(dist(i, j)
  179. for blocks in colors.values()
  180. for i, j in combinations(blocks, 2))
  181. def group_leaders(self):
  182. seen = set()
  183. for i, groupid in enumerate(self.groups):
  184. if groupid > 0 and groupid not in seen:
  185. seen.add(groupid)
  186. yield i
  187. def points(self):
  188. points = 0
  189. for leader in self.group_leaders():
  190. block = self.blocks[leader]
  191. size = self.groupsizes[leader]
  192. if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
  193. points += size
  194. elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
  195. points += self.maxrows()
  196. return -points
  197. def gen_moves(self):
  198. yield self
  199. for src in self.gen_shift(not self.colbusy(self.exa)):
  200. yield from src.gen_stationary()
  201. for get in src.gen_get():
  202. for dst in get.gen_shift(False):
  203. yield from dst.gen_put()
  204. def gen_shift(self, allow_noshift):
  205. if allow_noshift:
  206. yield self
  207. left = self
  208. for i in range(self.exa):
  209. left = left.move(LEFT)
  210. yield left
  211. right = self
  212. for i in range(COLUMNS - self.exa - 1):
  213. right = right.move(RIGHT)
  214. yield right
  215. def gen_stationary(self):
  216. # SWAP
  217. # GRAB, SWAP, DROP
  218. # GRAB, SWAP, DROP, SWAP
  219. # SWAP, GRAB, SWAP, DROP
  220. # SWAP, GRAB, SWAP, DROP, SWAP
  221. if not self.colbusy(self.exa):
  222. avail = self.colrows(self.exa)
  223. if avail >= 2:
  224. swap = self.move(SWAP)
  225. yield swap
  226. if avail >= 3:
  227. grab = self.move(GRAB, SWAP, DROP)
  228. yield grab
  229. yield grab.move(SWAP)
  230. swap = swap.move(GRAB, SWAP, DROP)
  231. yield swap
  232. yield swap.move(SWAP)
  233. def gen_get(self):
  234. # GRAB
  235. # SWAP, GRAB
  236. # GRAB, SWAP, DROP, SWAP, GRAB
  237. if not self.colbusy(self.exa):
  238. avail = self.colrows(self.exa)
  239. if avail >= 1:
  240. grab = self.move(GRAB)
  241. yield grab
  242. if avail >= 2:
  243. yield self.move(SWAP, GRAB)
  244. if avail >= 3:
  245. yield grab.move(SWAP, DROP, SWAP, GRAB)
  246. def gen_put(self):
  247. # DROP
  248. # DROP, SWAP
  249. # DROP, SWAP, GRAB, SWAP, DROP
  250. if not self.colbusy(self.exa):
  251. avail = self.colrows(self.exa)
  252. drop = self.move(DROP)
  253. yield drop
  254. if avail >= 1:
  255. swap = drop.move(SWAP)
  256. yield swap
  257. if avail >= 2:
  258. yield swap.move(GRAB, SWAP, DROP)
  259. def force(self, *moves):
  260. state = self.move(*moves)
  261. state.score = ()
  262. return state
  263. def solve(self):
  264. assert self.exa is not None
  265. if self.held != NOBLOCK:
  266. return self.force(DROP)
  267. pool = deque(self.gen_moves())
  268. if len(pool) == 0:
  269. return self.force()
  270. best_score = ()
  271. for key in self.score_keys():
  272. if len(pool) == 1:
  273. break
  274. for state in pool:
  275. state.score = key(state)
  276. best = min(state.score for state in pool)
  277. best_score += (best,)
  278. for i in range(len(pool)):
  279. state = pool.popleft()
  280. if state.score == best:
  281. pool.append(state)
  282. best = pool.popleft()
  283. best.score = best_score
  284. return best
  285. def score_keys(self):
  286. cls = self.__class__
  287. colsize = self.nrows - 2
  288. if colsize >= COLSIZE_PANIC:
  289. return cls.holes, cls.points, cls.nmoves, cls.fragmentation
  290. if colsize >= COLSIZE_PRIO:
  291. return cls.causes_panic, cls.points, cls.holes, \
  292. cls.fragmentation, cls.nmoves
  293. return cls.points, cls.fragmentation, cls.holes, cls.nmoves
  294. def print_groupsizes(self):
  295. for start in range(len(self.groupsizes) - COLUMNS, -1, -COLUMNS):
  296. print(' '.join('%-2d' % g for g in self.groupsizes[start:start + COLUMNS]))
  297. def print_groups(self):
  298. for start in range(len(self.groups) - COLUMNS, -1, -COLUMNS):
  299. print(' '.join('%-2d' % g for g in self.groups[start:start + COLUMNS]))
  300. def print(self):
  301. print_board(self.blocks, self.exa, self.held)
  302. def tostring(self):
  303. stream = io.StringIO()
  304. with redirect_stdout(stream):
  305. self.print()
  306. return stream.getvalue()
  307. def has_same_exa(self, state):
  308. return self.exa == state.exa and self.held == state.held
  309. def nmoves(self):
  310. return len(self.moves)
  311. def delay(self):
  312. return moves_delay(self.moves)
  313. def keys(self):
  314. return moves_to_keys(self.moves)
  315. def loops(self, prev):
  316. return self.moves and \
  317. self.exa == prev.exa and \
  318. self.moves == prev.moves and \
  319. self.score == prev.score
  320. def get_colskip(blocks):
  321. def colskip(col):
  322. for row, block in enumerate(blocks[col::COLUMNS]):
  323. if block != NOBLOCK:
  324. return row
  325. return len(blocks) // COLUMNS
  326. return list(map(colskip, range(COLUMNS)))
  327. def get_busy(blocks, colskip):
  328. mask = 0
  329. for col, skip in enumerate(colskip):
  330. start = (skip + 1) * COLUMNS + col
  331. colbusy = NOBLOCK in blocks[start::COLUMNS]
  332. mask |= colbusy << col
  333. return mask
  334. def scan_group(blocks, i, block, visited):
  335. yield i
  336. visited.add(i)
  337. for nb in neighbors(i, blocks):
  338. if blocks[nb] == block and nb not in visited:
  339. yield from scan_group(blocks, nb, block, visited)
  340. def get_groups(blocks):
  341. groupid = 0
  342. groups = [0] * len(blocks)
  343. groupsizes = [0] * len(blocks)
  344. visited = set()
  345. for i, block in enumerate(blocks):
  346. if block != NOBLOCK and i not in visited:
  347. groupid += 1
  348. group = tuple(scan_group(blocks, i, block, visited))
  349. for j in group:
  350. groups[j] = groupid
  351. groupsizes[j] = len(group)
  352. return groups, groupsizes, groupid
  353. def neighbors(i, blocks):
  354. y, x = divmod(i, COLUMNS)
  355. if x > 0:
  356. yield i - 1
  357. if x < COLUMNS - 1:
  358. yield i + 1
  359. if y > 0:
  360. yield i - COLUMNS
  361. if y < len(blocks) // COLUMNS - 1:
  362. yield i + COLUMNS
  363. def move_to_key(move):
  364. return 'jjkadl'[move]
  365. def moves_to_keys(moves):
  366. return ''.join(move_to_key(move) for move in moves)
  367. def moves_delay(moves):
  368. return sum(MOVE_DELAYS[m] for m in moves)
  369. if __name__ == '__main__':
  370. import sys
  371. from PIL import Image
  372. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  373. state = State.detect(board)
  374. print('parsed:')
  375. state.print()
  376. print()
  377. print('groups:')
  378. state.print_groups()
  379. print()
  380. print('groupsizes:')
  381. state.print_groupsizes()
  382. print()
  383. start = time.time()
  384. newstate = state.solve()
  385. end = time.time()
  386. print('best move:', newstate.keys())
  387. print('score:', newstate.score)
  388. print('elapsed:', round((end - start) * 1000, 1), 'ms')
  389. print()
  390. print('target after move:')
  391. newstate.print()
  392. #print()
  393. #print('generated moves:')
  394. #for state in state.gen_moves():
  395. # print(state.keys())