Bladeren bron

Tweak strategy parameters, add delay on each move to avoid inaccurate state parsing, refactor some code

Taddeus Kroes 5 jaren geleden
bovenliggende
commit
3975c688d5
3 gewijzigde bestanden met toevoegingen van 221 en 160 verwijderingen
  1. 68 46
      bot.py
  2. 3 3
      interaction.py
  3. 150 111
      strategy.py

+ 68 - 46
bot.py

@@ -1,10 +1,15 @@
 #!/usr/bin/env python3
 import os
 import time
+from collections import deque
 from itertools import count
-from strategy import State, moves_to_keys
-from interaction import get_exapunks_window, focus_window, \
-                        screenshot_board, press_keys, listen_keys
+from Xlib import error
+from strategy import State
+from interaction import get_exapunks_window, focus_window, screenshot_board, \
+                        press_keys, listen_keys, KEY_DELAY
+
+
+MAX_SPEED_ROWS = 3
 
 
 def save_screenshot(win):
@@ -19,46 +24,63 @@ def save_screenshot(win):
 
 
 if __name__ == '__main__':
-    win = get_exapunks_window()
-    focus_window(win)
-
-    listen_keys({'s': lambda: save_screenshot(win)})
-
-    prev_score = None
-
-    while True:
-        board = screenshot_board(win)
-
-        try:
-            state = State.detect(board)
-        except (TypeError, AssertionError):
-            print('error during parsing, wait for a bit')
-            time.sleep(.1)
-            continue
-
-        print('\033c', end='')
-        print('parsed:')
-        state.print()
-        print()
-
-        start = time.time()
-        moves = state.solve()
-        end = time.time()
-        print('thought for %.4f seconds' % (end - start))
-
-        if moves:
-            print('moves:', moves_to_keys(moves))
-            points, newstate = state.simulate(moves)
-            score = newstate.score(points, moves, state)
-            print('     score:', score)
-            print('prev score:', prev_score)
-            print()
-            prev_score = score
-
-            print('target after moves:')
-            newstate.print()
-            print()
-
-            press_keys(win, moves_to_keys(moves))
-        else:
-            print('no moves')
+    try:
+        win = get_exapunks_window()
+        focus_window(win)
+
+        listen_keys({'s': lambda: save_screenshot(win)})
+
+        solutions = deque([], maxlen=3)
+
+        while True:
+            try:
+                board = screenshot_board(win)
+
+                state = State.detect(board)
+                print('\033c', end='')
+                print('parsed:')
+                state.print()
+                print()
+
+                start = time.time()
+                solution = state.solve()
+                end = time.time()
+                print('thought for', round((end - start) * 1000, 1), 'milliseconds')
+            except (TypeError, AssertionError):
+                print('\rerror during parsing, wait for a bit...', end='')
+                time.sleep(0.05)
+                continue
+            except error.BadMatch:
+                print('\rEXAPUNKS window lost, wait for a bit...', end='')
+                time.sleep(0.5)
+                continue
+
+            if len(solutions) == 3 and solution.loops(solutions.popleft()):
+                print('\rloop detected, wait for a bit...', end='')
+                time.sleep(0.03)
+            elif solution.moves:
+                print('moves:', solution.keys())
+                print('     score:', solution.score)
+                if solutions:
+                    print('prev score:', solutions[-1].score)
+                print()
+
+                print('target after moves:')
+                solution.newstate.print()
+
+                press_keys(win, solution.keys())
+
+                keys_delay = len(solution.moves) * 2 * KEY_DELAY
+                moves_delay = max(0, solution.delay() - keys_delay)
+                print('wait for',  moves_delay, 'ms')
+                time.sleep(moves_delay / 1000)
+            elif state.nrows() - 2 <= MAX_SPEED_ROWS:
+                print('no moves, speed up')
+                press_keys(win, 'l')
+                time.sleep(0.03)
+            else:
+                print('no moves')
+
+            solutions.append(solution)
+    except KeyboardInterrupt:
+        print('interrupted, quitting')

+ 3 - 3
interaction.py

@@ -10,7 +10,7 @@ BOARD_X = 367
 BOARD_Y = 129
 BOARD_WIDTH = 420
 BOARD_HEIGHT = 638
-KEY_DELAY = 0.015
+KEY_DELAY = 14  # milliseconds
 
 
 disp = display.Display()
@@ -60,11 +60,11 @@ def press_keys(window, keys):
 
         ext.xtest.fake_input(disp, X.KeyPress, keycode)
         disp.sync()
