| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- import io
- import time
- from collections import deque
- from contextlib import redirect_stdout
- from copy import copy
- from itertools import combinations
- from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
- detect_held, print_board, is_basic, is_bomb
# Move identifiers. Each value doubles as the index of the move's entry in
# MOVE_DELAYS (and of its key in move_to_key).
GRAB = 0
DROP = 1
SWAP = 2
LEFT = 3
RIGHT = 4
SPEED = 5

# Delay charged per move, in milliseconds, indexed by move identifier.
_ACTION_DELAY = 50  # GRAB, DROP, SWAP
_SHIFT_DELAY = 30   # LEFT, RIGHT, SPEED
MOVE_DELAYS = (
    _ACTION_DELAY,  # GRAB
    _ACTION_DELAY,  # DROP
    _ACTION_DELAY,  # SWAP
    _SHIFT_DELAY,   # LEFT
    _SHIFT_DELAY,   # RIGHT
    _SHIFT_DELAY,   # SPEED
)

# Group sizes from which State.locked() treats a group as locked and
# State.points() awards points for it.
MIN_BASIC_GROUP_SIZE = 4
MIN_BOMB_GROUP_SIZE = 2

# Board heights (in rows) at which State.score_keys() switches to a more
# defensive ordering of the scoring criteria.
COLSIZE_PRIO = 5
COLSIZE_PANIC = 8

