strategy.py 12 KB


  1. import io
  2. import time
  3. from contextlib import redirect_stdout
  4. from itertools import combinations, islice
  5. from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
  6. detect_held, print_board, is_basic, is_bomb, \
  7. bomb_to_basic
  8. GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
  9. MOVE_DELAYS = (
  10. # in milliseconds
  11. 50, # GRAB
  12. 50, # DROP
  13. 50, # SWAP
  14. 30, # LEFT
  15. 30, # RIGHT
  16. 30, # SPEED
  17. )
  18. GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
  19. PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
  20. MIN_BASIC_GROUP_SIZE = 4
  21. MIN_BOMB_GROUP_SIZE = 2
  22. POINTS_DEPTH = 3
  23. FRAG_DEPTH = 5
  24. DEFRAG_PRIO = 4
  25. COLSIZE_PRIO = 5
  26. COLSIZE_PANIC = 8
  27. COLSIZE_MAX = 9
  28. BOMB_POINTS = 1
  29. MIN_ROWS = 2
  30. class State:
  31. def __init__(self, blocks, exa, held):
  32. self.blocks = blocks
  33. self.exa = exa
  34. self.held = held
  35. def grabbing_or_dropping(self):
  36. skip = self.colskip(self.exa)
  37. i = (skip + 1) * COLUMNS + self.exa
  38. return i < len(self.blocks) and self.blocks[i] == NOBLOCK
  39. def iter_columns(self):
  40. nrows = self.nrows()
  41. def gen_col(col):
  42. for row in range(nrows):
  43. i = row * COLUMNS + col
  44. if self.blocks[i] != NOBLOCK:
  45. yield i
  46. for col in range(COLUMNS):
  47. yield gen_col(col)
  48. @classmethod
  49. def detect(cls, board, pad=2):
  50. blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
  51. exa = detect_exa(board)
  52. held = detect_held(board, exa)
  53. return cls(blocks, exa, held)
  54. def copy(self):
  55. return State(list(self.blocks), self.exa, self.held)
  56. def causes_panic(self):
  57. return self.max_colsize() >= COLSIZE_PANIC
  58. def max_colsize(self):
  59. return self.nrows() - self.empty_rows()
  60. def empty_rows(self):
  61. for i, block in enumerate(self.blocks):
  62. if block != NOBLOCK:
  63. return i // COLUMNS
  64. return 0
  65. def holes(self):
  66. start_row = self.empty_rows()
  67. total_rows = self.nrows()
  68. score = 0
  69. for col in range(COLUMNS):
  70. for row in range(start_row, total_rows):
  71. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  72. break
  73. score += row - start_row + 1
  74. return score
  75. def score(self, points, moves, prev):
  76. prev_colsize = prev.nrows() - 2
  77. #delay = moves_delay(moves)
  78. delay = len(moves)
  79. # Don't care about defragging for few rows, just score points quickly.
  80. # This saves computation time which in turn makes for nice combos when
  81. # the bot speeds around throwing blocks on top of each other.
  82. if prev_colsize < DEFRAG_PRIO:
  83. return -points, delay
  84. holes = self.holes()
  85. frag = self.fragmentation()
  86. # When rows start stacking up, start defragmenting colors to make
  87. # opportunities for scoring points.
  88. if prev_colsize < COLSIZE_PRIO:
  89. return -points, frag, holes, delay
  90. # When they stack higher, start moving blocks down into holes before
  91. # continuing to defragment.
  92. if prev_colsize < COLSIZE_PANIC:
  93. panic = int(self.causes_panic())
  94. return panic, -points, holes, frag, delay
  95. # Column heights are getting out of hand, just move shit DOWN.
  96. return holes, delay, -points, frag
  97. def solutions(self):
  98. for moves in self.gen_moves():
  99. try:
  100. yield Solution(self, moves)
  101. except AssertionError:
  102. pass
  103. def colskip(self, col):
  104. nrows = self.nrows()
  105. for row in range(nrows):
  106. if self.blocks[row * COLUMNS + col] != NOBLOCK:
  107. return row
  108. return nrows
  109. def find_unmovable_blocks(self):
  110. unmoveable = set()
  111. bombed = set()
  112. for block, group in self.find_groups():
  113. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  114. for i in group:
  115. unmoveable.add(i)
  116. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  117. bombed.add(bomb_to_basic(block))
  118. for i in group:
  119. unmoveable.add(i)
  120. for i, block in enumerate(self.blocks):
  121. if block in bombed:
  122. unmoveable.add(i)
  123. return unmoveable
  124. def simulate(self, moves):
  125. s = self.copy()
  126. #points = 0
  127. # avoid swapping/grabbing currently exploding items
  128. #unmoveable = s.find_unmovable_blocks()
  129. for move in moves:
  130. if move == LEFT:
  131. assert s.exa > 0
  132. s.exa -= 1
  133. elif move == RIGHT:
  134. assert s.exa < COLUMNS - 1
  135. s.exa += 1
  136. elif move == GRAB:
  137. assert s.held == NOBLOCK
  138. row = s.colskip(s.exa)
  139. assert row < s.nrows()
  140. i = row * COLUMNS + s.exa
  141. #assert i not in unmoveable
  142. s.held = s.blocks[i]
  143. s.blocks[i] = NOBLOCK
  144. elif move == DROP:
  145. assert s.held != NOBLOCK
  146. row = s.colskip(s.exa)
  147. assert row > 0
  148. i = row * COLUMNS + s.exa
  149. s.blocks[i - COLUMNS] = s.held
  150. s.held = NOBLOCK
  151. #points += s.remove_blocks()
  152. elif move == SWAP:
  153. row = s.colskip(s.exa)
  154. assert row < s.nrows() - 2
  155. i = row * COLUMNS + s.exa
  156. j = i + COLUMNS
  157. #assert i not in unmoveable
  158. #assert j not in unmoveable
  159. s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
  160. #points += s.remove_blocks()
  161. if moves and self.max_colsize() < COLSIZE_MAX:
  162. assert s.max_colsize() <= COLSIZE_MAX
  163. points = s.remove_blocks()
  164. return points, s
  165. def find_groups(self, depth=POINTS_DEPTH, minsize=2):
  166. def follow_group(i, block, group):
  167. if self.blocks[i] == block and i not in visited:
  168. group.append(i)
  169. visited.add(i)
  170. for nb in self.neighbors(i):
  171. follow_group(nb, block, group)
  172. visited = set()
  173. for col in self.iter_columns():
  174. for i in islice(col, depth):
  175. block = self.blocks[i]
  176. group = []
  177. follow_group(i, block, group)
  178. if len(group) >= minsize:
  179. yield block, group
  180. def neighbors(self, i):
  181. row, col = divmod(i, COLUMNS)
  182. if col > 0 and self.blocks[i - 1] != NOBLOCK:
  183. yield i - 1
  184. if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
  185. yield i + 1
  186. if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
  187. yield i - COLUMNS
  188. if row < self.nrows() - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
  189. yield i + COLUMNS
  190. def fragmentation(self, depth=FRAG_DEPTH):
  191. """
  192. Minimize the sum of dist(i,j) between all blocks i,j of the same color.
  193. Magnify vertical distances to avoid column stacking.
  194. """
  195. def dist(i, j):
  196. yi, xi = divmod(i, COLUMNS)
  197. yj, xj = divmod(j, COLUMNS)
  198. # for blocks in the same group, only count vertical distance so
  199. # that groups are spread out horizontally
  200. if groups[i] == groups[j]:
  201. return abs(yj - yi)
  202. return abs(xj - xi) + abs(yj - yi) * 2 - 1
  203. colors = {}
  204. groups = {}
  205. groupsizes = {}
  206. for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
  207. colors.setdefault(block, []).extend(group)
  208. for i in group:
  209. groups[i] = groupid
  210. groupsizes[i] = len(group)
  211. return sum(dist(i, j) # * (1 + 2 * is_bomb(block))
  212. for block, color in colors.items()
  213. for i, j in combinations(color, 2))
  214. def remove_blocks(self):
  215. removed = 0
  216. for block, group in self.find_groups():
  217. if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  218. removed += len(group)
  219. elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  220. removed += BOMB_POINTS
  221. return removed
  222. #def remove_blocks(self):
  223. # remove = []
  224. # for block, group in self.find_groups():
  225. # if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
  226. # remove.extend(group)
  227. # elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
  228. # remove.extend(group)
  229. # remove.extend(i for i, other in enumerate(self.blocks)
  230. # if other == bomb_to_basic(block))
  231. # remove.sort()
  232. # removed = 0
  233. # prev = None
  234. # for i in remove:
  235. # if i != prev:
  236. # while self.blocks[i] != NOBLOCK:
  237. # self.blocks[i] = self.blocks[i - COLUMNS]
  238. # i -= COLUMNS
  239. # removed += 1
  240. # prev = i
  241. # if removed:
  242. # removed += self.remove_blocks()
  243. # return removed
  244. def has_explosion(self):
  245. return any(is_bomb(block) and
  246. any(self.blocks[j] == block for j in self.neighbors(i))
  247. for i, block in enumerate(self.blocks))
  248. def gen_moves(self):
  249. yield ()
  250. def make_move(diff):
  251. direction = RIGHT if diff > 0 else LEFT
  252. return abs(diff) * (direction,)
  253. ignore_exa_column = self.grabbing_or_dropping()
  254. for src in range(COLUMNS):
  255. mov1 = make_move(src - self.exa)
  256. if mov1 or not ignore_exa_column:
  257. yield mov1 + (SWAP,)
  258. yield mov1 + (GRAB, SWAP, DROP)
  259. yield mov1 + (SWAP, GRAB, SWAP, DROP)
  260. for dst in range(COLUMNS):
  261. if dst != src:
  262. mov2 = make_move(dst - src)
  263. for get in GET:
  264. for put in PUT:
  265. yield mov1 + get + mov2 + put
  266. def solve(self):
  267. assert self.exa is not None
  268. if self.held != NOBLOCK:
  269. return Solution(self, (DROP,))
  270. if self.nrows() < MIN_ROWS:
  271. return Solution(self, ())
  272. if self.has_explosion():
  273. return Solution(self, ())
  274. return min(self.solutions())
  275. def print(self):
  276. print_board(self.blocks, self.exa, self.held)
  277. def tostring(self):
  278. stream = io.StringIO()
  279. with redirect_stdout(stream):
  280. self.print()
  281. return stream.getvalue()
  282. def nrows(self):
  283. return len(self.blocks) // COLUMNS
  284. def has_same_exa(self, state):
  285. return self.exa == state.exa and self.held == state.held
  286. class Solution:
  287. def __init__(self, state, moves):
  288. self.state = state
  289. self.moves = moves
  290. points, self.newstate = state.simulate(moves)
  291. self.score = self.newstate.score(points, moves, state)
  292. def __lt__(self, other):
  293. return self.score < other.score
  294. def loops(self, prev_prev):
  295. return self.moves and \
  296. self.state.exa == prev_prev.state.exa and \
  297. self.moves == prev_prev.moves and \
  298. self.score == prev_prev.score
  299. def delay(self):
  300. return moves_delay(self.moves)
  301. def keys(self):
  302. return moves_to_keys(self.moves)
  303. def move_to_key(move):
  304. return 'jjkadl'[move]
  305. def moves_to_keys(moves):
  306. return ''.join(move_to_key(move) for move in moves)
  307. def moves_delay(moves):
  308. return sum(MOVE_DELAYS[m] for m in moves)
  309. if __name__ == '__main__':
  310. import sys
  311. from PIL import Image
  312. board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
  313. state = State.detect(board)
  314. print('parsed:')
  315. state.print()
  316. print()
  317. start = time.time()
  318. solution = state.solve()
  319. end = time.time()
  320. print('best moves:', solution.keys())
  321. print('elapsed:', round((end - start) * 1000, 1), 'ms')
  322. print()
  323. print('target after moves:')
  324. solution.newstate.print()
  325. print()
  326. for solution in sorted(state.solutions()):
  327. print('move %18s:' % solution.keys(), solution.score)