Ver código fonte

Add strategy solver and first working bot

Taddeus Kroes 5 anos atrás
pai
commit
bdbd073a1f
2 arquivos alterados com 376 adições e 0 exclusões
  1. 55 0
      bot.py
  2. 321 0
      strategy.py

+ 55 - 0
bot.py

@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+import time
+from strategy import State, moves_to_keys
+from interaction import get_exapunks_window, focus_window, \
+                       screenshot_board, press_keys
+
+
+if __name__ == '__main__':
+    win = get_exapunks_window()
+    focus_window(win)
+    prev_score = None
+
+    while True:
+        board = screenshot_board(win)
+
+        try:
+            state = State.detect(board)
+        except (TypeError, AssertionError):
+            print('error during parsing, wait for a bit')
+            time.sleep(.1)
+            continue
+
+        print('\033c', end='')
+        print('parsed:')
+        state.print()
+        print()
+
+        #if state.has_holes():
+        #    print('parsed board has holes, wait for a bit')
+        #    time.sleep(.1)
+        #    continue
+
+        moves = state.solve()
+
+        if moves:
+            print('moves:', moves_to_keys(moves))
+            points, newstate = state.simulate(moves)
+            score = newstate.score(points, moves, state)
+            print('     score:', score)
+            print('prev score:', prev_score)
+            empty_points, empty_state = state.simulate(())
+            print('  () score:', empty_state.score(empty_points, (), state))
+            print()
+            prev_score = score
+
+            print('target after moves:')
+            newstate.print()
+            print()
+
+            press_keys(win, moves_to_keys(moves))
+        else:
+            print('no moves')
+            press_keys(win, 'l')
+
+        time.sleep(.1)

+ 321 - 0
strategy.py

