"""Move search for a block-matching board.

A ``State`` holds the board cells, the column of the player ("exa") and the
block it is holding.  Candidate key sequences are generated, simulated on a
copy of the state and ranked by ``State.score``; ``State.solve`` returns the
best-ranked ``Solution``.

NOTE(review): this file was reconstructed from a whitespace-flattened source.
Nesting that could not be derived from syntax alone is flagged with
``NOTE(review)`` at the relevant spot.
"""
import io
import time
from contextlib import redirect_stdout
from itertools import combinations, islice

from detection import (
    COLUMNS,
    NOBLOCK,
    bomb_to_basic,
    detect_blocks,
    detect_exa,
    detect_held,
    is_basic,
    is_bomb,
    print_board,
)

# Move identifiers; also used as indices into MOVE_DELAYS and the key string
# in move_to_key().
GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)

MOVE_DELAYS = (
    # in milliseconds
    50,  # GRAB
    50,  # DROP
    50,  # SWAP
    30,  # LEFT
    30,  # RIGHT
    30,  # SPEED
)

# Ways to pick up the 1st, 2nd or 3rd block of a column, and the mirrored ways
# to put a held block at the 1st, 2nd or 3rd position of a column.
GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))

MIN_BASIC_GROUP_SIZE = 4
MIN_BOMB_GROUP_SIZE = 2
POINTS_DEPTH = 3
FRAG_DEPTH = 5
DEFRAG_PRIO = 4
COLSIZE_PRIO = 5
COLSIZE_PANIC = 8
COLSIZE_MAX = 9
BOMB_POINTS = 1
MIN_ROWS = 2


class InvalidMove(AssertionError):
    """Raised by ``State.simulate`` when a move sequence cannot be executed.

    Subclasses ``AssertionError`` so that callers written against the former
    ``assert``-based checks keep working, while the checks themselves are no
    longer stripped when Python runs with ``-O``.
    """


def _require(condition):
    """Raise ``InvalidMove`` unless *condition* is truthy."""
    if not condition:
        raise InvalidMove()