-        time.sleep(KEY_DELAY)
+        time.sleep(KEY_DELAY / 1000)
 
         ext.xtest.fake_input(disp, X.KeyRelease, keycode)
         disp.sync()
-        time.sleep(KEY_DELAY)
+        time.sleep(KEY_DELAY / 1000)
 
 
 def listen_keys(handlers):

+ 150 - 111
strategy.py

@@ -8,19 +8,27 @@ from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
 
 
 GRAB, DROP, SWAP, LEFT, RIGHT, SPEED = range(6)
+MOVE_DELAYS = (
+    # in milliseconds
+    50,  # GRAB
+    50,  # DROP
+    50,  # SWAP
+    30,  # LEFT
+    30,  # RIGHT
+    30,  # SPEED
+)
 GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
 PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
 MIN_BASIC_GROUP_SIZE = 4
 MIN_BOMB_GROUP_SIZE = 2
-FIND_GROUPS_DEPTH = 3
-FRAG_DEPTH = 3
-DEFRAG_PRIO = 3
+POINTS_DEPTH = 3
+FRAG_DEPTH = 5
+DEFRAG_PRIO = 4
 COLSIZE_PRIO = 5
 COLSIZE_PANIC = 7
-COLSIZE_MAX = 8
+COLSIZE_MAX = 9
 BOMB_POINTS = 1
 MIN_ROWS = 2
-MAX_SPEED_ROWS = 3
 
 
 class State:
@@ -29,7 +37,7 @@ class State:
         self.exa = exa
         self.held = held
 
-    def grabbing_of_dropping(self):
+    def grabbing_or_dropping(self):
         skip = self.colskip(self.exa)
         i = (skip + 1) * COLUMNS + self.exa
         return i < len(self.blocks) and self.blocks[i] == NOBLOCK
@@ -56,53 +64,52 @@ class State:
     def copy(self):
         return State(list(self.blocks), self.exa, self.held)
 
-    def colsizes(self):
-        for col in range(COLUMNS):
-            yield self.nrows() - self.colskip(col)
+    def causes_panic(self):
+        return self.max_colsize() >= COLSIZE_PANIC
 
-    def colsize_panic(self):
-        return int(max(self.colsizes()) >= COLSIZE_PANIC)
+    def max_colsize(self):
+        return self.nrows() - self.empty_rows()
 
-    def empty_column_score(self):
-        skip = 0
+    def empty_rows(self):
         for i, block in enumerate(self.blocks):
             if block != NOBLOCK:
-                skip = i // COLUMNS
-                break
+                return i // COLUMNS
+        return 0
 
-        nrows = self.nrows()
+    def holes(self):
+        start_row = self.empty_rows()
+        total_rows = self.nrows()
         score = 0
         for col in range(COLUMNS):
-            for row in range(skip, nrows):
+            for row in range(start_row, total_rows):
                 if self.blocks[row * COLUMNS + col] != NOBLOCK:
                     break
-                score += row - skip + 1
+                score += row - start_row + 1
         return score
 
     def score(self, points, moves, prev):
-        prev_colsize = max(prev.colsizes())
-        points = self.score_points()
-
-        if prev_colsize >= COLSIZE_PANIC:
-            colsize = self.empty_column_score()
-            #frag = self.fragmentation()
-            return colsize, len(moves), -points #, frag
+        prev_colsize = prev.nrows() - 2
+
+        #if prev_colsize >= COLSIZE_PANIC:
+        #    holes = self.holes()
+        #    frag = self.fragmentation()
+        #    return holes, moves_delay(moves), -points, frag
+        if prev_colsize >= COLSIZE_PRIO:
+            holes = self.holes()
+            frag = self.fragmentation()
+            return -points, holes, frag, moves_delay(moves)
         elif prev_colsize >= DEFRAG_PRIO:
-            colsize = self.empty_column_score()
+            holes = self.holes()
             frag = self.fragmentation()
-            panic = self.colsize_panic()
-            return -points, panic, frag, colsize, len(moves)
-        elif prev_colsize >= COLSIZE_PRIO:
-            colsize = self.empty_column_score()
-            return -points, colsize, len(moves)
+            panic = int(self.causes_panic())
+            return -points, panic, frag, holes, moves_delay(moves)
         else:
-            return -points, len(moves)
+            return -points, moves_delay(moves)
 
-    def score_moves(self):
+    def solutions(self):
         for moves in self.gen_moves():
             try:
-                points, newstate = self.simulate(moves)
-                yield newstate.score(points, moves, self), moves
+                yield Solution(self, moves)
             except AssertionError:
                 pass
 