# NOTE(review): not referenced by any code visible in this file.
BOMB_POINTS = 5
class State:
    """A board position plus the sequence of moves that produced it.

    Attributes:
        blocks: flat, row-major cell contents; the cell at (row, col) lives
            at index ``row * COLUMNS + col``. Empty cells hold NOBLOCK.
        exa: column index that GRAB/DROP/SWAP act on; LEFT/RIGHT change it.
        held: the block currently being held, or NOBLOCK.
        colskip: per column, the row index of its first non-empty cell
            (``nrows`` for an empty column).
        busy: bitmask with bit ``col`` set when that column is busy
            (see get_busy()).
        moves: tuple of the moves applied so far.
        groups: per cell, the id of the group of equal adjacent blocks it
            belongs to (0 for no group).
        groupsizes: per cell, the size of that group.
        maxgroup: the highest group id handed out so far.
        nrows: number of rows in ``blocks``.
        score: only set by solve()/force(); the tuple of best key values.
    """

    def __init__(self, blocks, exa, held, colskip, busy, moves, groups,
                 groupsizes, maxgroup):
        self.blocks = blocks
        self.exa = exa
        self.held = held
        self.colskip = colskip
        self.busy = busy
        self.moves = moves
        self.groups = groups
        self.groupsizes = groupsizes
        self.maxgroup = maxgroup
        self.nrows = len(blocks) // COLUMNS

    @classmethod
    def detect(cls, board, pad=2):
        """Build a State from a board image.

        ``pad`` empty rows are prepended before the detected blocks so that
        held blocks can be dropped in front of the detected ones.
        The detect_* helpers come from the detection module; their exact
        input format is not visible here.
        """
        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
        exa = detect_exa(board)
        held = detect_held(board, exa)
        colskip = get_colskip(blocks)
        busy = get_busy(blocks, colskip)
        groups, groupsizes, maxgroup = get_groups(blocks)
        return cls(bytearray(blocks), exa, held, bytearray(colskip), busy, (),
                   bytearray(groups), bytearray(groupsizes), maxgroup)

    def copy(self, deep):
        """Return a new State with the same contents.

        With ``deep`` true the mutable per-cell arrays are copied; otherwise
        they are shared with ``self``, which is only safe when the copy will
        not modify them.
        """
        mcopy = copy if deep else lambda x: x
        return self.__class__(mcopy(self.blocks),
                              self.exa,
                              self.held,
                              mcopy(self.colskip),
                              self.busy,
                              self.moves,
                              mcopy(self.groups),
                              mcopy(self.groupsizes),
                              self.maxgroup)

    def colbusy(self, col):
        """Return 1 if column ``col`` is flagged busy, else 0."""
        return (self.busy >> col) & 1

    def colrows(self, col):
        """Return the number of rows from the column's first block on."""
        return self.nrows - self.colskip[col]

    def maxrows(self):
        """Return the largest colrows() value over all columns."""
        return max(map(self.colrows, range(COLUMNS)))

    def causes_panic(self):
        """Return whether the board has reached COLSIZE_PANIC rows."""
        return self.max_colsize() >= COLSIZE_PANIC

    def max_colsize(self):
        """Return the number of rows from the first non-empty row on."""
        return self.nrows - self.empty_rows()

    def empty_rows(self):
        """Return the number of leading rows that contain no block.

        A completely empty board has ``nrows`` empty rows. (Returning 0 in
        that case would make max_colsize() report a full board.)
        """
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                return i // COLUMNS
        return self.nrows

    def holes(self):
        """Score the empty cells in front of each column's first block.

        Only rows from the first non-empty row on are considered. Each empty
        cell costs its 1-based distance from that row, so deeper gaps weigh
        more. Lower is better.
        """
        start_row = self.empty_rows()
        score = 0
        for col in range(COLUMNS):
            for row in range(start_row, self.nrows):
                if self.blocks[row * COLUMNS + col] != NOBLOCK:
                    break
                score += row - start_row + 1
        return score

    def locked(self, i):
        """Return whether cell ``i`` belongs to a group of matching size.

        Empty cells are never locked. GRAB and SWAP leave locked cells
        untouched.
        """
        block = self.blocks[i]
        if block == NOBLOCK:
            return False
        if is_basic(block):
            return self.groupsizes[i] >= MIN_BASIC_GROUP_SIZE
        assert is_bomb(block)
        return self.groupsizes[i] >= MIN_BOMB_GROUP_SIZE

    def move(self, *moves):
        """Return a new State with ``moves`` applied in order.

        ``self`` is not modified. The per-cell arrays are only copied when
        at least one of the moves can change the board.
        """
        deep = any(move in (GRAB, DROP, SWAP) for move in moves)
        s = self.copy(deep)
        s.moves += moves

        for move in moves:
            if move == LEFT:
                assert s.exa > 0
                s.exa -= 1
            elif move == RIGHT:
                assert s.exa < COLUMNS - 1
                s.exa += 1
            elif move == GRAB:
                assert not s.colbusy(s.exa)
                assert s.held == NOBLOCK
                row = s.colskip[s.exa]
                assert row < s.nrows
                i = row * COLUMNS + s.exa
                # grabbing a locked block is a no-op
                if not s.locked(i):
                    s.colskip[s.exa] += 1
                    s.held = s.remove(i)
            elif move == DROP:
                # dropping with nothing held is a no-op
                if s.held != NOBLOCK:
                    row = s.colskip[s.exa]
                    assert row > 0
                    i = (row - 1) * COLUMNS + s.exa
                    s.colskip[s.exa] -= 1
                    s.place(i, s.held)
                    s.held = NOBLOCK
            elif move == SWAP:
                assert not s.colbusy(s.exa)
                row = s.colskip[s.exa]
                i = row * COLUMNS + s.exa
                j = i + COLUMNS
                # swap the first two blocks of the column, unless the column
                # has fewer than two rows or one of the blocks is locked
                if j < len(s.blocks) and not s.locked(i) \
                        and not s.locked(j):
                    bi = s.blocks[i]
                    bj = s.blocks[j]
                    assert bi != NOBLOCK
                    assert bj != NOBLOCK
                    # swapping equal blocks changes nothing
                    if bi != bj:
                        # clear both cells first so that the old groups are
                        # split up before the blocks are placed again
                        s.blocks[i] = NOBLOCK
                        s.blocks[j] = NOBLOCK
                        visited = set()
                        s.ungroup(i, visited)
                        s.ungroup(j, visited)
                        s.place(j, bi)
                        s.place(i, bj)

        return s

    def remove(self, i):
        """Empty cell ``i``, regroup its old neighbors, return the block."""
        block = self.blocks[i]
        assert block != NOBLOCK
        self.blocks[i] = NOBLOCK
        self.ungroup(i, set())
        return block

    def ungroup(self, i, visited):
        """Rebuild the groups around the (already emptied) cell ``i``.

        Every neighbor that was in the same group as ``i`` gets a fresh
        group; ``visited`` prevents the same cells from being regrouped
        twice within one operation.
        """
        assert self.blocks[i] == NOBLOCK
        oldid = self.groups[i]
        for nb in neighbors(i, self.blocks):
            if self.groups[nb] == oldid:
                self.create_group(nb, visited)
        self.groups[i] = 0
        self.groupsizes[i] = 0

    def place(self, i, block):
        """Put ``block`` into the empty cell ``i`` and regroup around it."""
        assert block != NOBLOCK
        assert self.blocks[i] == NOBLOCK
        self.blocks[i] = block
        self.create_group(i, set())

    def create_group(self, start, visited):
        """Assign a new group id to all equal blocks connected to ``start``.

        Cells already in ``visited`` are skipped; cells that are scanned are
        added to it. Nothing happens if ``start`` was visited before.
        """
        def scan(i):
            if i not in visited:
                yield i
                visited.add(i)
                for nb in neighbors(i, self.blocks):
                    if self.blocks[nb] == block:
                        yield from scan(nb)

        block = self.blocks[start]
        group = tuple(scan(start))
        if group:
            # NOTE: groups is a bytearray when built by detect(), so ids
            # above 255 cannot be stored
            self.maxgroup = newid = self.maxgroup + 1
            for j in group:
                self.groups[j] = newid
                self.groupsizes[j] = len(group)

    def fragmentation(self):
        """
        Minimize the sum of dist(i,j) between all blocks i,j of the same color.
        Magnify vertical distances to avoid column stacking.

        Returns that sum; lower is better.
        """
        def dist(i, j):
            yi, xi = divmod(i, COLUMNS)
            yj, xj = divmod(j, COLUMNS)
            # for blocks in the same group, only count vertical distance so
            # that groups are spread out horizontally
            if self.groups[i] == self.groups[j]:
                return abs(yj - yi)
            return abs(xj - xi) + abs(yj - yi) * 2 - 1

        colors = {}
        for i, block in enumerate(self.blocks):
            if block != NOBLOCK:
                colors.setdefault(block, []).append(i)

        return sum(dist(i, j)
                   for blocks in colors.values()
                   for i, j in combinations(blocks, 2))

    def group_leaders(self):
        """Yield the index of the first cell of every group."""
        seen = set()
        for i, groupid in enumerate(self.groups):
            if groupid > 0 and groupid not in seen:
                seen.add(groupid)
                yield i

    def points(self):
        """Return the negated value of all matching groups.

        A large enough basic group is worth its size, a large enough bomb
        group is worth maxrows(). The result is negated so that lower is
        better, like the other scoring keys.
        """
        # identical for every bomb group, so compute it only once
        bomb_value = self.maxrows()
        points = 0
        for leader in self.group_leaders():
            block = self.blocks[leader]
            size = self.groupsizes[leader]
            if is_basic(block) and size >= MIN_BASIC_GROUP_SIZE:
                points += size
            elif is_bomb(block) and size >= MIN_BOMB_GROUP_SIZE:
                points += bomb_value
        return -points

    def gen_moves(self):
        """Yield the candidate states considered by solve().

        These are the unchanged state, the in-place rearrangements of every
        reachable column, and every combination of picking a block up in
        one column and putting it down in another.
        """
        yield self
        for src in self.gen_shift(not self.colbusy(self.exa)):
            yield from src.gen_stationary()
            for get in src.gen_get():
                for dst in get.gen_shift(False):
                    yield from dst.gen_put()

    def gen_shift(self, allow_noshift):
        """Yield a state for every column reachable with LEFT/RIGHT moves.

        The current column is only included when ``allow_noshift`` is true.
        """
        if allow_noshift:
            yield self

        left = self
        for _ in range(self.exa):
            left = left.move(LEFT)
            yield left

        right = self
        for _ in range(COLUMNS - self.exa - 1):
            right = right.move(RIGHT)
            yield right

    def gen_stationary(self):
        """Yield rearrangements of the current column's first blocks."""
        # SWAP
        # GRAB, SWAP, DROP
        # GRAB, SWAP, DROP, SWAP
        # SWAP, GRAB, SWAP, DROP
        # SWAP, GRAB, SWAP, DROP, SWAP
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)
            if avail >= 2:
                swap = self.move(SWAP)
                yield swap
                if avail >= 3:
                    grab = self.move(GRAB, SWAP, DROP)
                    yield grab
                    yield grab.move(SWAP)
                    swap = swap.move(GRAB, SWAP, DROP)
                    yield swap
                    yield swap.move(SWAP)

    def gen_get(self):
        """Yield the ways of picking up one of the column's first blocks."""
        # GRAB
        # SWAP, GRAB
        # GRAB, SWAP, DROP, SWAP, GRAB
        if not self.colbusy(self.exa):
            avail = self.colrows(self.exa)
            if avail >= 1:
                grab = self.move(GRAB)
                yield grab
                if avail >= 2:
                    yield self.move(SWAP, GRAB)
                    if avail >= 3:
                        yield grab.move(SWAP, DROP, SWAP, GRAB)

    def gen_put(self):
        """Yield the ways of putting the held block into the column."""
        # DROP
        # DROP, SWAP
        # DROP, SWAP, GRAB, SWAP, DROP
        if not self.colbusy(self.exa):
            # number of rows in the column before the drop
            avail = self.colrows(self.exa)
            drop = self.move(DROP)
            yield drop
            if avail >= 1:
                swap = drop.move(SWAP)
                yield swap
                if avail >= 2:
                    yield swap.move(GRAB, SWAP, DROP)

    def force(self, *moves):
        """Apply ``moves`` unconditionally; the result has an empty score."""
        state = self.move(*moves)
        state.score = ()
        return state

    def solve(self):
        """Return the best candidate state according to score_keys().

        A held block is always dropped first. Otherwise the candidates from
        gen_moves() are narrowed down key by key: after each key only the
        states with the minimal value remain, and filtering stops as soon as
        a single state is left. The returned state's ``score`` is the tuple
        of minimal key values found on the way.
        """
        assert self.exa is not None

        if self.held != NOBLOCK:
            return self.force(DROP)

        pool = deque(self.gen_moves())
        if not pool:
            return self.force()

        best_score = ()
        for key in self.score_keys():
            if len(pool) == 1:
                break
            for state in pool:
                state.score = key(state)
            best = min(state.score for state in pool)
            best_score += (best,)
            # keep only the states that reach the best value, in order
            pool = deque(state for state in pool if state.score == best)

        best = pool.popleft()
        best.score = best_score
        return best

    def score_keys(self):
        """Return the scoring methods in order of priority.

        The order depends on the board height excluding two rows
        (presumably the padding added by detect()): the higher the board,
        the more weight is put on keeping columns low.
        """
        cls = self.__class__
        colsize = self.nrows - 2

        if colsize >= COLSIZE_PANIC:
            return cls.holes, cls.points, cls.nmoves, cls.fragmentation

        if colsize >= COLSIZE_PRIO:
            return cls.causes_panic, cls.points, cls.holes, \
                   cls.fragmentation, cls.nmoves

        return cls.points, cls.fragmentation, cls.holes, cls.nmoves

    def print_groupsizes(self):
        """Print the group size of every cell, last row first."""
        for start in range(len(self.groupsizes) - COLUMNS, -1, -COLUMNS):
            print(' '.join('%-2d' % g
                           for g in self.groupsizes[start:start + COLUMNS]))

    def print_groups(self):
        """Print the group id of every cell, last row first."""
        for start in range(len(self.groups) - COLUMNS, -1, -COLUMNS):
            print(' '.join('%-2d' % g
                           for g in self.groups[start:start + COLUMNS]))

    def print(self):
        """Print the board using detection.print_board()."""
        print_board(self.blocks, self.exa, self.held)

    def tostring(self):
        """Return what print() would write to stdout."""
        stream = io.StringIO()
        with redirect_stdout(stream):
            self.print()
        return stream.getvalue()

    def has_same_exa(self, state):
        """Return whether ``state`` has the same column and held block."""
        return self.exa == state.exa and self.held == state.held

    def nmoves(self):
        """Return the number of moves applied so far."""
        return len(self.moves)

    def delay(self):
        """Return the total delay of the applied moves in milliseconds."""
        return moves_delay(self.moves)

    def keys(self):
        """Return the applied moves as a string of keys."""
        return moves_to_keys(self.moves)

    def loops(self, prev):
        """Return True if this state repeats ``prev``.

        That is the case when there are moves at all and the column, moves
        and score are all equal to those of ``prev``. Both states must have
        a ``score`` (set by solve() or force()) when they have moves.
        """
        return bool(self.moves and
                    self.exa == prev.exa and
                    self.moves == prev.moves and
                    self.score == prev.score)