class State:
    """Board snapshot: flat row-major cell list, exa column and held block."""

    def __init__(self, blocks, exa, held):
        # blocks: flat list of cells, COLUMNS cells per row.
        # exa: column index of the player (may be None, see solve()).
        # held: block currently held, or NOBLOCK.
        self.blocks = blocks
        self.exa = exa
        self.held = held

    def grabbing_or_dropping(self):
        """Tell whether the first block in the exa's column is detached.

        True when the cell one row after the first non-empty cell of the
        exa's column exists and is NOBLOCK.
        NOTE(review): presumably this means a block is in flight during a
        grab/drop animation -- confirm against the detection code.
        """
        skip = self.colskip(self.exa)
        i = (skip + 1) * COLUMNS + self.exa
        return i < len(self.blocks) and self.blocks[i] == NOBLOCK

    def iter_columns(self):
        """Yield, per column, a generator over its non-empty cell indices.

        Indices are produced in increasing row order.
        """
        nrows = self.nrows()

        def gen_col(col):
            for row in range(nrows):
                i = row * COLUMNS + col
                if self.blocks[i] != NOBLOCK:
                    yield i

        for col in range(COLUMNS):
            yield gen_col(col)

    @classmethod
    def detect(cls, board, pad=2):
        """Build a state from *board* using the ``detection`` helpers.

        *pad* rows of NOBLOCK cells are prepended to the detected blocks;
        simulate() needs a free row before a column's first block to DROP.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        return cls(blocks, exa, held)

    def copy(self):
        """Return a new State with its own copy of the cell list."""
        return State(list(self.blocks), self.exa, self.held)

    def causes_panic(self):
        """Tell whether the occupied height reached COLSIZE_PANIC."""
        return self.max_colsize() >= COLSIZE_PANIC

    def max_colsize(self):
        """Return the number of rows from the first non-empty row onwards."""
        return self.nrows() - self.empty_rows()

    def empty_rows(self):
        """Return the number of leading rows that contain no block.

        For a board without any block every row is empty, so the total row
        count is returned (previously 0, which made an empty board look
        completely full to max_colsize()).
        """
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                return i // COLUMNS
        return self.nrows()

    def holes(self):
        """Score the empty cells between the first used row and each column.

        For every column, each empty cell between the first non-empty row of
        the board and the column's first block adds its 1-based distance from
        that row, so deeper gaps weigh more.
        """
        start_row = self.empty_rows()
        total_rows = self.nrows()
        score = 0

        for col in range(COLUMNS):
            for row in range(start_row, total_rows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
                score += row - start_row + 1

        return score

    def score(self, points, moves, prev):
        """Return a sort key for this (post-move) state; lower is better.

        points: value returned by remove_blocks() for this state.
        moves: the move sequence that led here (only its length is used).
        prev: the state before the moves; its row count selects the strategy.

        The tuple layout differs per strategy, so keys are only comparable
        between solutions that share the same *prev*.
        """
        # NOTE(review): the 2 presumably matches the default ``pad`` of
        # detect() -- confirm.
        prev_colsize = prev.nrows() - 2
        # delay = moves_delay(moves)
        delay = len(moves)

        # Don't care about defragging for few rows, just score points quickly.
        # This saves computation time which in turn makes for nice combos when
        # the bot speeds around throwing blocks on top of each other.
        if prev_colsize < DEFRAG_PRIO:
            return -points, delay

        holes = self.holes()
        frag = self.fragmentation()

        # When rows start stacking up, start defragmenting colors to make
        # opportunities for scoring points.
        if prev_colsize < COLSIZE_PRIO:
            return -points, frag, holes, delay

        # When they stack higher, start moving blocks down into holes before
        # continuing to defragment.
        if prev_colsize < COLSIZE_PANIC:
            panic = int(self.causes_panic())
            return panic, -points, holes, frag, delay

        # Column heights are getting out of hand, just move blocks down.
        return holes, delay, -points, frag

    def solutions(self):
        """Yield a Solution for every generated move sequence that is legal."""
        for moves in self.gen_moves():
            try:
                yield Solution(self, moves)
            except AssertionError:
                # InvalidMove is an AssertionError; the broader type is kept
                # so behaviour matches the former assert-based checks.
                pass

    def colskip(self, col):
        """Return the row of the first non-empty cell in *col*.

        Returns nrows() when the column holds no block.
        """
        nrows = self.nrows()

        for row in range(nrows):
            if self.blocks[row * COLUMNS + col] != NOBLOCK:
                return row

        return nrows

    def find_unmovable_blocks(self):
        """Return the set of cell indices that belong to a completed group.

        Includes basic groups of at least MIN_BASIC_GROUP_SIZE, bomb groups
        of at least MIN_BOMB_GROUP_SIZE, and every cell whose block equals
        ``bomb_to_basic`` of such a bomb.
        """
        unmoveable = set()
        bombed = set()

        for block, group in self.find_groups():
            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
                unmoveable.update(group)
            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
                bombed.add(bomb_to_basic(block))
                unmoveable.update(group)

        # NOTE(review): placed after the group loop; the flattened source did
        # not show the nesting.  The resulting set is the same either way.
        unmoveable.update(i for i, block in enumerate(self.blocks)
                          if block in bombed)

        return unmoveable

    def simulate(self, moves):
        """Apply *moves* to a copy of this state.

        Returns ``(points, new_state)`` where points is
        ``new_state.remove_blocks()``.  This state is left untouched.

        Raises InvalidMove (an AssertionError) when a move cannot be carried
        out or when the resulting board exceeds COLSIZE_MAX.  SPEED and any
        unknown move are ignored.
        """
        s = self.copy()
        # points = 0

        # avoid swapping/grabbing currently exploding items
        # unmoveable = s.find_unmovable_blocks()

        for move in moves:
            if move == LEFT:
                _require(s.exa > 0)
                s.exa -= 1
            elif move == RIGHT:
                _require(s.exa < COLUMNS - 1)
                s.exa += 1
            elif move == GRAB:
                _require(s.held == NOBLOCK)
                row = s.colskip(s.exa)
                # The column must contain a block to take.
                _require(row < s.nrows())
                i = row * COLUMNS + s.exa
                # _require(i not in unmoveable)
                s.held = s.blocks[i]
                s.blocks[i] = NOBLOCK
            elif move == DROP:
                _require(s.held != NOBLOCK)
                row = s.colskip(s.exa)
                # There must be a free row before the column's first block.
                _require(row > 0)
                i = row * COLUMNS + s.exa
                s.blocks[i - COLUMNS] = s.held
                s.held = NOBLOCK
                # points += s.remove_blocks()
            elif move == SWAP:
                row = s.colskip(s.exa)
                _require(row < s.nrows() - 2)
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                # _require(i not in unmoveable)
                # _require(j not in unmoveable)
                s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
                # points += s.remove_blocks()

        # Only reject overflow if the board was not already at the limit.
        # NOTE(review): placed after the move loop (the ``moves and`` guard
        # would be redundant inside it) -- confirm.
        if moves and self.max_colsize() < COLSIZE_MAX:
            _require(s.max_colsize() <= COLSIZE_MAX)

        points = s.remove_blocks()
        return points, s

    def find_groups(self, depth=POINTS_DEPTH, minsize=2):
        """Yield ``(block, indices)`` for connected groups of equal blocks.

        Groups are flood-filled from the first *depth* non-empty cells of
        each column; a group may extend beyond those seed cells.  Only
        groups with at least *minsize* cells are yielded, each cell once.
        """
        def follow_group(i, block, group):
            if self.blocks[i] == block and i not in visited:
                group.append(i)
                visited.add(i)
                for nb in self.neighbors(i):
                    follow_group(nb, block, group)

        visited = set()

        for col in self.iter_columns():
            for i in islice(col, depth):
                block = self.blocks[i]
                group = []
                follow_group(i, block, group)
                if len(group) >= minsize:
                    yield block, group

    def neighbors(self, i):
        """Yield indices of the non-empty cells orthogonally adjacent to *i*."""
        row, col = divmod(i, COLUMNS)
        if col > 0 and self.blocks[i - 1] != NOBLOCK:
            yield i - 1
        if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
            yield i + 1
        if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
            yield i - COLUMNS
        if row < self.nrows() - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
            yield i + COLUMNS

    def fragmentation(self, depth=FRAG_DEPTH):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same color.
        Magnify vertical distances to avoid column stacking.

        Returns that sum for the groups found with find_groups(depth, 1).
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)

            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if groups[i] == groups[j]:
                return abs(yj - yi)

            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}  # block value -> all cell indices with that value
        groups = {}  # cell index -> id of the group it belongs to

        for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
            colors.setdefault(block, []).extend(group)
            for i in group:
                groups[i] = groupid

        return sum(dist(i, j)  # * (1 + 2 * is_bomb(block))
                   for block, color in colors.items()
                   for i, j in combinations(color, 2))

    def remove_blocks(self):
        """Return the points for completed groups; the board is not changed.

        A completed basic group counts its size, a completed bomb group
        counts BOMB_POINTS.
        """
        removed = 0

        for block, group in self.find_groups():
            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
                removed += len(group)
            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
                removed += BOMB_POINTS

        return removed

    # Disabled alternative that actually removes the blocks and recurses for
    # chain reactions; kept for reference.
    #def remove_blocks(self):
    #    remove = []
    #    for block, group in self.find_groups():
    #        if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
    #            remove.extend(group)
    #        elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
    #            remove.extend(group)
    #            remove.extend(i for i, other in enumerate(self.blocks)
    #                          if other == bomb_to_basic(block))
    #    remove.sort()
    #    removed = 0
    #    prev = None
    #    for i in remove:
    #        if i != prev:
    #            while self.blocks[i] != NOBLOCK:
    #                self.blocks[i] = self.blocks[i - COLUMNS]
    #                i -= COLUMNS
    #            removed += 1
    #        prev = i
    #    if removed:
    #        removed += self.remove_blocks()
    #    return removed

    def has_explosion(self):
        """Tell whether any bomb cell has an adjacent cell of the same block."""
        return any(is_bomb(block) and
                   any(self.blocks[j] == block for j in self.neighbors(i))
                   for i, block in enumerate(self.blocks))

    def gen_moves(self):
        """Yield candidate move sequences (tuples of move identifiers).

        Always yields the empty sequence first.  For every source column:
        walk there, then either rearrange within the column or move one of
        its first three blocks (GET) to one of the first three positions
        (PUT) of every other column.  Legality is not checked here; see
        simulate().
        """
        yield ()

        def make_move(diff):
            direction = RIGHT if diff > 0 else LEFT
            return abs(diff) * (direction,)

        ignore_exa_column = self.grabbing_or_dropping()

        for src in range(COLUMNS):
            mov1 = make_move(src - self.exa)

            # An empty mov1 means src is the exa's own column.
            if mov1 or not ignore_exa_column:
                yield mov1 + (SWAP,)
                yield mov1 + (GRAB, SWAP, DROP)
                yield mov1 + (SWAP, GRAB, SWAP, DROP)

                # NOTE(review): nested under the guard above so the exa's
                # column is skipped entirely while grabbing/dropping; the
                # flattened source did not show the nesting -- confirm.
                for dst in range(COLUMNS):
                    if dst != src:
                        mov2 = make_move(dst - src)
                        for get in GET:
                            for put in PUT:
                                yield mov1 + get + mov2 + put

    def solve(self):
        """Return the Solution to execute next.

        Drops a held block first, does nothing on a board with fewer than
        MIN_ROWS rows or while has_explosion() is true, and otherwise
        returns the lowest-scoring solution.

        Raises AssertionError when the exa position is unknown.
        """
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        if self.exa is None:
            raise AssertionError('exa position is unknown')

        if self.held != NOBLOCK:
            return Solution(self, (DROP,))

        if self.nrows() < MIN_ROWS:
            return Solution(self, ())

        if self.has_explosion():
            return Solution(self, ())

        return min(self.solutions())

    def print(self):
        """Print the board through ``detection.print_board``."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return what print() would write to stdout as a string."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def nrows(self):
        """Return the number of complete rows in the cell list."""
        return len(self.blocks) // COLUMNS

    def has_same_exa(self, state):
        """Tell whether *state* has the same exa column and held block."""
        return self.exa == state.exa and self.held == state.held


