| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- import io
- import time
- from collections import deque
- from contextlib import redirect_stdout
- from itertools import combinations, islice
- from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
- detect_held, print_board, is_basic, is_bomb
- GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
- MOVE_DELAYS = (
- # in milliseconds
- 50, # GRAB
- 50, # DROP
- 50, # SWAP
- 30, # LEFT
- 30, # RIGHT
- 30, # SPEED
- )
- GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
- PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
- MIN_BASIC_GROUP_SIZE = 4
- MIN_BOMB_GROUP_SIZE = 2
- POINTS_DEPTH = 3
- FRAG_DEPTH = 4
- DEFRAG_PRIO = 4
- COLSIZE_PRIO = 5
- COLSIZE_PANIC = 8
- COLSIZE_MAX = 9
- BOMB_POINTS = 5
- class State:
- def __init__(self, blocks, exa, held, colskip=None):
- self.blocks = blocks
- self.exa = exa
- self.held = held
- self.moves = ()
- self.score = ()
- self.nrows = len(self.blocks) // COLUMNS
- if colskip is None:
- colskip = []
- for col in range(COLUMNS):
- for row in range(self.nrows):
- if self.blocks[row * COLUMNS + col] != NOBLOCK:
- colskip.append(row)
- break
- else:
- colskip.append(self.nrows)
- self.colskip = colskip
- def grabbing_or_dropping(self):
- skip = self.colskip[self.exa]
- i = (skip + 1) * COLUMNS + self.exa
- return i < len(self.blocks) and self.blocks[i] == NOBLOCK
- def iter_columns(self):
- def gen_col(col):
- for row in range(self.nrows):
- i = row * COLUMNS + col
- if self.blocks[i] != NOBLOCK:
- yield i
- for col in range(COLUMNS):
- yield gen_col(col)
- @classmethod
- def detect(cls, board, pad=2):
- blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
- exa = detect_exa(board)
- held = detect_held(board, exa)
- return cls(blocks, exa, held)
- def copy(self):
- return self.__class__(list(self.blocks), self.exa, self.held,
- list(self.colskip))
- def causes_panic(self):
- return self.max_colsize() >= COLSIZE_PANIC
- def max_colsize(self):
- return self.nrows - self.empty_rows()
- def empty_rows(self):
- for i, block in enumerate(self.blocks):
- if block != NOBLOCK:
- return i // COLUMNS
- return 0
- def holes(self):
- start_row = self.empty_rows()
- score = 0
- for col in range(COLUMNS):
- for row in range(start_row, self.nrows):
- if self.blocks[row * COLUMNS + col] != NOBLOCK:
- break
- score += row - start_row + 1
- return score
- def move(self, moves):
- s = self.copy() if moves else self
- s.moves = moves
- s.placed = set()
- s.grabbed = {}
- for move in moves:
- if move == LEFT:
- assert s.exa > 0
- s.exa -= 1
- elif move == RIGHT:
- assert s.exa < COLUMNS - 1
- s.exa += 1
- elif move == GRAB:
- assert s.held == NOBLOCK
- row = s.colskip[s.exa]
- assert row < s.nrows
- i = row * COLUMNS + s.exa
- s.held = s.blocks[i]
- s.blocks[i] = NOBLOCK
- s.grabbed[i] = s.held
- s.colskip[s.exa] += 1
- elif move == DROP:
- assert s.held != NOBLOCK
- row = s.colskip[s.exa]
- assert row > 0
- i = (row - 1) * COLUMNS + s.exa
- s.blocks[i] = s.held
- s.held = NOBLOCK
- s.placed.add(i)
- s.colskip[s.exa] -= 1
- elif move == SWAP:
- row = s.colskip[s.exa]
- i = row * COLUMNS + s.exa
- j = i + COLUMNS
- assert j < len(s.blocks)
- bi = s.blocks[i]
- bj = s.blocks[j]
- if bi != bj:
- s.blocks[i] = bj
- s.blocks[j] = bi
- s.grabbed[i] = bi
- s.grabbed[j] = bj
- s.placed.add(i)
- s.placed.add(j)
- if moves and self.max_colsize() < COLSIZE_MAX:
- assert s.max_colsize() <= COLSIZE_MAX
- return s
- def find_groups(self, depth=POINTS_DEPTH, minsize=2):
- def follow_group(i, block, group):
- if self.blocks[i] == block and i not in visited:
- group.append(i)
- visited.add(i)
- for nb in self.neighbors(i):
- follow_group(nb, block, group)
- visited = set()
- for col in self.iter_columns():
- for i in islice(col, depth):
- block = self.blocks[i]
- group = []
- follow_group(i, block, group)
- if len(group) >= minsize:
- yield block, group
- def neighbors(self, i):
- row, col = divmod(i, COLUMNS)
- if col > 0 and self.blocks[i - 1] != NOBLOCK:
- yield i - 1
- if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
- yield i + 1
- if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
- yield i - COLUMNS
- if row < self.nrows - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
- yield i + COLUMNS
- def fragmentation(self, depth=FRAG_DEPTH):
- """
- Minimize the sum of dist(i,j) between all blocks i,j of the same color.
- Magnify vertical distances to avoid column stacking.
- """
- def dist(i, j):
- yi, xi = divmod(i, COLUMNS)
- yj, xj = divmod(j, COLUMNS)
- # for blocks in the same group, only count vertical distance so
- # that groups are spread out horizontally
- if groups[i] == groups[j]:
- return abs(yj - yi)
- return abs(xj - xi) + abs(yj - yi) * 2 - 1
- colors = {}
- groups = {}
- groupsizes = {}
- for groupid, (block, group) in enumerate(self.find_groups(depth, 1)):
- colors.setdefault(block, []).extend(group)
- for i in group:
- groups[i] = groupid
- groupsizes[i] = len(group)
- return sum(dist(i, j)
- for block, color in colors.items()
- for i, j in combinations(color, 2))
- def points(self):
- def group_size(start):
- work = [start]
- visited.add(start)
- size = 0
- block = self.blocks[start]
- while work:
- i = work.pop()
- # avoid giving points to moving a block within the same group
- if self.grabbed.get(i, None) == block:
- return 0
- if self.blocks[i] == block:
- size += 1
- for nb in self.neighbors(i):
- if nb not in visited:
- visited.add(nb)
- work.append(nb)
- return size
- points = 0
- visited = set()
- for i in self.placed:
- if i not in visited:
- block = self.blocks[i]
- size = group_size(i)
- if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
- points += size
- elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
- points += BOMB_POINTS
- return -points
- def gen_moves(self):
- yield ()
- def shift_exa(diff):
- direction = RIGHT if diff > 0 else LEFT
- return abs(diff) * (direction,)
- ignore_exa_column = self.grabbing_or_dropping()
- for src in range(COLUMNS):
- mov1 = shift_exa(src - self.exa)
- if mov1 or not ignore_exa_column:
- yield mov1 + (SWAP,)
- yield mov1 + (GRAB, SWAP, DROP)
- yield mov1 + (SWAP, GRAB, SWAP, DROP)
- yield mov1 + (GRAB, SWAP, DROP, SWAP)
- yield mov1 + (SWAP, GRAB, SWAP, DROP, SWAP)
- for dst in range(COLUMNS):
- if dst != src:
- mov2 = shift_exa(dst - src)
- for get in GET:
- for put in PUT:
- yield mov1 + get + mov2 + put
- def gen_valid_moves(self):
- for moves in self.gen_moves():
- try:
- yield self.move(moves)
- except AssertionError:
- pass
- def solve(self):
- assert self.exa is not None
- if self.held != NOBLOCK:
- return self.move((DROP,))
- valid = deque(self.gen_valid_moves())
- if len(valid) == 0:
- return self.move(())
- best_score = ()
- for key in self.score_keys():
- if len(valid) == 1:
- break
- for state in valid:
- state.score = key(state)
- best = min(state.score for state in valid)
- best_score += (best,)
- for i in range(len(valid)):
- state = valid.popleft()
- if state.score == best:
- valid.append(state)
- best = valid.popleft()
- best.score = best_score
- return best
- def score_keys(self):
- cls = self.__class__
- colsize = self.nrows - 2
- if colsize >= COLSIZE_PANIC:
- return cls.holes, cls.nmoves, cls.points, cls.fragmentation
- if colsize >= COLSIZE_PRIO:
- return cls.causes_panic, cls.points, cls.holes, \
- cls.fragmentation, cls.nmoves
- return cls.points, cls.fragmentation, cls.holes, cls.nmoves
- def print(self):
- print_board(self.blocks, self.exa, self.held)
- def tostring(self):
- stream = io.StringIO()
- with redirect_stdout(stream):
- self.print()
- return stream.getvalue()
- def has_same_exa(self, state):
- return self.exa == state.exa and self.held == state.held
- def nmoves(self):
- return len(self.moves)
- def delay(self):
- return moves_delay(self.moves)
- def keys(self):
- return moves_to_keys(self.moves)
- def __lt__(self, other):
- return self.score < other.score
- def loops(self, prev):
- return self.moves and \
- self.exa == prev.exa and \
- self.moves == prev.moves and \
- self.score == prev.score
- def move_to_key(move):
- return 'jjkadl'[move]
- def moves_to_keys(moves):
- return ''.join(move_to_key(move) for move in moves)
- def moves_delay(moves):
- return sum(MOVE_DELAYS[m] for m in moves)
- if __name__ == '__main__':
- import sys
- from PIL import Image
- board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
- state = State.detect(board)
- print('parsed:')
- state.print()
- print()
- start = time.time()
- newstate = state.solve()
- end = time.time()
- print('best move:', newstate.keys())
- print('score:', newstate.score)
- print('elapsed:', round((end - start) * 1000, 1), 'ms')
- print()
- print('target after move:')
- newstate.print()
|