@@ -134,7 +141,7 @@ class State:
 
     def simulate(self, moves):
         s = self.copy()
-        points = 0
+        #points = 0
 
         # avoid swapping/grabbing currently exploding items
         #unmoveable = s.find_unmovable_blocks()
@@ -161,7 +168,7 @@ class State:
                 i = row * COLUMNS + s.exa
                 s.blocks[i - COLUMNS] = s.held
                 s.held = NOBLOCK
-                #points += s.score_points()
+                #points += s.remove_blocks()
             elif move == SWAP:
                 row = s.colskip(s.exa)
                 assert row < s.nrows() - 2
@@ -170,14 +177,15 @@ class State:
                 #assert i not in unmoveable
                 #assert j not in unmoveable
                 s.blocks[i], s.blocks[j] = s.blocks[j], s.blocks[i]
-                #points += s.score_points()
+                #points += s.remove_blocks()
 
-        if moves and max(self.colsizes()) < COLSIZE_MAX:
-            assert max(s.colsizes()) <= COLSIZE_MAX
+        if moves and self.max_colsize() < COLSIZE_MAX:
+            assert s.max_colsize() <= COLSIZE_MAX
 
+        points = s.remove_blocks()
         return points, s
 
-    def find_groups(self, depth=FIND_GROUPS_DEPTH, minsize=2):
+    def find_groups(self, depth=POINTS_DEPTH, minsize=2):
         def follow_group(i, block, group):
             if self.blocks[i] == block and i not in visited:
                 group.append(i)
@@ -196,20 +204,15 @@ class State:
                     yield block, group
 
     def neighbors(self, i):
