"""Move search for a column-based block-matching board.

A ``State`` holds a flat, row-major list of blocks (``COLUMNS`` cells per
row), the column index of the exa and the block it currently holds.
``State.solve()`` enumerates candidate move sequences, simulates each one and
returns the resulting state that ranks best under ``State.score_keys()``.
"""
import io
import time
from collections import deque
from contextlib import redirect_stdout
from itertools import combinations, islice

from detection import (
    COLUMNS, NOBLOCK, detect_blocks, detect_exa, detect_held, print_board,
    is_basic, is_bomb, bomb_to_basic,
)

# Move identifiers. Each value is also the index into MOVE_DELAYS and into
# the key string used by move_to_key().
GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)

MOVE_DELAYS = (  # in milliseconds
    50,  # GRAB
    50,  # DROP
    50,  # SWAP
    30,  # LEFT
    30,  # RIGHT
    30,  # SPEED
)

# Move sequences that end with a block in hand (GET) and sequences that start
# with a block in hand and end empty-handed (PUT). gen_moves() combines them.
GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))

# Number of empty rows State.detect() prepends by default; the column-size
# heuristics subtract it from the row count again.
PAD_ROWS = 2

MIN_BASIC_GROUP_SIZE = 4
MIN_BOMB_GROUP_SIZE = 2
POINTS_DEPTH = 3
FRAG_DEPTH = 4
DEFRAG_PRIO = 4
COLSIZE_PRIO = 5
COLSIZE_PANIC = 8
COLSIZE_MAX = 9
BOMB_POINTS = 5


class InvalidMove(AssertionError):
    """Raised by ``State.move()`` when a move sequence cannot be applied.

    Subclasses ``AssertionError`` because the checks used to be ``assert``
    statements; code that catches ``AssertionError`` keeps working, and the
    checks now also run under ``python -O``.
    """


