"""Move planner for a column-based block-matching board.

The board is recognised by the ``detection`` module; this module models a
board position as a ``State``, enumerates short move sequences from it and
picks the best one by comparing a tuple of score functions (lower is better).
"""
import io
import time
from collections import deque
from contextlib import redirect_stdout
from copy import copy
from itertools import combinations, islice

from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
                      detect_held, print_board, is_basic, is_bomb

# Move identifiers; they double as indices into MOVE_DELAYS and into the key
# string used by move_to_key().
GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)

MOVE_DELAYS = (
    # in milliseconds
    50,  # GRAB
    50,  # DROP
    50,  # SWAP
    30,  # LEFT
    30,  # RIGHT
    30,  # SPEED
)

MIN_BASIC_GROUP_SIZE = 4
MIN_BOMB_GROUP_SIZE = 2
POINTS_DEPTH = 3
FRAG_DEPTH = 4
DEFRAG_PRIO = 4
COLSIZE_PRIO = 5
COLSIZE_PANIC = 8
COLSIZE_MAX = 9
BOMB_POINTS = 5


class State:
    """A board position plus the move sequence that produced it.

    ``blocks`` is a flat, row-major list with ``COLUMNS`` cells per row; empty
    cells hold ``NOBLOCK``. Each column's free end is on the low-row side:
    ``colskip[col]`` is the row index of the first non-empty cell of ``col``,
    GRAB removes the block at that row and DROP places a block one row before
    it.
    """

    def __init__(self, blocks, exa, held, colskip, busy, moves, placed,
                 grabbed):
        """Store the position.

        blocks  -- flat row-major list of cells
        exa     -- column index the moves operate on (changed by LEFT/RIGHT)
        held    -- block currently being held, or NOBLOCK
        colskip -- per-column count of leading empty rows
        busy    -- bitmask with bit ``col`` set for columns that must not be
                   touched (see get_busy)
        moves   -- tuple of moves applied so far
        placed  -- set of cell indices that received a block through DROP/SWAP
        grabbed -- dict mapping cell index to the block that was taken from it
        """
        self.blocks = blocks
        self.exa = exa
        self.held = held
        self.colskip = colskip
        self.busy = busy
        self.moves = moves
        self.placed = placed
        self.grabbed = grabbed
        self.nrows = len(blocks) // COLUMNS
        # Always defined so that loops() can compare states that were never
        # scored by solve(); solve()/force() overwrite it.
        self.score = ()

    @classmethod
    def detect(cls, board, pad=2):
        """Build a State from a board image using the detection module.

        ``pad`` empty rows are prepended to the detected blocks so that there
        is room to DROP onto the tallest column.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        colskip = get_colskip(blocks)
        busy = get_busy(blocks, colskip)
        return cls(blocks, exa, held, colskip, busy, (), set(), {})

    def copy(self, deep):
        """Return a new State; mutable containers are copied only if ``deep``.

        A shallow copy shares ``blocks``, ``colskip``, ``placed`` and
        ``grabbed`` with this state, which is sufficient for moves that only
        change ``exa``.
        """
        mcopy = copy if deep else lambda x: x
        return self.__class__(mcopy(self.blocks), self.exa, self.held,
                              mcopy(self.colskip), self.busy, self.moves,
                              mcopy(self.placed), mcopy(self.grabbed))

    def colbusy(self, col):
        """Return 1 if bit ``col`` of the busy mask is set, else 0."""
        return (self.busy >> col) & 1

    def grabbing_or_dropping(self):
        """Return True if the cell below the top block of the exa column is
        empty (and inside the board)."""
        skip = self.colskip[self.exa]
        i = (skip + 1) * COLUMNS + self.exa
        return i < len(self.blocks) and self.blocks[i] == NOBLOCK

    def iter_columns(self):
        """Yield one generator per column, each producing the indices of that
        column's non-empty cells in increasing row order."""
        def gen_col(col):
            for row in range(self.nrows):
                i = row * COLUMNS + col
                if self.blocks[i] != NOBLOCK:
                    yield i

        for col in range(COLUMNS):
            yield gen_col(col)

    def causes_panic(self):
        """Return True if the tallest column reaches COLSIZE_PANIC."""
        return self.max_colsize() >= COLSIZE_PANIC

    def max_colsize(self):
        """Return the number of rows from the first non-empty row to the end
        of the board."""
        return self.nrows - self.empty_rows()

    def empty_rows(self):
        """Return the number of completely empty rows at the start of the
        board."""
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                return i // COLUMNS
        # No block at all: every row is empty. (Returning 0 here would make
        # max_colsize() report a full-height column for an empty board.)
        return self.nrows

    def holes(self):
        """Score the unevenness of the column tops (lower is better).

        For every column, each empty cell between the first non-empty row of
        the board and the column's top block adds its 1-based depth, so a gap
        of k cells costs 1 + 2 + ... + k.
        """
        start_row = self.empty_rows()
        score = 0

        for col in range(COLUMNS):
            for row in range(start_row, self.nrows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
                score += row - start_row + 1

        return score

    def colrows(self, col):
        """Return the number of rows in ``col`` from its top block onwards."""
        return self.nrows - self.colskip[col]

    def move(self, *moves):
        """Return a new State with ``moves`` applied in order.

        The state is deep-copied only if a move modifies the board. Invalid
        moves (out-of-range exa, busy column, nothing to grab/drop/swap) trip
        an assertion.
        """
        deep = any(move in (GRAB, DROP, SWAP) for move in moves)
        s = self.copy(deep)
        s.moves += moves

        for move in moves:
            if move == LEFT:
                assert s.exa > 0
                s.exa -= 1

            elif move == RIGHT:
                assert s.exa < COLUMNS - 1
                s.exa += 1

            elif move == GRAB:
                assert not s.colbusy(s.exa)
                assert s.held == NOBLOCK
                row = s.colskip[s.exa]
                assert row < s.nrows
                i = row * COLUMNS + s.exa
                # remember where the block came from so that points() can
                # ignore moves within the same group
                s.grabbed[i] = s.held = s.blocks[i]
                s.blocks[i] = NOBLOCK
                s.colskip[s.exa] += 1

            elif move == DROP:
                assert not s.colbusy(s.exa)
                assert s.held != NOBLOCK
                row = s.colskip[s.exa]
                assert row > 0
                # XXX assert s.nrows - row < COLSIZE_MAX
                i = (row - 1) * COLUMNS + s.exa
                s.blocks[i] = s.held
                s.held = NOBLOCK
                s.placed.add(i)
                s.colskip[s.exa] -= 1

            elif move == SWAP:
                assert not s.colbusy(s.exa)
                row = s.colskip[s.exa]
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                assert j < len(s.blocks)
                bi = s.blocks[i]
                bj = s.blocks[j]
                # swapping two equal blocks changes nothing, so it is not
                # recorded as a placement
                if bi != bj:
                    s.blocks[i] = bj
                    s.blocks[j] = bi
                    s.grabbed[i] = bi
                    s.grabbed[j] = bj
                    s.placed.add(i)
                    s.placed.add(j)

        return s

    def find_groups(self, depth=POINTS_DEPTH, minsize=2):
        """Yield ``(block, indices)`` for groups of connected equal blocks.

        Only groups reachable from the first ``depth`` blocks of each column
        are reported, and only if they contain at least ``minsize`` cells.
        """
        def follow_group(i, block, group):
            if self.blocks[i] == block and i not in visited:
                group.append(i)
                visited.add(i)
                for nb in self.neighbors(i):
                    follow_group(nb, block, group)

        visited = set()

        for col in self.iter_columns():
            for i in islice(col, depth):
                block = self.blocks[i]
                group = []
                follow_group(i, block, group)
                if len(group) >= minsize:
                    yield block, group

    def neighbors(self, i):
        """Yield the indices of the non-empty cells directly left, right,
        above and below cell ``i``."""
        row, col = divmod(i, COLUMNS)
        if col > 0 and self.blocks[i - 1] != NOBLOCK:
            yield i - 1
        if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
            yield i + 1
        if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
            yield i - COLUMNS
        if row < self.nrows - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
            yield i + COLUMNS

    def fragmentation(self, depth=FRAG_DEPTH):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same color.
        Magnify vertical distances to avoid column stacking.

        Only blocks found by find_groups(depth, 1) take part. Returns the sum
        as an integer; lower is better.
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)

            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if groups[i] == groups[j]:
                return abs(yj - yi)

            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}  # block -> indices of all examined cells with that block
        groups = {}  # cell index -> id of the group it belongs to

        for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
            colors.setdefault(block, []).extend(group)
            for i in group:
                groups[i] = groupid

        return sum(dist(i, j)
                   for color in colors.values()
                   for i, j in combinations(color, 2))

    def points(self):
        """Return the negated points earned by the placed blocks.

        A placed basic block scores the size of its group if that reaches
        MIN_BASIC_GROUP_SIZE; a placed bomb scores BOMB_POINTS if its group
        reaches MIN_BOMB_GROUP_SIZE. The result is negated because solve()
        minimises scores.
        """
        def group_size(start):
            work = [start]
            visited.add(start)
            size = 0
            block = self.blocks[start]

            while work:
                i = work.pop()

                # avoid giving points to moving a block within the same group
                if self.grabbed.get(i, None) == block:
                    return 0

                if self.blocks[i] == block:
                    size += 1
                    for nb in self.neighbors(i):
                        if nb not in visited:
                            visited.add(nb)
                            work.append(nb)

            return size

        points = 0
        visited = set()

        for i in self.placed:
            if i not in visited:
                block = self.blocks[i]
                size = group_size(i)

                if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
                    points += size
                elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
                    points += BOMB_POINTS

        return -points

    def gen_moves(self):
        """Yield every candidate State considered by solve().

        Candidates are: this state itself, in-place rearrangements of any
        column, and picking a block from one column and putting it on another.
        """
        yield self

        for src in self.gen_shift(not self.grabbing_or_dropping()):
            yield from src.gen_stationary()

            for get in src.gen_get():
                for dst in get.gen_shift(False):
                    yield from dst.gen_put()

    def gen_shift(self, allow_noshift):
        """Yield states with the exa moved to every other column, preceded by
        this state itself if ``allow_noshift`` is true."""
        if allow_noshift:
            yield self

        left = self
        for _ in range(self.exa):
            left = left.move(LEFT)
            yield left

        right = self
        for _ in range(COLUMNS - self.exa - 1):
            right = right.move(RIGHT)
            yield right

    def gen_stationary(self):
        """Yield rearrangements of the top blocks of the exa column."""
        # SWAP
        # GRAB, SWAP, DROP
        # GRAB, SWAP, DROP, SWAP
        # SWAP, GRAB, SWAP, DROP
        # SWAP, GRAB, SWAP, DROP, SWAP
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)

            if avail >= 2:
                swap = self.move(SWAP)
                yield swap

                if avail >= 3:
                    grab = self.move(GRAB, SWAP, DROP)
                    yield grab
                    yield grab.move(SWAP)

                    swap = swap.move(GRAB, SWAP, DROP)
                    yield swap
                    yield swap.move(SWAP)

    def gen_get(self):
        """Yield states that end up holding one of the top three blocks of the
        exa column."""
        # GRAB
        # SWAP, GRAB
        # GRAB, SWAP, DROP, SWAP, GRAB
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)

            if avail >= 1:
                grab = self.move(GRAB)
                yield grab

                if avail >= 2:
                    yield self.move(SWAP, GRAB)

                    if avail >= 3:
                        yield grab.move(SWAP, DROP, SWAP, GRAB)

    def gen_put(self):
        """Yield states that place the held block at one of the top three
        positions of the exa column."""
        # DROP
        # DROP, SWAP
        # DROP, SWAP, GRAB, SWAP, DROP
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)
            drop = self.move(DROP)
            yield drop

            if avail >= 1:
                swap = drop.move(SWAP)
                yield swap

                if avail >= 2:
                    yield swap.move(GRAB, SWAP, DROP)

    def force(self, *moves):
        """Apply ``moves`` without any search; the result has an empty score."""
        state = self.move(*moves)
        state.score = ()
        return state

    def solve(self):
        """Return the best State reachable through gen_moves().

        Candidates are filtered lexicographically: for each key of
        score_keys() only the states with the minimal value survive, until one
        candidate remains or the keys run out. The returned state's ``score``
        is the tuple of minimal values that were computed. If a block is
        already held, it is simply dropped.
        """
        assert self.exa is not None

        if self.held != NOBLOCK:
            return self.force(DROP)

        pool = deque(self.gen_moves())

        if not pool:
            return self.force()

        best_score = ()

        for key in self.score_keys():
            if len(pool) == 1:
                break

            for state in pool:
                state.score = key(state)

            best = min(state.score for state in pool)
            best_score += (best,)

            # keep only the minimal states, preserving generation order so
            # that ties resolve to the earliest generated candidate
            pool = deque(state for state in pool if state.score == best)

        best = pool.popleft()
        best.score = best_score
        return best

    def score_keys(self):
        """Return the score functions for solve(), most important first.

        The ordering depends on how tall the board is: past COLSIZE_PANIC
        flattening the board comes first, past COLSIZE_PRIO avoiding panic
        comes first, otherwise points do.
        """
        cls = self.__class__
        # NOTE(review): the 2 presumably matches the default ``pad`` of
        # detect() -- confirm before changing either.
        colsize = self.nrows - 2

        if colsize >= COLSIZE_PANIC:
            return cls.holes, cls.nmoves, cls.points, cls.fragmentation

        if colsize >= COLSIZE_PRIO:
            return cls.causes_panic, cls.points, cls.holes, \
                   cls.fragmentation, cls.nmoves

        return cls.points, cls.fragmentation, cls.holes, cls.nmoves

    def print(self):
        """Print the board using detection.print_board."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return what print() would write to stdout as a string."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def has_same_exa(self, state):
        """Return True if ``state`` has the same exa column and held block."""
        return self.exa == state.exa and self.held == state.held

    def nmoves(self):
        """Return the number of moves applied so far."""
        return len(self.moves)

    def delay(self):
        """Return the total delay of the applied moves in milliseconds."""
        return moves_delay(self.moves)

    def keys(self):
        """Return the applied moves as a string of key characters."""
        return moves_to_keys(self.moves)

    def loops(self, prev):
        """Return a truthy value if this state repeats ``prev``: it has moves
        and its exa column, moves and score all equal those of ``prev``.

        The result is falsy (possibly the empty moves tuple) otherwise.
        """
        return self.moves and \
               self.exa == prev.exa and \
               self.moves == prev.moves and \
               self.score == prev.score


def get_colskip(blocks):
    """Return, for each column, the row index of its first non-empty cell
    (the number of rows if the column is empty)."""
    def colskip(col):
        for row, block in enumerate(blocks[col::COLUMNS]):
            if block != NOBLOCK:
                return row
        return len(blocks) // COLUMNS

    return list(map(colskip, range(COLUMNS)))


def get_busy(blocks, colskip):
    """Return a bitmask with bit ``col`` set for every column that has an
    empty cell somewhere after its top block.

    Such a gap presumably means blocks in that column are still in motion.
    """
    mask = 0
    for col, skip in enumerate(colskip):
        # first cell after the top block of this column
        start = (skip + 1) * COLUMNS + col
        colbusy = NOBLOCK in blocks[start::COLUMNS]
        mask |= colbusy << col
    return mask


def move_to_key(move):
    """Return the key character for ``move`` (GRAB and DROP share 'j')."""
    return 'jjkadl'[move]


def moves_to_keys(moves):
    """Return the key characters for a sequence of moves as one string."""
    return ''.join(move_to_key(move) for move in moves)


def moves_delay(moves):
    """Return the summed MOVE_DELAYS of ``moves`` in milliseconds."""
    return sum(MOVE_DELAYS[m] for m in moves)


if __name__ == '__main__':
    import sys
    from PIL import Image

    board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()

    # perf_counter is monotonic, unlike time.time(), so the elapsed time
    # cannot be skewed by system clock adjustments
    start = time.perf_counter()
    newstate = state.solve()
    end = time.perf_counter()
    print('best move:', newstate.keys())
    print('score:', newstate.score)
    print('elapsed:', round((end - start) * 1000, 1), 'ms')
    print()
    print('target after move:')
    newstate.print()

    #print()
    #print('generated moves:')
    #for state in state.gen_moves():
    #    print(state.keys())