class Solution:
    """A move sequence applied to a state, with the result and its score."""

    def __init__(self, state, moves):
        """Simulate *moves* on *state*; raises InvalidMove if not executable."""
        self.state = state
        self.moves = moves
        points, self.newstate = state.simulate(moves)
        self.score = self.newstate.score(points, moves, state)

    def __lt__(self, other):
        return self.score < other.score

    def loops(self, prev_prev):
        """Tell whether this non-empty solution repeats *prev_prev*.

        True when the moves are non-empty and exa column, moves and score
        all equal those of *prev_prev*.
        """
        return bool(self.moves) and \
            self.state.exa == prev_prev.state.exa and \
            self.moves == prev_prev.moves and \
            self.score == prev_prev.score

    def delay(self):
        """Return the summed MOVE_DELAYS of the moves, in milliseconds."""
        return moves_delay(self.moves)

    def keys(self):
        """Return the moves as a string of key characters."""
        return moves_to_keys(self.moves)


def move_to_key(move):
    """Return the key character for a move identifier."""
    return 'jjkadl'[move]


def moves_to_keys(moves):
    """Return the key characters for *moves* joined into one string."""
    return ''.join(move_to_key(move) for move in moves)


def moves_delay(moves):
    """Return the summed MOVE_DELAYS (milliseconds) for *moves*."""
    return sum(MOVE_DELAYS[m] for m in moves)


def _main():
    """Solve the screenshot numbered by argv[1] and print all solutions."""
    import sys
    from PIL import Image

    path = 'screens/board%d.png' % int(sys.argv[1])
    board = Image.open(path).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()

    start = time.time()
    solution = state.solve()
    end = time.time()
    print('best moves:', solution.keys())
    print('elapsed:', round((end - start) * 1000, 1), 'ms')
    print()
    print('target after moves:')
    solution.newstate.print()
    print()

    for candidate in sorted(state.solutions()):
        print('move %18s:' % candidate.keys(), candidate.score)


if __name__ == '__main__':
    _main()