class State:
    """Snapshot of the board plus the bookkeeping of the moves that led to it.

    Attributes:
        blocks: flat row-major list of blocks, ``COLUMNS`` cells per row.
        exa: column index the moves act on.
        held: block currently held, or ``NOBLOCK``.
        moves: tuple of moves applied by the last ``move()`` call.
        score: tuple assigned by ``solve()``. NOTE: this instance attribute
            shadows the ``score`` method on instances.
        placed: indices written to by the last ``move()`` call.
        grabbed: index -> block that was taken from that index by the last
            ``move()`` call.
        nrows: number of rows in ``blocks``.
        colskip: per column, the row index of the first non-empty cell
            (``nrows`` when the column is empty).
    """

    def __init__(self, blocks, exa, held, colskip=None):
        """Store the board; compute ``colskip`` by scanning when not given."""
        self.blocks = blocks
        self.exa = exa
        self.held = held
        self.moves = ()
        self.score = ()
        # Initialised here so points() also works on a state that has not
        # been through move() yet.
        self.placed = set()
        self.grabbed = {}
        self.nrows = len(self.blocks) // COLUMNS

        if colskip is None:
            colskip = []
            for col in range(COLUMNS):
                for row in range(self.nrows):
                    if self.blocks[row * COLUMNS + col] != NOBLOCK:
                        colskip.append(row)
                        break
                else:
                    # No block found in this column.
                    colskip.append(self.nrows)
        self.colskip = colskip

    def grabbing_or_dropping(self):
        """Return True when the cell right after the first block of the exa
        column exists and is empty, i.e. that first block is detached from
        the rest of its column."""
        skip = self.colskip[self.exa]
        i = (skip + 1) * COLUMNS + self.exa
        return i < len(self.blocks) and self.blocks[i] == NOBLOCK

    def iter_columns(self):
        """Yield, per column, a generator over the indices of its non-empty
        cells in increasing row order."""
        def gen_col(col):
            for row in range(self.nrows):
                i = row * COLUMNS + col
                if self.blocks[i] != NOBLOCK:
                    yield i

        for col in range(COLUMNS):
            yield gen_col(col)

    @classmethod
    def detect(cls, board, pad=PAD_ROWS):
        """Build a state from ``board`` using the detection helpers.

        ``pad`` empty rows are prepended to the detected blocks.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        return cls(blocks, exa, held)

    def copy(self):
        """Return a new State with copied ``blocks`` and ``colskip`` lists.

        ``moves``, ``score``, ``placed`` and ``grabbed`` are not carried over.
        """
        return State(list(self.blocks), self.exa, self.held,
                     list(self.colskip))

    def causes_panic(self):
        """Return True when the longest column reaches COLSIZE_PANIC."""
        return self.max_colsize() >= COLSIZE_PANIC

    def max_colsize(self):
        """Return the number of rows from the first non-empty row onwards."""
        return self.nrows - self.empty_rows()

    def empty_rows(self):
        """Return the number of leading rows that contain no block at all."""
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                return i // COLUMNS
        # No block anywhere: every row is empty. (This used to return 0,
        # which made max_colsize() report a completely full board.)
        return self.nrows

    def holes(self):
        """Return a penalty for the empty cells between the first non-empty
        row and the first block of each column.

        Each empty cell costs its 1-based distance from the first non-empty
        row, so deep gaps weigh more than shallow ones.
        """
        start_row = self.empty_rows()
        score = 0
        for col in range(COLUMNS):
            for row in range(start_row, self.nrows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
                score += row - start_row + 1
        return score

    def score(self, points, moves, prev):
        """Return a sort key (lower is better) for this state.

        NOTE: ``__init__`` assigns a tuple to ``self.score``, which shadows
        this method on instances; it is only reachable through the class
        (``State.score(state, ...)``). ``solve()`` uses ``score_keys()``
        instead.

        Args:
            points: points gained by ``moves``.
            moves: the move sequence that produced this state.
            prev: the state the moves were applied to.
        """
        prev_colsize = prev.nrows - PAD_ROWS
        delay = len(moves)

        # Don't care about defragging for few rows, just score points quickly.
        # This saves computation time which in turn makes for nice combos when
        # the bot speeds around throwing blocks on top of each other.
        if prev_colsize < DEFRAG_PRIO:
            return -points, delay

        holes = self.holes()
        frag = 0 if points else self.fragmentation()

        # When rows start stacking up, start defragmenting colors to make
        # opportunities for scoring points.
        if prev_colsize < COLSIZE_PRIO:
            return -points, frag, holes, delay

        # When they stack higher, start moving blocks down into holes before
        # continuing to defragment.
        if prev_colsize < COLSIZE_PANIC:
            panic = int(self.causes_panic())
            return panic, -points, holes, frag, delay

        # Column heights are getting out of hand, just fill the holes.
        return holes, delay, -points, frag

    def find_unmovable_blocks(self):
        """Return the set of indices that belong to a group large enough to
        match, plus every block whose value equals the basic counterpart of
        a matched bomb."""
        unmoveable = set()
        bombed = set()

        for block, group in self.find_groups():
            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
                unmoveable.update(group)
            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
                bombed.add(bomb_to_basic(block))
                unmoveable.update(group)

        for i, block in enumerate(self.blocks):
            if block in bombed:
                unmoveable.add(i)

        return unmoveable

    def move(self, moves):
        """Apply ``moves`` and return the resulting state.

        A non-empty sequence is applied to a copy; an empty sequence returns
        ``self`` (with its move bookkeeping reset). Moves without a branch
        below (e.g. SPEED) leave the board untouched.

        Raises:
            InvalidMove: (an ``AssertionError`` subclass) when a move is not
                possible, or when the moves push the longest column past
                COLSIZE_MAX while it was below that limit before.
        """
        s = self.copy() if moves else self
        s.moves = moves
        # NOTE: blocks reported by find_unmovable_blocks() are currently not
        # excluded from grabbing/swapping.
        s.placed = set()
        s.grabbed = {}

        for move in moves:
            if move == LEFT:
                if not s.exa > 0:
                    raise InvalidMove('already in the first column')
                s.exa -= 1
            elif move == RIGHT:
                if not s.exa < COLUMNS - 1:
                    raise InvalidMove('already in the last column')
                s.exa += 1
            elif move == GRAB:
                if s.held != NOBLOCK:
                    raise InvalidMove('already holding a block')
                row = s.colskip[s.exa]
                if not row < s.nrows:
                    raise InvalidMove('nothing to grab in this column')
                i = row * COLUMNS + s.exa
                s.held = s.blocks[i]
                s.blocks[i] = NOBLOCK
                s.grabbed[i] = s.held
                s.colskip[s.exa] += 1
            elif move == DROP:
                if s.held == NOBLOCK:
                    raise InvalidMove('no block to drop')
                row = s.colskip[s.exa]
                if not row > 0:
                    raise InvalidMove('column is full')
                i = (row - 1) * COLUMNS + s.exa
                s.blocks[i] = s.held
                s.held = NOBLOCK
                s.placed.add(i)
                s.colskip[s.exa] -= 1
            elif move == SWAP:
                row = s.colskip[s.exa]
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                if not j < len(s.blocks):
                    raise InvalidMove('need two blocks to swap')
                bi = s.blocks[i]
                bj = s.blocks[j]
                # Swapping two equal blocks changes nothing, so it is not
                # recorded as a placement.
                if bi != bj:
                    s.blocks[i] = bj
                    s.blocks[j] = bi
                    s.grabbed[i] = bi
                    s.grabbed[j] = bj
                    s.placed.add(i)
                    s.placed.add(j)

        if moves and self.max_colsize() < COLSIZE_MAX:
            if not s.max_colsize() <= COLSIZE_MAX:
                raise InvalidMove('moves overflow the board')

        return s

    def find_groups(self, depth=POINTS_DEPTH, minsize=2):
        """Yield ``(block, indices)`` for connected groups of equal blocks.

        Groups are started only from the first ``depth`` blocks of each
        column, but are followed through all equal neighbours. Groups with
        fewer than ``minsize`` members are skipped.
        """
        def follow_group(i, block, group):
            if self.blocks[i] == block and i not in visited:
                group.append(i)
                visited.add(i)
                for nb in self.neighbors(i):
                    follow_group(nb, block, group)

        visited = set()

        for col in self.iter_columns():
            for i in islice(col, depth):
                block = self.blocks[i]
                group = []
                follow_group(i, block, group)
                if len(group) >= minsize:
                    yield block, group

    def neighbors(self, i):
        """Yield the indices of the non-empty cells directly left, right,
        before and after index ``i``."""
        row, col = divmod(i, COLUMNS)
        if col > 0 and self.blocks[i - 1] != NOBLOCK:
            yield i - 1
        if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
            yield i + 1
        if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
            yield i - COLUMNS
        if row < self.nrows - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
            yield i + COLUMNS

    def fragmentation(self, depth=FRAG_DEPTH):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same color.
        Magnify vertical distances to avoid column stacking.
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)

            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if groups[i] == groups[j]:
                return abs(yj - yi)

            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}  # block value -> indices of all its grouped blocks
        groups = {}  # index -> id of the group it belongs to

        for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
            colors.setdefault(block, []).extend(group)
            for i in group:
                groups[i] = groupid

        return sum(dist(i, j)
                   for color in colors.values()
                   for i, j in combinations(color, 2))

    def points(self):
        """Return the negated points earned by the blocks placed in the last
        ``move()`` call (negated so that lower is better, as ``solve()``
        minimises its keys)."""
        def group_size(start):
            work = [start]
            visited.add(start)
            size = 0
            block = self.blocks[start]

            while work:
                i = work.pop()

                # avoid giving points to moving a block within the same group
                if self.grabbed.get(i, None) == block:
                    return 0

                if self.blocks[i] == block:
                    size += 1
                    for nb in self.neighbors(i):
                        if nb not in visited:
                            visited.add(nb)
                            work.append(nb)

            return size

        points = 0
        visited = set()

        for i in self.placed:
            if i not in visited:
                block = self.blocks[i]
                size = group_size(i)

                if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
                    points += size
                elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
                    points += BOMB_POINTS

        return -points

    def has_explosion(self):
        """Return True when any bomb has an equal block as direct neighbour."""
        return any(is_bomb(block) and
                   any(self.blocks[j] == block for j in self.neighbors(i))
                   for i, block in enumerate(self.blocks))

    def gen_moves(self):
        """Yield candidate move sequences, starting with the empty one.

        For every source column: the swap/regrab variations within that
        column, followed by every GET x PUT combination towards every other
        column. Sequences that start in the exa's own column are skipped
        while ``grabbing_or_dropping()`` reports True.
        """
        yield ()

        def shift_exa(diff):
            direction = RIGHT if diff > 0 else LEFT
            return abs(diff) * (direction,)

        ignore_exa_column = self.grabbing_or_dropping()

        for src in range(COLUMNS):
            mov1 = shift_exa(src - self.exa)

            # An empty mov1 means src is the exa's current column.
            if mov1 or not ignore_exa_column:
                yield mov1 + (SWAP,)
                yield mov1 + (GRAB, SWAP, DROP)
                yield mov1 + (SWAP, GRAB, SWAP, DROP)
                yield mov1 + (GRAB, SWAP, DROP, SWAP)
                yield mov1 + (SWAP, GRAB, SWAP, DROP, SWAP)

                for dst in range(COLUMNS):
                    if dst != src:
                        mov2 = shift_exa(dst - src)
                        for get in GET:
                            for put in PUT:
                                yield mov1 + get + mov2 + put

    def gen_valid_moves(self):
        """Yield the state resulting from every candidate move sequence that
        ``move()`` accepts."""
        for moves in self.gen_moves():
            try:
                yield self.move(moves)
            except AssertionError:
                # InvalidMove: this sequence is not possible on this board.
                pass

    def solve(self):
        """Return the best reachable state.

        When a block is held it is simply dropped. Otherwise the candidates
        are narrowed down key by key (see ``score_keys()``): after each key
        only the states with the minimal value remain, until one state is
        left or the keys run out. The winner's ``score`` is the tuple of
        minimal values found along the way.
        """
        assert self.exa is not None

        if self.held != NOBLOCK:
            return self.move((DROP,))

        valid = deque(self.gen_valid_moves())
        if not valid:
            return self.move(())

        best_score = ()
        for key in self.score_keys():
            if len(valid) == 1:
                break

            for state in valid:
                state.score = key(state)

            best = min(state.score for state in valid)
            best_score += (best,)
            # Keep only the best-scoring candidates, in their original order.
            valid = deque(state for state in valid if state.score == best)

        best = valid.popleft()
        best.score = best_score
        return best

    def score_keys(self):
        """Return the ordered scoring functions for ``solve()``, chosen by
        the number of rows on the board (excluding the pad rows)."""
        cls = self.__class__
        colsize = self.nrows - PAD_ROWS

        if colsize >= COLSIZE_PANIC:
            return cls.holes, cls.nmoves, cls.points, cls.fragmentation

        if colsize >= COLSIZE_PRIO:
            return (cls.causes_panic, cls.points, cls.holes,
                    cls.fragmentation, cls.nmoves)

        return cls.points, cls.fragmentation, cls.holes, cls.nmoves

    def print(self):
        """Print the board through ``print_board``."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return what ``print()`` would write to stdout, as a string."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def has_same_exa(self, state):
        """Return True when ``state`` has the same exa column and held
        block."""
        return self.exa == state.exa and self.held == state.held

    def nmoves(self):
        """Return the number of moves that led to this state."""
        return len(self.moves)

    def delay(self):
        """Return the summed delay (in milliseconds) of this state's moves."""
        return moves_delay(self.moves)

    def keys(self):
        """Return this state's moves as a string of keys."""
        return moves_to_keys(self.moves)

    def __lt__(self, other):
        return self.score < other.score

    def loops(self, prev):
        """Return a truthy value when this state repeats ``prev``: non-empty
        moves with the same exa column, moves and score."""
        return (self.moves and
                self.exa == prev.exa and
                self.moves == prev.moves and
                self.score == prev.score)


def move_to_key(move):
    """Return the key for ``move`` (GRAB and DROP share the same key)."""
    return 'jjkadl'[move]


def moves_to_keys(moves):
    """Return the keys for ``moves`` joined into one string."""
    return ''.join(move_to_key(move) for move in moves)


def moves_delay(moves):
    """Return the summed MOVE_DELAYS (in milliseconds) of ``moves``."""
    return sum(MOVE_DELAYS[m] for m in moves)


def main():
    """Solve the screenshot whose number is given as first argument and
    print the chosen move, its score and the time the search took."""
    import sys
    from PIL import Image

    path = 'screens/board%d.png' % int(sys.argv[1])
    board = Image.open(path).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()

    start = time.time()
    newstate = state.solve()
    end = time.time()
    print('best move:', newstate.keys())
    print('score:', newstate.score)
    print('elapsed:', round((end - start) * 1000, 1), 'ms')
    print()

    print('target after move:')
    newstate.print()
    print()


if __name__ == '__main__':
    main()