"""Move search for a column-based block board parsed by the `parse` module."""
import io
import time
from contextlib import redirect_stdout
from itertools import combinations, islice
from parse import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
                  detect_held, print_board, is_basic, is_bomb, bomb_to_basic

# Move codes; the order must match the key string used by moves_to_keys().
GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)

# Move sequences that pick up the 1st, 2nd or 3rd block of the current column
# (counted from the exposed end), leaving the other blocks in their order.
GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
# Move sequences that insert the held block as the 1st, 2nd or 3rd block of
# the current column.
PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))

MIN_BASIC_GROUP_SIZE = 4  # basic groups of at least this size score points
MIN_BOMB_GROUP_SIZE = 2   # bomb groups of at least this size score points
FIND_GROUPS_DEPTH = 3     # default number of blocks per column to start groups from
FRAG_DEPTH = 3            # default find_groups depth used by fragmentation()
DEFRAG_PRIO = 3           # tallest-column threshold for the defragmenting score
COLSIZE_PRIO = 5          # tallest-column threshold for the column-size score
COLSIZE_PANIC = 7         # tallest-column threshold for the panic score
COLSIZE_MAX = 8           # simulated moves may not grow a column beyond this
BOMB_POINTS = 1           # points awarded per matching bomb group
MIN_ROWS = 2              # solve() does nothing on boards with fewer rows
MAX_SPEED_ROWS = 3        # solve() sends SPEED when idle and no column is taller


class InvalidMove(AssertionError):
    """Raised by State.simulate() when a move sequence cannot be executed.

    Derives from AssertionError so that callers which caught the assertion
    failures raised by earlier versions keep working.
    """


