import io
import time
from collections import deque
from contextlib import redirect_stdout
from copy import copy
from itertools import combinations
from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
                      detect_held, print_board, is_basic, is_bomb


# Move identifiers; also used as indices into MOVE_DELAYS and move_to_key().
GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)

MOVE_DELAYS = (  # in milliseconds
    50,  # GRAB
    50,  # DROP
    50,  # SWAP
    30,  # LEFT
    30,  # RIGHT
    30,  # SPEED
)

MIN_BASIC_GROUP_SIZE = 4
MIN_BOMB_GROUP_SIZE = 2
COLSIZE_PRIO = 5
COLSIZE_PANIC = 8
BOMB_POINTS = 5


class State:
    """
    Snapshot of the board plus the moves that led to it.

    The board is a flat, row-major sequence with COLUMNS cells per row, so
    cell (row, col) lives at index ``row * COLUMNS + col``. GRAB, DROP and
    SWAP act on the first non-empty cell of a column counted from row 0.
    """

    def __init__(self, blocks, exa, held, colskip, busy, moves,
                 groups, groupsizes, maxgroup):
        """
        blocks      flat row-major cell values (NOBLOCK for empty cells)
        exa         column index the moves operate on
        held        block currently held, or NOBLOCK
        colskip     per column: number of leading empty rows
        busy        bitmask, bit `col` is set when the column may not be
                    touched (see get_busy)
        moves       tuple of moves applied so far
        groups      per cell: id of its connected same-block group (0 = none)
        groupsizes  per cell: size of the group the cell belongs to
        maxgroup    highest group id handed out so far
        """
        self.blocks = blocks
        self.exa = exa
        self.held = held
        self.colskip = colskip
        self.busy = busy
        self.moves = moves
        self.groups = groups
        self.groupsizes = groupsizes
        self.maxgroup = maxgroup
        self.nrows = len(blocks) // COLUMNS

    @classmethod
    def detect(cls, board, pad=2):
        """
        Build a State from a board image, prepending `pad` empty rows so
        that there is room to DROP on top of the detected columns.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        colskip = get_colskip(blocks)
        busy = get_busy(blocks, colskip)
        groups, groupsizes, maxgroup = get_groups(blocks)
        return cls(bytearray(blocks), exa, held, bytearray(colskip), busy,
                   (), bytearray(groups), bytearray(groupsizes), maxgroup)

    def copy(self, deep):
        """
        Return a new State. With `deep` the mutable byte buffers are copied;
        otherwise they are shared with this state (enough for moves that
        only change `exa`).
        """
        mcopy = copy if deep else lambda x: x
        return self.__class__(mcopy(self.blocks), self.exa, self.held,
                              mcopy(self.colskip), self.busy, self.moves,
                              mcopy(self.groups), mcopy(self.groupsizes),
                              self.maxgroup)

    def colbusy(self, col):
        """Return 1 when column `col` is flagged in the busy mask, else 0."""
        return (self.busy >> col) & 1

    def colrows(self, col):
        """Return the number of rows from the column's first block down."""
        return self.nrows - self.colskip[col]

    def maxrows(self):
        """Return the largest colrows() over all columns."""
        return max(map(self.colrows, range(COLUMNS)))

    def causes_panic(self):
        """Return True when the tallest column reaches COLSIZE_PANIC."""
        return self.max_colsize() >= COLSIZE_PANIC

    def max_colsize(self):
        """Return the number of rows starting at the first non-empty row."""
        return self.nrows - self.empty_rows()

    def empty_rows(self):
        """Return the number of completely empty rows before the first block."""
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                return i // COLUMNS
        # No block anywhere: every row is empty. (Returning 0 here would make
        # max_colsize() report a full board for an empty one.)
        return self.nrows

    def holes(self):
        """
        Penalty for empty cells at or below the first non-empty row: each
        empty cell above a column's first block costs its depth + 1.
        """
        start_row = self.empty_rows()
        score = 0
        for col in range(COLUMNS):
            for row in range(start_row, self.nrows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
                # NOTE(review): the source indentation was lost; accumulating
                # per empty cell (inside the loop) is assumed - confirm.
                score += row - start_row + 1
        return score

    def move(self, *moves):
        """
        Return a new State with `moves` applied in order; `self` is left
        untouched. Preconditions of each move are checked with assertions.
        """
        # Only moves that edit the board need private copies of the buffers.
        deep = any(move in (GRAB, DROP, SWAP) for move in moves)
        s = self.copy(deep)
        s.moves += moves

        for move in moves:
            if move == LEFT:
                assert s.exa > 0
                s.exa -= 1

            elif move == RIGHT:
                assert s.exa < COLUMNS - 1
                s.exa += 1

            elif move == GRAB:
                assert not s.colbusy(s.exa)
                assert s.held == NOBLOCK
                row = s.colskip[s.exa]
                assert row < s.nrows
                i = row * COLUMNS + s.exa
                s.held = s.blocks[i]
                s.blocks[i] = NOBLOCK
                s.colskip[s.exa] += 1
                s.ungroup(i)

            elif move == DROP:
                assert not s.colbusy(s.exa)
                assert s.held != NOBLOCK
                row = s.colskip[s.exa]
                assert row > 0
                i = (row - 1) * COLUMNS + s.exa
                s.blocks[i] = s.held
                s.held = NOBLOCK
                s.colskip[s.exa] -= 1
                s.regroup(i)

            elif move == SWAP:
                assert not s.colbusy(s.exa)
                row = s.colskip[s.exa]
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                assert j < len(s.blocks)
                bi = s.blocks[i]
                bj = s.blocks[j]
                # Swapping two equal blocks changes nothing on the board.
                if bi != bj:
                    # Clear both cells first so that ungroup() sees the old
                    # groups without the two swapped blocks.
                    s.blocks[i] = NOBLOCK
                    s.blocks[j] = NOBLOCK
                    s.ungroup(i)
                    s.ungroup(j)
                    s.blocks[j] = bi
                    s.regroup(j)
                    s.blocks[i] = bj
                    s.regroup(i)

        return s

    def ungroup(self, i):
        """
        Update group bookkeeping after cell `i` was emptied: every remaining
        connected part of the old group gets a fresh id and its own size.
        """
        assert self.blocks[i] == NOBLOCK
        visited = set()
        oldid = self.groups[i]

        for nb in neighbors(i, self.blocks):
            if self.groups[nb] == oldid:
                # Empty when `nb` was already reached via another neighbor.
                newgroup = self.scan_group(nb, visited)
                if newgroup:
                    self.maxgroup = newid = self.maxgroup + 1
                    for j in newgroup:
                        self.groups[j] = newid
                        self.groupsizes[j] = len(newgroup)

        self.groups[i] = 0
        self.groupsizes[i] = 0

    def regroup(self, i):
        """
        Update group bookkeeping after a block was placed in cell `i`: the
        connected same-block component around `i` gets a fresh id and size.
        """
        assert self.blocks[i] != NOBLOCK
        self.maxgroup = newid = self.maxgroup + 1
        newgroup = self.scan_group(i, set())
        for j in newgroup:
            self.groups[j] = newid
            self.groupsizes[j] = len(newgroup)

    def scan_group(self, start, visited):
        """
        Return a tuple with the indices of all cells connected to `start`
        that hold the same block value, skipping (and extending) `visited`.
        """
        def scan(i):
            if i not in visited:
                yield i
                visited.add(i)
                for nb in neighbors(i, self.blocks):
                    if self.blocks[nb] == block:
                        yield from scan(nb)

        block = self.blocks[start]
        return tuple(scan(start))

    def fragmentation(self):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same
        color. Magnify vertical distances to avoid column stacking.
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)

            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if self.groups[i] == self.groups[j]:
                return abs(yj - yi)

            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                colors.setdefault(block, []).append(i)

        return sum(dist(i, j)
                   for blocks in colors.values()
                   for i, j in combinations(blocks, 2))

    def group_leaders(self):
        """Yield the first cell index of every group, in board order."""
        seen = set()
        for i, groupid in enumerate(self.groups):
            if groupid > 0 and groupid not in seen:
                seen.add(groupid)
                yield i

    def points(self):
        """
        Return the negated score of all groups that are large enough, so
        that a lower value is better (solve() minimizes).
        """
        points = 0

        for leader in self.group_leaders():
            block = self.blocks[leader]
            size = self.groupsizes[leader]

            if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
                points += size
            elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
                # NOTE(review): BOMB_POINTS is defined but unused; bombs are
                # valued by the tallest column instead - confirm intent.
                points += self.maxrows()

        return -points

    def gen_moves(self):
        """
        Yield candidate successor states: this state itself, then for every
        reachable column the in-place sequences and every get/shift/put
        combination.
        """
        yield self

        for src in self.gen_shift(not self.colbusy(self.exa)):
            yield from src.gen_stationary()

            for get in src.gen_get():
                for dst in get.gen_shift(False):
                    yield from dst.gen_put()

    def gen_shift(self, allow_noshift):
        """
        Yield one state per column reachable with only LEFT or only RIGHT
        moves; with `allow_noshift` this state is yielded first as well.
        """
        if allow_noshift:
            yield self

        left = self
        for i in range(self.exa):
            left = left.move(LEFT)
            yield left

        right = self
        for i in range(COLUMNS - self.exa - 1):
            right = right.move(RIGHT)
            yield right

    def gen_stationary(self):
        """Yield states that reorder the current column without moving."""
        # SWAP
        # GRAB, SWAP, DROP
        # GRAB, SWAP, DROP, SWAP
        # SWAP, GRAB, SWAP, DROP
        # SWAP, GRAB, SWAP, DROP, SWAP
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)

            if avail >= 2:
                swap = self.move(SWAP)
                yield swap

            if avail >= 3:
                grab = self.move(GRAB, SWAP, DROP)
                yield grab
                yield grab.move(SWAP)

                swap = swap.move(GRAB, SWAP, DROP)
                yield swap
                yield swap.move(SWAP)

    def gen_get(self):
        """Yield states that end up holding one of the column's first blocks."""
        # GRAB
        # SWAP, GRAB
        # GRAB, SWAP, DROP, SWAP, GRAB
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)

            if avail >= 1:
                grab = self.move(GRAB)
                yield grab

            if avail >= 2:
                yield self.move(SWAP, GRAB)

            if avail >= 3:
                yield grab.move(SWAP, DROP, SWAP, GRAB)

    def gen_put(self):
        """Yield states that place the held block into the current column."""
        # DROP
        # DROP, SWAP
        # DROP, SWAP, GRAB, SWAP, DROP
        if not self.colbusy(self.exa):
            # Row count before the drop, i.e. excluding the dropped block.
            avail = self.colrows(self.exa)
            drop = self.move(DROP)
            yield drop

            if avail >= 1:
                swap = drop.move(SWAP)
                yield swap

            if avail >= 2:
                yield swap.move(GRAB, SWAP, DROP)

    def force(self, *moves):
        """Return the state after `moves`, with an empty score attached."""
        state = self.move(*moves)
        state.score = ()
        return state

    def solve(self):
        """
        Return the best successor state. Candidates are filtered by each
        score key in turn, keeping only the states with the minimal value;
        the returned state's `score` is the tuple of those minima.
        """
        assert self.exa is not None

        # A held block is always put down first.
        if self.held != NOBLOCK:
            return self.force(DROP)

        pool = deque(self.gen_moves())

        if not pool:
            return self.force()

        best_score = ()

        for key in self.score_keys():
            # A single remaining candidate needs no further tie-breaking.
            if len(pool) == 1:
                break

            for state in pool:
                state.score = key(state)

            best = min(state.score for state in pool)
            best_score += (best,)

            # Keep only the best candidates, preserving generation order.
            pool = deque(state for state in pool if state.score == best)

        best = pool.popleft()
        best.score = best_score
        return best

    def score_keys(self):
        """
        Return the scoring methods to apply in order of priority, chosen by
        the board height (number of rows minus 2).
        """
        cls = self.__class__
        # NOTE(review): the 2 presumably matches the default `pad` of
        # detect() - confirm.
        colsize = self.nrows - 2

        if colsize >= COLSIZE_PANIC:
            return cls.holes, cls.nmoves, cls.points, cls.fragmentation

        if colsize >= COLSIZE_PRIO:
            return cls.causes_panic, cls.points, cls.holes, \
                   cls.fragmentation, cls.nmoves

        return cls.points, cls.fragmentation, cls.holes, cls.nmoves

    def print(self):
        """Print the board to stdout using print_board()."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return what print() would write, as a string."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def has_same_exa(self, state):
        """Return True when `state` has the same exa column and held block."""
        return self.exa == state.exa and self.held == state.held

    def nmoves(self):
        """Return the number of moves applied so far."""
        return len(self.moves)

    def delay(self):
        """Return the summed delay of all applied moves in milliseconds."""
        return moves_delay(self.moves)

    def keys(self):
        """Return the applied moves as a string of key characters."""
        return moves_to_keys(self.moves)

    def loops(self, prev):
        """
        Return a truthy value when this state has moves and repeats the
        exa column, moves and score of `prev`. Requires `score` to be set on
        both states.
        """
        return self.moves and \
               self.exa == prev.exa and \
               self.moves == prev.moves and \
               self.score == prev.score


def get_colskip(blocks):
    """Return, per column, the number of empty rows before its first block."""
    def colskip(col):
        for row, block in enumerate(blocks[col::COLUMNS]):
            if block != NOBLOCK:
                return row
        # Completely empty column.
        return len(blocks) // COLUMNS

    return list(map(colskip, range(COLUMNS)))


def get_busy(blocks, colskip):
    """
    Return a bitmask with bit `col` set for every column that has an empty
    cell somewhere after its first block.
    """
    mask = 0
    for col, skip in enumerate(colskip):
        # Start one row past the column's first block.
        start = (skip + 1) * COLUMNS + col
        colbusy = NOBLOCK in blocks[start::COLUMNS]
        mask |= colbusy << col
    return mask


def scan_group(blocks, i, block, visited):
    """
    Yield `i` and all cells connected to it that hold `block`, recording
    every yielded index in `visited`.
    """
    yield i
    visited.add(i)
    for nb in neighbors(i, blocks):
        if blocks[nb] == block and nb not in visited:
            yield from scan_group(blocks, nb, block, visited)


def get_groups(blocks):
    """
    Partition the board into connected groups of equal blocks.

    Returns (groups, groupsizes, maxgroup): per-cell group ids starting at 1
    (0 for empty cells), per-cell size of the owning group, and the highest
    id used.
    """
    groupid = 0
    groups = [0] * len(blocks)
    groupsizes = [0] * len(blocks)
    visited = set()

    for i, block in enumerate(blocks):
        if block != NOBLOCK and i not in visited:
            groupid += 1
            group = tuple(scan_group(blocks, i, block, visited))
            size = len(group)
            for j in group:
                groups[j] = groupid
                # Record the size too; State.points() reads it for groups
                # that no move has regrouped yet.
                groupsizes[j] = size

    return groups, groupsizes, groupid


def neighbors(i, blocks):
    """Yield the indices of the cells left, right, above and below `i`."""
    y, x = divmod(i, COLUMNS)

    if x > 0:
        yield i - 1
    if x < COLUMNS - 1:
        yield i + 1
    if y > 0:
        yield i - COLUMNS
    if y < len(blocks) // COLUMNS - 1:
        yield i + COLUMNS


def move_to_key(move):
    """Return the key character for a move (GRAB and DROP share one key)."""
    return 'jjkadl'[move]


def moves_to_keys(moves):
    """Return the key characters for a sequence of moves as one string."""
    return ''.join(move_to_key(move) for move in moves)


def moves_delay(moves):
    """Return the summed MOVE_DELAYS of `moves` in milliseconds."""
    return sum(MOVE_DELAYS[m] for m in moves)


if __name__ == '__main__':
    import sys
    from PIL import Image

    board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()

    start = time.time()
    newstate = state.solve()
    end = time.time()
    print('best move:', newstate.keys())
    print('score:', newstate.score)
    print('elapsed:', round((end - start) * 1000, 1), 'ms')
    print()

    print('target after move:')
    newstate.print()

    #print()
    #print('generated moves:')
    #for state in state.gen_moves():
    #    print(state.keys())