import time from collections import deque from itertools import islice from parser import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \ detect_held, print_board, is_basic, is_bomb, bomb_to_basic GRAB, DROP, SWAP, LEFT, RIGHT = range(5) GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB)) PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP)) #REVERSE = [DROP, GRAB, SWAP, RIGHT, LEFT] MIN_BASIC_GROUP_SIZE = 4 MIN_BOMB_GROUP_SIZE = 2 FIND_GROUPS_DEPTH = 20 FRAG_DEPTH = 20 COLSIZE_PRIO = 5 class State: def __init__(self, blocks, exa, held): assert exa is not None self.blocks = blocks self.exa = exa self.held = held def has_holes(self): return any(self.blocks[i] == NOBLOCK for col in self.iter_columns() for i in col) def iter_columns(self): nrows = self.nrows() def gen_col(col): for row in range(nrows): i = row * COLUMNS + col if self.blocks[i] != NOBLOCK: yield i for col in range(COLUMNS): yield gen_col(col) @classmethod def detect(cls, board): blocks = [NOBLOCK] * (COLUMNS * 2) + list(detect_blocks(board)) exa = detect_exa(board) held = detect_held(board, exa) return cls(blocks, exa, held) def copy(self): return State(list(self.blocks), self.exa, self.held) def colsizes(self): for col in range(COLUMNS): yield self.nrows() - self.colskip(col) def score(self, points, moves, prev): frag = self.fragmentation() colsizes = list(self.colsizes()) mincol = min(colsizes) maxcol = max(colsizes) colsize_score = maxcol, colsizes.count(maxcol), -mincol #colsize_score = tuple(sorted(colsizes, reverse=True)) #if prev.nrows() >= 6: # return colsize_score, -points, frag, len(moves) prev_colsize = max(prev.colsizes()) if prev_colsize >= COLSIZE_PRIO: return colsize_score, frag, -points, len(moves) else: return -points, frag, colsize_score, len(moves) def score_moves(self): # clear exploding blocks before computing colsize prev = self.copy() prev.score_points() for moves in self.gen_moves(): try: points, newstate = self.simulate(moves) yield newstate.score(points, moves, prev), moves except AssertionError: pass def colskip(self, col): nrows = self.nrows() for row in range(nrows): if self.blocks[row * COLUMNS + col] != NOBLOCK: return row return nrows def find_unmovable_blocks(self): unmoveable = set() bombed = set() for block, group in self.find_groups(): if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE: for i in group: unmoveable.add(i) elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE: bombed.add(bomb_to_basic(block)) for i in group: unmoveable.add(i) for i, block in enumerate(self.blocks): if block in bombed: unmoveable.add(i) return unmoveable def simulate(self, moves): s = self.copy() points = 0 # clear the current board before planning the next move #s.score_points() if not moves: return s.score_points(), s # avoid swapping/grabbing currently exploding items unmoveable = s.find_unmovable_blocks() for move in moves: if move == LEFT: assert s.exa > 0 s.exa -= 1 elif move == RIGHT: assert s.exa < COLUMNS - 1 s.exa += 1 elif move == GRAB: assert s.held == NOBLOCK row = s.colskip(s.exa) assert row < s.nrows() i = row * COLUMNS + s.exa assert i not in unmoveable s.held = s.blocks[i] s.blocks[i] = NOBLOCK elif move == DROP: assert s.held != NOBLOCK row = s.colskip(s.exa) assert row > 0 i = row * COLUMNS + s.exa s.blocks[i - COLUMNS] = s.held s.held = NOBLOCK points += s.score_points() elif move == SWAP: row = s.colskip(s.exa) assert row < s.nrows() - 2 i = row * COLUMNS + s.exa j = i + COLUMNS assert i not in unmoveable assert j not in unmoveable s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i] points += s.score_points() return points, s def find_groups(self, depth=FIND_GROUPS_DEPTH): def follow_group(i, block, group): if self.blocks[i] == block and i not in visited: group.append(i) visited.add(i) for nb in self.neighbors(i): follow_group(nb, block, group) visited = set() for col in self.iter_columns(): for i in islice(col, depth): block = self.blocks[i] group = [] follow_group(i, block, group) if len(group) > 1: #for j in group: # visited.add(j) yield block, group def neighbors(self, i): def gen_indices(): row, col = divmod(i, COLUMNS) if col > 0: yield i - 1 if col < COLUMNS - 1: yield i + 1 if row > 0: yield i - COLUMNS if row < self.nrows() - 1: yield i + COLUMNS for j in gen_indices(): if self.blocks[j] != NOBLOCK: yield j def fragmentation(self, depth=FRAG_DEPTH): """ Sum the minimum distance from every block in the first 3 layers to the closest block of the same color. """ def find_closest(i): block = self.blocks[i] work = deque([(i, -1)]) visited = {i} while work: i, dist = work.popleft() if dist >= 0 and self.blocks[i] == block: return dist for j in self.neighbors(i): if j not in visited: visited.add(j) work.append((j, dist + 1)) # only one of this type -> don't count as fragmented return 0 return sum(find_closest(i) for col in self.iter_columns() for i in islice(col, depth)) def score_points(self, multiplier=1): remove = [] points = 0 for block, group in self.find_groups(): if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE: remove.extend(group) points += len(group) * multiplier elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE: #points += 10 remove.extend(group) for i, other in enumerate(self.blocks): if other == bomb_to_basic(block): remove.append(i) remove.sort() prev = None for i in remove: if i != prev: while self.blocks[i] != NOBLOCK: self.blocks[i] = self.blocks[i - COLUMNS] i -= COLUMNS prev = i if points: points += self.score_points(min(2, multiplier * 2)) return points def gen_moves(self): yield () for src in range(COLUMNS): diff = src - self.exa direction = RIGHT if diff > 0 else LEFT mov1 = abs(diff) * (direction,) yield mov1 + (SWAP,) yield mov1 + (GRAB, SWAP, DROP) for dst in range(COLUMNS): diff = dst - src direction = RIGHT if diff > 0 else LEFT mov2 = abs(diff) * (direction,) for i, get in enumerate(GET): if i > 1 or diff != 0: for put in PUT: yield mov1 + get + mov2 + put def solve(self): if self.held != NOBLOCK: return (DROP,) score, moves = min(self.score_moves()) return moves def print(self): print_board(self.blocks, self.exa, self.held) def nrows(self): return len(self.blocks) // COLUMNS def moves_to_keys(moves): return ''.join('jjkad'[move] for move in moves) if __name__ == '__main__': import sys from PIL import Image #from pprint import pprint board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV') state = State.detect(board) print('parsed:') state.print() print() start = time.time() moves = state.solve() print('moves:', moves_to_keys(moves)) end = time.time() print('elapsed:', end - start) print() print('target after moves:') points, newstate = state.simulate(moves) newstate.print() print() #for score, moves in sorted(state.score_moves()): # print('move %18s:' % moves_to_keys(moves), score) # #print('moves:', moves_to_keys(moves), moves) # #print('score:', score) #print('\nmoves:', moves_to_keys(state.solve()))