-        def gen_indices():
-            row, col = divmod(i, COLUMNS)
-            if col > 0:
-                yield i - 1
-            if col < COLUMNS - 1:
-                yield i + 1
-            if row > 0:
-                yield i - COLUMNS
-            if row < self.nrows() - 1:
-                yield i + COLUMNS
-
-        for j in gen_indices():
-            if self.blocks[j] != NOBLOCK:
-                yield j
+        row, col = divmod(i, COLUMNS)
+        if col > 0 and self.blocks[i - 1] != NOBLOCK:
+            yield i - 1
+        if col < COLUMNS - 1 and self.blocks[i + 1] != NOBLOCK:
+            yield i + 1
+        if row > 0 and self.blocks[i - COLUMNS] != NOBLOCK:
+            yield i - COLUMNS
+        if row < self.nrows() - 1 and self.blocks[i + COLUMNS] != NOBLOCK:
+            yield i + COLUMNS
 
     def fragmentation(self, depth=FRAG_DEPTH):
         """
@@ -220,8 +223,8 @@ class State:
             yi, xi = divmod(i, COLUMNS)
             yj, xj = divmod(j, COLUMNS)
 
-            # for blocks in the same group, only count vertical distance so that
-            # groups are spread out horizontally
+            # for blocks in the same group, only count vertical distance so
+            # that groups are spread out horizontally
             if groups[i] == groups[j]:
                 return abs(yj - yi)
 
@@ -241,33 +244,43 @@ class State:
                    for block, color in colors.items()
                    for i, j in combinations(color, 2))
 
-    def score_points(self, multiplier=1):
-        #remove = []
-        points = 0
+    def remove_blocks(self):
+        removed = 0
 
         for block, group in self.find_groups():
             if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
-                #remove.extend(group)
-                points += len(group) * multiplier
+                removed += len(group)
             elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
-                points += BOMB_POINTS
-                #remove.extend(group)
-                #for i, other in enumerate(self.blocks):
-                #    if other == bomb_to_basic(block):
-                #        remove.append(i)
-
-        #remove.sort()
-        #prev = None
-        #for i in remove:
-        #    if i != prev:
-        #        while self.blocks[i] != NOBLOCK:
-        #            self.blocks[i] = self.blocks[i - COLUMNS]
-        #            i -= COLUMNS
-        #    prev = i
-
-        #if points:
-        #    points += self.score_points(min(2, multiplier * 2))
-        return points
+                removed += BOMB_POINTS
+
+        return removed
+
+    #def remove_blocks(self):
+    #    remove = []
+
+    #    for block, group in self.find_groups():
+    #        if is_basic(block) and len(group) >= MIN_BASIC_GROUP_SIZE:
+    #            remove.extend(group)
+    #        elif is_bomb(block) and len(group) >= MIN_BOMB_GROUP_SIZE:
+    #            remove.extend(group)
+    #            remove.extend(i for i, other in enumerate(self.blocks)
+    #                          if other == bomb_to_basic(block))
+
+    #    remove.sort()
+    #    removed = 0
+    #    prev = None
+    #    for i in remove:
+    #        if i != prev:
+    #            while self.blocks[i] != NOBLOCK:
+    #                self.blocks[i] = self.blocks[i - COLUMNS]
+    #                i -= COLUMNS
+    #            removed += 1
+    #        prev = i
+
+    #    if removed:
+    #        removed += self.remove_blocks()
+
+    #    return removed
 
     def has_explosion(self):
         return any(is_bomb(block) and
@@ -281,38 +294,35 @@ class State:
             direction = RIGHT if diff > 0 else LEFT
             return abs(diff) * (direction,)
 
+        ignore_exa_column = self.grabbing_or_dropping()
+
         for src in range(COLUMNS):
             mov1 = make_move(src - self.exa)
-            yield mov1 + (SWAP,)
-            yield mov1 + (GRAB, SWAP, DROP)
-            yield mov1 + (SWAP, GRAB, SWAP, DROP)
-
-            for dst in range(COLUMNS):
-                if dst != src:
-                    mov2 = make_move(dst - src)
-                    for get in GET:
-                        for put in PUT:
-                            yield mov1 + get + mov2 + put
+            if mov1 or not ignore_exa_column:
+                yield mov1 + (SWAP,)
+                yield mov1 + (GRAB, SWAP, DROP)
+                yield mov1 + (SWAP, GRAB, SWAP, DROP)
+
+                for dst in range(COLUMNS):
+                    if dst != src:
+                        mov2 = make_move(dst - src)
+                        for get in GET:
+                            for put in PUT:
+                                yield mov1 + get + mov2 + put
 
     def solve(self):
         assert self.exa is not None
 
         if self.held != NOBLOCK:
-            return (DROP,)
+            return Solution(self, (DROP,))
 
         if self.nrows() < MIN_ROWS:
-            return ()
-
-        if self.grabbing_of_dropping():
-            return ()
+            return Solution(self, ())
 
         if self.has_explosion():
-            return ()
+            return Solution(self, ())
 
-        score, moves = min(self.score_moves())
-        if not moves and max(self.colsizes()) <= MAX_SPEED_ROWS:
-            return (SPEED,)
-        return moves
+        return min(self.solutions())
 
     def print(self):
         print_board(self.blocks, self.exa, self.held)
@@ -326,10 +336,43 @@ class State:
     def nrows(self):
         return len(self.blocks) // COLUMNS
 
+    def has_same_exa(self, state):
+        return self.exa == state.exa and self.held == state.held
+
+
+class Solution:
+    def __init__(self, state, moves):
+        self.state = state
+        self.moves = moves
+        points, self.newstate = state.simulate(moves)
+        self.score = self.newstate.score(points, moves, state)
+
+    def __lt__(self, other):
+        return self.score < other.score
+
+    def loops(self, prev_prev):
+        return self.moves and \
+               self.state.exa == prev_prev.state.exa and \
+               self.moves == prev_prev.moves and \
+               self.score == prev_prev.score
+
+    def delay(self):
+        return moves_delay(self.moves)
+
+    def keys(self):
+        return moves_to_keys(self.moves)
+
+
+def move_to_key(move):
+    return 'jjkadl'[move]
 
 
 def moves_to_keys(moves):
-    return ''.join('jjkadl'[move] for move in moves)
+    return ''.join(move_to_key(move) for move in moves)
+
+
+def moves_delay(moves):
+    return sum(MOVE_DELAYS[m] for m in moves)
 
 
 if __name__ == '__main__':
@@ -343,20 +386,16 @@ if __name__ == '__main__':
     state.print()
     print()
 
-    print('empty cols:', state.empty_column_score())
-    print()
-
     start = time.time()
-    moves = state.solve()
+    solution = state.solve()
     end = time.time()
-    print('moves:', moves_to_keys(moves))
-    print('elapsed:', end - start)
+    print('best moves:', solution.keys())
+    print('elapsed:', round((end - start) * 1000, 1), 'ms')
     print()
 
     print('target after moves:')
-    points, newstate = state.simulate(moves)
-    newstate.print()
+    solution.newstate.print()
     print()
 
-    for score, moves in sorted(state.score_moves()):
-        print('move %18s:' % moves_to_keys(moves), score)
+    for solution in sorted(state.solutions()):
+        print('move %18s:' % solution.keys(), solution.score)