def get_colskip(blocks):
    """Return, per column, the row index of its first non-empty cell.

    ``blocks`` is the flat, row-major board. A column without any block maps
    to the number of full rows in ``blocks``.
    """
    def first_filled(col):
        column = blocks[col::COLUMNS]
        filled_rows = (row for row, cell in enumerate(column)
                       if cell != NOBLOCK)
        # the fallback is only evaluated here, like the original's final
        # return, and is used when the column holds no block at all
        return next(filled_rows, len(blocks) // COLUMNS)

    return [first_filled(col) for col in range(COLUMNS)]
def get_busy(blocks, colskip):
    """Return a bitmask of the columns that have a gap behind their top.

    Bit ``col`` is set when an empty cell occurs anywhere after the first
    block of that column. ``colskip`` holds the row of that first block for
    every column (see get_colskip()).
    """
    def has_gap(col, skip):
        # look at the cells that follow the column's first block
        below_first = blocks[(skip + 1) * COLUMNS + col::COLUMNS]
        return NOBLOCK in below_first

    # every column contributes a distinct bit, so adding equals or-ing
    return sum(1 << col
               for col, skip in enumerate(colskip)
               if has_gap(col, skip))
def scan_group(blocks, i, block, visited):
    """Yield ``i`` and every cell connected to it that holds ``block``.

    The cells are produced depth first, following the order of neighbors().
    Each cell is added to ``visited`` right after it has been yielded, and
    cells that are already in ``visited`` are not entered again.
    """
    yield i
    visited.add(i)

    # one partially consumed neighbor iterator per cell on the current path
    trail = [neighbors(i, blocks)]
    while trail:
        for nb in trail[-1]:
            if blocks[nb] == block and nb not in visited:
                yield nb
                visited.add(nb)
                # descend; the iterator of the parent is resumed later
                trail.append(neighbors(nb, blocks))
                break
        else:
            # all neighbors of the innermost cell are done
            trail.pop()
def get_groups(blocks):
    """Partition the board into groups of connected, equal blocks.

    Returns a tuple ``(groups, groupsizes, maxgroup)``: two lists with, per
    cell, the id of its group (numbered from 1; 0 for empty cells) and the
    size of that group, followed by the highest id that was handed out.
    """
    ncells = len(blocks)
    group_of = [0] * ncells
    size_of = [0] * ncells
    seen = set()
    last_id = 0

    for start, color in enumerate(blocks):
        if color == NOBLOCK or start in seen:
            continue
        last_id += 1
        members = tuple(scan_group(blocks, start, color, seen))
        size = len(members)
        for cell in members:
            group_of[cell] = last_id
            size_of[cell] = size

    return group_of, size_of, last_id
def neighbors(i, blocks):
    """Yield the indices of the cells directly adjacent to cell ``i``.

    The order is: previous column, next column, previous row, next row.
    Cells outside of the board are left out.
    """
    row, col = divmod(i, COLUMNS)
    last_row = len(blocks) // COLUMNS - 1
    candidates = (
        (col > 0, i - 1),
        (col < COLUMNS - 1, i + 1),
        (row > 0, i - COLUMNS),
        (row < last_row, i + COLUMNS),
    )
    for on_board, index in candidates:
        if on_board:
            yield index
def move_to_key(move):
    """Return the keyboard key for a move identifier.

    The keys are listed in the order of the move identifiers GRAB, DROP,
    SWAP, LEFT, RIGHT, SPEED; GRAB and DROP share a key.
    """
    keys_by_move = 'jjkadl'
    return keys_by_move[move]
def moves_to_keys(moves):
    """Return the keys for a sequence of moves as a single string."""
    return ''.join(map(move_to_key, moves))
def moves_delay(moves):
    """Return the summed MOVE_DELAYS entries (milliseconds) of ``moves``."""
    total = 0
    for move in moves:
        total += MOVE_DELAYS[move]
    return total
if __name__ == '__main__':
    # Manual test: solve one stored screenshot, screens/board<N>.png, where
    # N is given as the first command line argument.
    import sys
    from PIL import Image

    if len(sys.argv) < 2:
        sys.exit('usage: %s BOARD_NUMBER' % sys.argv[0])

    board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
    state = State.detect(board)
    print('parsed:')
    state.print()
    print()
    print('groups:')
    state.print_groups()
    print()
    print('groupsizes:')
    state.print_groupsizes()
    print()

    # perf_counter() is monotonic and has the highest available resolution,
    # unlike the wall clock, which may be adjusted while the solver runs
    start = time.perf_counter()
    newstate = state.solve()
    end = time.perf_counter()

    print('best move:', newstate.keys())
    print('score:', newstate.score)
    print('elapsed:', round((end - start) * 1000, 1), 'ms')
    print()
    print('target after move:')
    newstate.print()

    # Debugging aid: list every candidate move sequence.
    #print()
    #print('generated moves:')
    #for state in state.gen_moves():
    #    print(state.keys())
|