||
- import time
- from collections import deque
- from itertools import islice
- from parser import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
- detect_held, print_board, is_basic, is_bomb, bomb_to_basic
- GRAB, DROP, SWAP, LEFT, RIGHT = range(5)
- GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
- PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
- #REVERSE = [DROP, GRAB, SWAP, RIGHT, LEFT]
- MIN_BASIC_GROUP_SIZE = 4
- MIN_BOMB_GROUP_SIZE = 2
- FIND_GROUPS_DEPTH = 20
- FRAG_DEPTH = 20
- COLSIZE_PRIO = 5
- class State:
- def __init__(self, blocks, exa, held):
- assert exa is not None
- self.blocks = blocks
- self.exa = exa
- self.held = held
- def has_holes(self):
- return any(self.blocks[i] == NOBLOCK
- for col in self.iter_columns()
- for i in col)
- def iter_columns(self):
- nrows = self.nrows()
- def gen_col(col):
- for row in range(nrows):
- i = row * COLUMNS + col
- if self.blocks[i] != NOBLOCK:
- yield i
- for col in range(COLUMNS):
- yield gen_col(col)
- @classmethod
- def detect(cls, board):
- blocks = [NOBLOCK] * (COLUMNS * 2) + list(detect_blocks(board))
- exa = detect_exa(board)
- held = detect_held(board, exa)
- return cls(blocks, exa, held)
- def copy(self):
- return State(list(self.blocks), self.exa, self.held)
- def colsizes(self):
- for col in range(COLUMNS):
- yield self.nrows() - self.colskip(col)
- def score(self, points, moves, prev):
- frag = self.fragmentation()
- colsizes = list(self.colsizes())
- mincol = min(colsizes)
- maxcol = max(colsizes)
- colsize_score = maxcol, colsizes.count(maxcol), -mincol
- #colsize_score = tuple(sorted(colsizes, reverse=True))
- #if prev.nrows() >= 6:
- # return colsize_score, -points, frag, len(moves)
- prev_colsize = max(prev.colsizes())
- if prev_colsize >= COLSIZE_PRIO:
- return colsize_score, frag, -points, len(moves)
- else:
- return -points, frag, colsize_score, len(moves)
- def score_moves(self):
- # clear exploding blocks before computing colsize
- prev = self.copy()
- prev.score_points()
- for moves in self.gen_moves():
- try:
- points, newstate = self.simulate(moves)
- yield newstate.score(points, moves, prev), moves
- except AssertionError:
- pass
- def colskip(self, col):
- nrows = self.nrows()
- for row in range(nrows):
- if self.blocks[row * COLUMNS + col] != NOBLOCK:
- return row
- return nrows
- def find_unmovable_blocks(self):
- unmoveable = set()
- bombed = set()
- for block, group in self.find_groups():
- if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
- for i in group:
- unmoveable.add(i)
- elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
- bombed.add(bomb_to_basic(block))
- for i in group:
- unmoveable.add(i)
- for i, block in enumerate(self.blocks):
- if block in bombed:
- unmoveable.add(i)
- return unmoveable
- def simulate(self, moves):
- s = self.copy()
- points = 0
- # clear the current board before planning the next move
- #s.score_points()
- if not moves:
- return s.score_points(), s
- # avoid swapping/grabbing currently exploding items
- unmoveable = s.find_unmovable_blocks()
- for move in moves:
- if move == LEFT:
- assert s.exa > 0
- s.exa -= 1
- elif move == RIGHT:
- assert s.exa < COLUMNS - 1
- s.exa += 1
- elif move == GRAB:
- assert s.held == NOBLOCK
- row = s.colskip(s.exa)
- assert row < s.nrows()
- i = row * COLUMNS + s.exa
- assert i not in unmoveable
- s.held = s.blocks[i]
- s.blocks[i] = NOBLOCK
- elif move == DROP:
- assert s.held != NOBLOCK
- row = s.colskip(s.exa)
- assert row > 0
- i = row * COLUMNS + s.exa
- s.blocks[i - COLUMNS] = s.held
- s.held = NOBLOCK
- points += s.score_points()
- elif move == SWAP:
- row = s.colskip(s.exa)
- assert row < s.nrows() - 2
- i = row * COLUMNS + s.exa
- j = i + COLUMNS
- assert i not in unmoveable
- assert j not in unmoveable
- s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
- points += s.score_points()
- return points, s
- def find_groups(self, depth=FIND_GROUPS_DEPTH):
- def follow_group(i, block, group):
- if self.blocks[i] == block and i not in visited:
- group.append(i)
- visited.add(i)
- for nb in self.neighbors(i):
- follow_group(nb, block, group)
- visited = set()
- for col in self.iter_columns():
- for i in islice(col, depth):
- block = self.blocks[i]
- group = []
- follow_group(i, block, group)
- if len(group) > 1:
- #for j in group:
- # visited.add(j)
- yield block, group
- def neighbors(self, i):
- def gen_indices():
- row, col = divmod(i, COLUMNS)
- if col > 0:
- yield i - 1
- if col < COLUMNS - 1:
- yield i + 1
- if row > 0:
- yield i - COLUMNS
- if row < self.nrows() - 1:
- yield i + COLUMNS
- for j in gen_indices():
- if self.blocks[j] != NOBLOCK:
- yield j
- def fragmentation(self, depth=FRAG_DEPTH):
- """
- Sum the minimum distance from every block in the first 3 layers to the
- closest block of the same color.
- """
- def find_closest(i):
- block = self.blocks[i]
- work = deque([(i, -1)])
- visited = {i}
- while work:
- i, dist = work.popleft()
- if dist >= 0 and self.blocks[i] == block:
- return dist
- for j in self.neighbors(i):
- if j not in visited:
- visited.add(j)
- work.append((j, dist + 1))
- # only one of this type -> don't count as fragmented
- return 0
- return sum(find_closest(i)
- for col in self.iter_columns()
- for i in islice(col, depth))
- def score_points(self, multiplier=1):
- remove = []
- points = 0
- for block, group in self.find_groups():
- if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
- remove.extend(group)
- points += len(group) * multiplier
- elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
- #points += 10
- remove.extend(group)
- for i, other in enumerate(self.blocks):
- if other == bomb_to_basic(block):
- remove.append(i)
- remove.sort()
- prev = None
- for i in remove:
- if i != prev:
- while self.blocks[i] != NOBLOCK:
- self.blocks[i] = self.blocks[i - COLUMNS]
- i -= COLUMNS
- prev = i
- if points:
- points += self.score_points(min(2, multiplier * 2))
- return points
- def gen_moves(self):
- yield ()
- for src in range(COLUMNS):
- diff = src - self.exa
- direction = RIGHT if diff > 0 else LEFT
- mov1 = abs(diff) * (direction,)
- yield mov1 + (SWAP,)
- yield mov1 + (GRAB, SWAP, DROP)
- for dst in range(COLUMNS):
- diff = dst - src
- direction = RIGHT if diff > 0 else LEFT
- mov2 = abs(diff) * (direction,)
- for i, get in enumerate(GET):
- if i > 1 or diff != 0:
- for put in PUT:
- yield mov1 + get + mov2 + put
- def solve(self):
- if self.held != NOBLOCK:
- return (DROP,)
- score, moves = min(self.score_moves())
- return moves
- def print(self):
- print_board(self.blocks, self.exa, self.held)
- def nrows(self):
- return len(self.blocks) // COLUMNS
- def moves_to_keys(moves):
- return ''.join('jjkad'[move] for move in moves)
- if __name__ == '__main__':
- import sys
- from PIL import Image
- #from pprint import pprint
- board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
- state = State.detect(board)
- print('parsed:')
- state.print()
- print()
- start = time.time()
- moves = state.solve()
- print('moves:', moves_to_keys(moves))
- end = time.time()
- print('elapsed:', end - start)
- print()
- print('target after moves:')
- points, newstate = state.simulate(moves)
- newstate.print()
- print()
- #for score, moves in sorted(state.score_moves()):
- # print('move %18s:' % moves_to_keys(moves), score)
- # #print('moves:', moves_to_keys(moves), moves)
- # #print('score:', score)
- #print('\nmoves:', moves_to_keys(state.solve()))
|