@@ -0,0 +1,321 @@
+import time
+from collections import deque
+from itertools import islice
+from parser import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
+                   detect_held, print_board, is_basic, is_bomb, bomb_to_basic
+
+
+GRAB, DROP, SWAP, LEFT, RIGHT = range(5)
+GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
+PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
+#REVERSE = [DROP, GRAB, SWAP, RIGHT, LEFT]
+MIN_BASIC_GROUP_SIZE = 4
+MIN_BOMB_GROUP_SIZE = 2
+FIND_GROUPS_DEPTH = 20
+FRAG_DEPTH = 20
+COLSIZE_PRIO = 5
+
+
+class State:
+    def __init__(self, blocks, exa, held):
+        assert exa is not None
+        self.blocks = blocks
+        self.exa = exa
+        self.held = held
+
+    def has_holes(self):
+        return any(self.blocks[i] == NOBLOCK
+                   for col in self.iter_columns()
+                   for i in col)
+
+    def iter_columns(self):
+        nrows = self.nrows()
+
+        def gen_col(col):
+            for row in range(nrows):
+                i = row * COLUMNS + col
+                if self.blocks[i] != NOBLOCK:
+                    yield i
+
+        for col in range(COLUMNS):
+            yield gen_col(col)
+
+    @classmethod
+    def detect(cls, board):
+        blocks = [NOBLOCK] * (COLUMNS * 2) + list(detect_blocks(board))
+        exa = detect_exa(board)
+        held = detect_held(board, exa)
+        return cls(blocks, exa, held)
+
+    def copy(self):
+        return State(list(self.blocks), self.exa, self.held)
+
+    def colsizes(self):
+        for col in range(COLUMNS):
+            yield self.nrows() - self.colskip(col)
+
+    def score(self, points, moves, prev):
+        frag = self.fragmentation()
+        colsizes = list(self.colsizes())
+        mincol = min(colsizes)
+        maxcol = max(colsizes)
+        colsize_score = maxcol, colsizes.count(maxcol), -mincol
+        #colsize_score = tuple(sorted(colsizes, reverse=True))
+        #if prev.nrows() >= 6:
+        #    return colsize_score, -points, frag, len(moves)
+
+        prev_colsize = max(prev.colsizes())
+
+        if prev_colsize >= COLSIZE_PRIO:
+            return colsize_score, frag, -points, len(moves)
+        else:
+            return -points, frag, colsize_score, len(moves)
+
+    def score_moves(self):
+        # clear exploding blocks before computing colsize
+        prev = self.copy()
+        prev.score_points()
+
+        for moves in self.gen_moves():
+            try:
+                points, newstate = self.simulate(moves)
+                yield newstate.score(points, moves, prev), moves
+            except AssertionError:
+                pass
+
+    def colskip(self, col):
+        nrows = self.nrows()
+        for row in range(nrows):
+            if self.blocks[row * COLUMNS + col] != NOBLOCK:
+                return row
+        return nrows
+
+    def find_unmovable_blocks(self):
+        unmoveable = set()
+        bombed = set()
+
+        for block, group in self.find_groups():
+            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
+                for i in group:
+                    unmoveable.add(i)
+            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
+                bombed.add(bomb_to_basic(block))
+                for i in group:
+                    unmoveable.add(i)
+
+        for i, block in enumerate(self.blocks):
+            if block in bombed:
+                unmoveable.add(i)
+
+        return unmoveable
+
+    def simulate(self, moves):
+        s = self.copy()
+        points = 0
+
+        # clear the current board before planning the next move
+        #s.score_points()
+
+        if not moves:
+            return s.score_points(), s
+
+        # avoid swapping/grabbing currently exploding items
+        unmoveable = s.find_unmovable_blocks()
+
+        for move in moves:
+            if move == LEFT:
+                assert s.exa > 0
+                s.exa -= 1
+            elif move == RIGHT:
+                assert s.exa < COLUMNS - 1
+                s.exa += 1
+            elif move == GRAB:
+                assert s.held == NOBLOCK
+                row = s.colskip(s.exa)
+                assert row < s.nrows()
+                i = row * COLUMNS + s.exa
+                assert i not in unmoveable
+                s.held = s.blocks[i]
+                s.blocks[i] = NOBLOCK
+            elif move == DROP:
+                assert s.held != NOBLOCK
+                row = s.colskip(s.exa)
+                assert row > 0
+                i = row * COLUMNS + s.exa
+                s.blocks[i - COLUMNS] = s.held
+                s.held = NOBLOCK
+                points += s.score_points()
+            elif move == SWAP:
+                row = s.colskip(s.exa)
+                assert row < s.nrows() - 2
+                i = row * COLUMNS + s.exa
+                j = i + COLUMNS
+                assert i not in unmoveable
+                assert j not in unmoveable
+                s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
+                points += s.score_points()
+
+        return points, s
+
+    def find_groups(self, depth=FIND_GROUPS_DEPTH):
+        def follow_group(i, block, group):
+            if self.blocks[i] == block and i not in visited:
+                group.append(i)
+                visited.add(i)
+                for nb in self.neighbors(i):
+                    follow_group(nb, block, group)
+
+        visited = set()
+
+        for col in self.iter_columns():
+            for i in islice(col, depth):
+                block = self.blocks[i]
+                group = []
+                follow_group(i, block, group)
+                if len(group) > 1:
+                    #for j in group:
+                    #    visited.add(j)
+                    yield block, group
+
+    def neighbors(self, i):
+        def gen_indices():
+            row, col = divmod(i, COLUMNS)
+            if col > 0:
+                yield i - 1
+            if col < COLUMNS - 1:
+                yield i + 1
+            if row > 0:
+                yield i - COLUMNS
+            if row < self.nrows() - 1:
+                yield i + COLUMNS
+
+        for j in gen_indices():
+            if self.blocks[j] != NOBLOCK:
+                yield j
+
+    def fragmentation(self, depth=FRAG_DEPTH):
+        """
+        Sum the minimum distance from every block in the first 3 layers to the
+        closest block of the same color.
+        """
+        def find_closest(i):
+            block = self.blocks[i]
+            work = deque([(i, -1)])
+            visited = {i}
+
+            while work:
+                i, dist = work.popleft()
+
+                if dist >= 0 and self.blocks[i] == block:
+                    return dist
+
+                for j in self.neighbors(i):
+                    if j not in visited:
+                        visited.add(j)
+                        work.append((j, dist + 1))
+
+            # only one of this type -> don't count as fragmented
+            return 0
+
+        return sum(find_closest(i)
+                   for col in self.iter_columns()
+                   for i in islice(col, depth))
+
+    def score_points(self, multiplier=1):
+        remove = []
+        points = 0
+
+        for block, group in self.find_groups():
+            if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
+                remove.extend(group)
+                points += len(group) * multiplier
+            elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
+                #points += 10
+                remove.extend(group)
+                for i, other in enumerate(self.blocks):
+                    if other == bomb_to_basic(block):
+                        remove.append(i)
+
+        remove.sort()
+        prev = None
+        for i in remove:
+            if i != prev:
+                while self.blocks[i] != NOBLOCK:
+                    self.blocks[i] = self.blocks[i - COLUMNS]
+                    i -= COLUMNS
+            prev = i
+
+        if points:
+            points += self.score_points(min(2, multiplier * 2))
+        return points
+
+    def gen_moves(self):
+        yield ()
+
+        for src in range(COLUMNS):
+            diff = src - self.exa
+            direction = RIGHT if diff > 0 else LEFT
+            mov1 = abs(diff) * (direction,)
+
+            yield mov1 + (SWAP,)
+            yield mov1 + (GRAB, SWAP, DROP)
+
+            for dst in range(COLUMNS):
+                diff = dst - src
+                direction = RIGHT if diff > 0 else LEFT
+                mov2 = abs(diff) * (direction,)
+
+                for i, get in enumerate(GET):
+                    if i > 1 or diff != 0:
+                        for put in PUT:
+                            yield mov1 + get + mov2 + put
+
+    def solve(self):
+        if self.held != NOBLOCK:
+            return (DROP,)
+
+        score, moves = min(self.score_moves())
+        return moves
+
+    def print(self):
+        print_board(self.blocks, self.exa, self.held)
+
+    def nrows(self):
+        return len(self.blocks) // COLUMNS
+
+
+
+def moves_to_keys(moves):
+    return ''.join('jjkad'[move] for move in moves)
+
+
+if __name__ == '__main__':
+    import sys
+    from PIL import Image
+    #from pprint import pprint
+
+    board = Image.open('screens/board%d.png' % int(sys.argv[1])).convert('HSV')
+    state = State.detect(board)
+
+    print('parsed:')
+    state.print()
+    print()
+
+    start = time.time()
+    moves = state.solve()
+    print('moves:', moves_to_keys(moves))
+    end = time.time()
+    print('elapsed:', end - start)
+    print()
+
+    print('target after moves:')
+    points, newstate = state.simulate(moves)
+    newstate.print()
+    print()
+
+    #for score, moves in sorted(state.score_moves()):
+    #    print('move %18s:' % moves_to_keys(moves), score)
+    #    #print('moves:', moves_to_keys(moves), moves)
+    #    #print('score:', score)
+
+    #print('\nmoves:', moves_to_keys(state.solve()))