class State:
    """Board snapshot: a flat row-major block list, EXA column and held block."""

    def __init__(self, blocks, exa, held):
        self.blocks = blocks  # flat list, index = row * COLUMNS + col
        self.exa = exa        # column index of the EXA
        self.held = held      # block being held, or NOBLOCK

    def grabbing_of_dropping(self):
        """Return True if the cell right behind the first block of the EXA
        column is empty.

        NOTE(review): presumably this detects a grab/drop animation that is
        still in progress -- confirm against the parser.
        """
        skip = self.colskip(self.exa)
        i = (skip + 1) * COLUMNS + self.exa
        return i < len(self.blocks) and self.blocks[i] == NOBLOCK

    def iter_columns(self):
        """Yield, per column, a generator of the indices of its non-empty
        cells in increasing row order."""
        nrows = self.nrows()

        def gen_col(col):
            for row in range(nrows):
                i = row * COLUMNS + col
                if self.blocks[i] != NOBLOCK:
                    yield i

        for col in range(COLUMNS):
            yield gen_col(col)

    @classmethod
    def detect(cls, board, pad=2):
        """Build a State from a board image using the `parse` detectors.

        `pad` rows of NOBLOCK are prepended to the detected blocks.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        return cls(blocks, exa, held)

    def copy(self):
        """Return a State with its own copy of the block list."""
        return State(list(self.blocks), self.exa, self.held)

    def colsizes(self):
        """Yield for every column the number of rows from its first block to
        the last row (0 for an empty column)."""
        for col in range(COLUMNS):
            yield self.nrows() - self.colskip(col)

    def colsize_panic(self):
        """Return 1 if the tallest column reaches COLSIZE_PANIC, else 0."""
        return int(max(self.colsizes()) >= COLSIZE_PANIC)

    def empty_column_score(self):
        """Sum, over all columns, the distance from the first occupied row of
        the board to the first block of the column, plus one.

        A column without any block counts as if its first block were in the
        last row. Returns 0 for a board without rows.
        """
        # first row of the board that holds any block
        skip = 0
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                skip = i // COLUMNS
                break

        nrows = self.nrows()
        score = 0

        for col in range(COLUMNS):
            # Pre-bind `row` so that a board without rows to scan contributes
            # 0 instead of failing on an unbound loop variable.
            row = nrows - 1
            for row in range(skip, nrows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
            score += row - skip + 1

        return score

    def score(self, points, moves, prev):
        """Return a sort key for this state; lower tuples are better.

        points -- ignored: the value is recomputed with score_points()
        moves  -- the move sequence that led here (shorter is better)
        prev   -- the state before the moves; the height of its tallest
                  column selects which criteria are used and in what order
        """
        prev_colsize = max(prev.colsizes())
        points = self.score_points()

        if prev_colsize >= COLSIZE_PANIC:
            colsize = self.empty_column_score()
            return colsize, len(moves), -points
        elif prev_colsize >= DEFRAG_PRIO:
            colsize = self.empty_column_score()
            frag = self.fragmentation()
            panic = self.colsize_panic()
            return -points, panic, frag, colsize, len(moves)
        elif prev_colsize >= COLSIZE_PRIO:
            # NOTE(review): with the current constants (DEFRAG_PRIO = 3 <
            # COLSIZE_PRIO = 5) the branch above always matches first, so this
            # branch never runs. Left in place because reordering would change
            # the tuned heuristic -- confirm the intended priority.
            colsize = self.empty_column_score()
            return -points, colsize, len(moves)
        else:
            return -points, len(moves)

    def score_moves(self):
        """Yield (score, moves) for every candidate from gen_moves() that can
        be simulated; sequences rejected by simulate() are skipped."""
        for moves in self.gen_moves():
            try:
                points, newstate = self.simulate(moves)
            except InvalidMove:
                continue
            yield newstate.score(points, moves, self), moves

    def colskip(self, col):
        """Return the row of the first block in `col`, or nrows() if the
        column is empty."""
        nrows = self.nrows()
        for row in range(nrows):
            if self.blocks[row * COLUMNS + col] != NOBLOCK:
                return row
        return nrows

    def find_unmovable_blocks(self):
        """Return the set of block indices that are part of a scoring group,
        plus every block equal to bomb_to_basic() of a matching bomb."""
        unmoveable = set()
        bombed = set()

        for block, group in self.find_groups():
            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
                for i in group:
                    unmoveable.add(i)
            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
                bombed.add(bomb_to_basic(block))
                for i in group:
                    unmoveable.add(i)

        for i, block in enumerate(self.blocks):
            if block in bombed:
                unmoveable.add(i)

        return unmoveable

    def simulate(self, moves):
        """Apply `moves` to a copy of this state.

        Returns (points, newstate); points is currently always 0. Move codes
        without an effect on the board (SPEED) are ignored.

        Raises InvalidMove (an AssertionError) when a move cannot be executed.
        Explicit raises are used instead of `assert` so that the checks still
        run under `python -O`.
        """
        s = self.copy()
        points = 0

        # NOTE: blocks reported by find_unmovable_blocks() are not excluded
        # from grabbing or swapping here.
        for move in moves:
            if move == LEFT:
                if s.exa <= 0:
                    raise InvalidMove('cannot move left of the first column')
                s.exa -= 1
            elif move == RIGHT:
                if s.exa >= COLUMNS - 1:
                    raise InvalidMove('cannot move right of the last column')
                s.exa += 1
            elif move == GRAB:
                if s.held != NOBLOCK:
                    raise InvalidMove('cannot grab while holding a block')
                row = s.colskip(s.exa)
                if row >= s.nrows():
                    raise InvalidMove('cannot grab from an empty column')
                i = row * COLUMNS + s.exa
                s.held = s.blocks[i]
                s.blocks[i] = NOBLOCK
            elif move == DROP:
                if s.held == NOBLOCK:
                    raise InvalidMove('cannot drop without a held block')
                row = s.colskip(s.exa)
                if row <= 0:
                    raise InvalidMove('cannot drop onto a full column')
                i = row * COLUMNS + s.exa
                # the dropped block lands in the free cell before the column
                s.blocks[i - COLUMNS] = s.held
                s.held = NOBLOCK
            elif move == SWAP:
                row = s.colskip(s.exa)
                if row >= s.nrows() - 2:
                    raise InvalidMove('not enough blocks in column to swap')
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]

        # Only enforce the height limit when the starting state was below it.
        if moves and max(self.colsizes()) < COLSIZE_MAX:
            if max(s.colsizes()) > COLSIZE_MAX:
                raise InvalidMove('moves grow a column beyond COLSIZE_MAX')

        return points, s

    def find_groups(self, depth=FIND_GROUPS_DEPTH, minsize=2):
        """Yield (block, indices) for connected groups of equal blocks.

        Groups are flood-filled starting from the first `depth` blocks of
        every column; only groups with at least `minsize` members are
        yielded, and every block belongs to at most one group.
        """
        def follow_group(i, block, group):
            if self.blocks[i] == block and i not in visited:
                group.append(i)
                visited.add(i)

                for nb in self.neighbors(i):
                    follow_group(nb, block, group)

        visited = set()

        for col in self.iter_columns():
            for i in islice(col, depth):
                block = self.blocks[i]
                group = []
                follow_group(i, block, group)

                if len(group) >= minsize:
                    yield block, group

    def neighbors(self, i):
        """Yield the indices horizontally/vertically adjacent to `i` that
        hold a block."""
        def gen_indices():
            row, col = divmod(i, COLUMNS)
            if col > 0:
                yield i - 1
            if col < COLUMNS - 1:
                yield i + 1
            if row > 0:
                yield i - COLUMNS
            if row < self.nrows() - 1:
                yield i + COLUMNS

        for j in gen_indices():
            if self.blocks[j] != NOBLOCK:
                yield j

    def fragmentation(self, depth=FRAG_DEPTH):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same color.
        Magnify vertical distances to avoid column stacking.

        Only blocks reachable by find_groups(depth, 1) are considered.
        Returns the summed distance (lower means less fragmented).
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)

            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if groups[i] == groups[j]:
                return abs(yj - yi)

            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}  # block value -> indices of all visited blocks with it
        groups = {}  # block index -> id of the group it belongs to

        for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
            colors.setdefault(block, []).extend(group)

            for i in group:
                groups[i] = groupid

        return sum(dist(i, j)
                   for block, color in colors.items()
                   for i, j in combinations(color, 2))

    def score_points(self, multiplier=1):
        """Return the points for the groups currently on the board.

        A basic group of at least MIN_BASIC_GROUP_SIZE blocks is worth its
        size times `multiplier`; a bomb group of at least MIN_BOMB_GROUP_SIZE
        blocks is worth BOMB_POINTS. The board is not modified: the removal
        of matched blocks and resulting chains are not simulated.
        """
        points = 0

        for block, group in self.find_groups():
            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
                points += len(group) * multiplier
            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
                points += BOMB_POINTS

        return points

    def has_explosion(self):
        """Return True if any bomb block has an adjacent block equal to it."""
        return any(is_bomb(block) and
                   any(self.blocks[j] == block for j in self.neighbors(i))
                   for i, block in enumerate(self.blocks))

    def gen_moves(self):
        """Yield candidate move sequences (tuples of move codes).

        Candidates are: no move at all; for every source column three
        in-column rearrangements; and for every (source, destination) pair
        each combination of a GET and a PUT sequence. Sequences are not
        checked for validity here; simulate() rejects the impossible ones.
        """
        yield ()

        def make_move(diff):
            direction = RIGHT if diff > 0 else LEFT
            return abs(diff) * (direction,)

        for src in range(COLUMNS):
            mov1 = make_move(src - self.exa)
            yield mov1 + (SWAP,)
            yield mov1 + (GRAB, SWAP, DROP)
            yield mov1 + (SWAP, GRAB, SWAP, DROP)

            for dst in range(COLUMNS):
                if dst != src:
                    mov2 = make_move(dst - src)
                    for get in GET:
                        for put in PUT:
                            yield mov1 + get + mov2 + put

    def solve(self):
        """Return the move sequence to execute for this state.

        Drops a held block first; returns no moves while the board has fewer
        than MIN_ROWS rows, while grabbing_of_dropping() or has_explosion()
        is true; otherwise returns the best-scoring candidate. If the best
        candidate is to do nothing and no column is taller than
        MAX_SPEED_ROWS, returns (SPEED,).

        Raises AssertionError if the EXA column is unknown.
        """
        # explicit raise instead of `assert` so the check survives `python -O`
        if self.exa is None:
            raise AssertionError('EXA position is unknown')

        if self.held != NOBLOCK:
            return (DROP,)

        if self.nrows() < MIN_ROWS:
            return ()

        if self.grabbing_of_dropping():
            return ()

        if self.has_explosion():
            return ()

        # tuples compare by score first, then by the move sequence itself
        score, moves = min(self.score_moves())

        if not moves and max(self.colsizes()) <= MAX_SPEED_ROWS:
            return (SPEED,)

        return moves

    def print(self):
        """Print the board using parse.print_board()."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return the output of print() as a string."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def nrows(self):
        """Return the number of complete rows in the block list."""
        return len(self.blocks) // COLUMNS


def moves_to_keys(moves):
    """Translate move codes to key characters.

    GRAB and DROP both map to 'j', SWAP to 'k', LEFT to 'a', RIGHT to 'd'
    and SPEED to 'l'.
    """
    return ''.join('jjkadl'[move] for move in moves)


if __name__ == '__main__':
    import sys
    from PIL import Image

    # usage: pass the number N of a screenshot stored as screens/boardN.png
    board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()

    print('empty cols:', state.empty_column_score())
    print()

    start = time.time()
    moves = state.solve()
    end = time.time()
    print('moves:', moves_to_keys(moves))
    print('elapsed:', end - start)
    print()

    print('target after moves:')
    points, newstate = state.simulate(moves)
    newstate.print()
    print()

    for score, moves in sorted(state.score_moves()):
        print('move %18s:' % moves_to_keys(moves), score)