Parcourir la source

Dynamic programming: reuse changes made by common prefix moves

Taddeus Kroes il y a 5 ans
Parent
commit
d0515a6812
2 fichiers modifiés avec 170 ajouts et 89 suppressions
  1. 15 8
      bot.py
  2. 155 81
      strategy.py

+ 15 - 8
bot.py

@@ -56,10 +56,8 @@ if __name__ == '__main__':
                 vprint_state(state)
                 vprint()
 
-                start = time.time()
-                newstate = state.solve()
-                end = time.time()
-                vprint('thought for', round((end - start) * 1000, 1), 'ms')
+                assert state.exa is not None
+
             except (TypeError, AssertionError):
                 vprint('\rerror during parsing, wait for a bit...', end='')
                 time.sleep(0.050)
@@ -69,14 +67,23 @@ if __name__ == '__main__':
                 time.sleep(0.500)
                 continue
 
+            try:
+                start = time.time()
+                newstate = state.solve()
+                end = time.time()
+                vprint('thought for', round((end - start) * 1000, 1), 'ms')
+            except AssertionError:
+                print('error board 99:')
+                state.print()
+                board.convert('RGB').save('screens/board99.png')
+                break
+
             if state.held == NOBLOCK and any(map(newstate.loops, buf)):
                 vprint('\rloop detected, wait for a bit...', end='')
                 time.sleep(0.03)
             elif newstate.moves:
                 vprint('moves:', newstate.keys())
-                vprint('     score:', newstate.score)
-                if buf:
-                    vprint('prev score:', buf[-1].score)
+                vprint('score:', newstate.score)
                 vprint()
 
                 vprint('target after moves:')
@@ -88,7 +95,7 @@ if __name__ == '__main__':
                 #moves_delay = max(0, newstate.delay() - keys_delay)
                 #vprint('wait for',  moves_delay, 'ms')
                 #time.sleep(moves_delay / 1000)
-                time.sleep(0.075)
+                time.sleep(0.080)
             elif state.nrows - 2 <= MAX_SPEED_ROWS:
                 vprint('no moves, speed up')
                 press_keys(win, 'l')

+ 155 - 81
strategy.py

@@ -2,6 +2,7 @@ import io
 import time
 from collections import deque
 from contextlib import redirect_stdout
+from copy import copy
 from itertools import combinations, islice
 from detection import COLUMNS, NOBLOCK, detect_blocks, detect_exa, \
                       detect_held, print_board, is_basic, is_bomb
@@ -17,8 +18,6 @@ MOVE_DELAYS = (
     30,  # RIGHT
     30,  # SPEED
 )
-GET = ((GRAB,), (SWAP, GRAB), (GRAB, SWAP, DROP, SWAP, GRAB))
-PUT = ((DROP,), (DROP, SWAP), (DROP, SWAP, GRAB, SWAP, DROP))
 MIN_BASIC_GROUP_SIZE = 4
 MIN_BOMB_GROUP_SIZE = 2
 POINTS_DEPTH = 3
@@ -31,25 +30,39 @@ BOMB_POINTS = 5
 
 
 class State:
-    def __init__(self, blocks, exa, held, colskip=None):
+    def __init__(self, blocks, exa, held, colskip, busy, moves, placed, grabbed):
         self.blocks = blocks
         self.exa = exa
         self.held = held
-        self.moves = ()
-        self.score = ()
-        self.nrows = len(self.blocks) // COLUMNS
-
-        if colskip is None:
-            colskip = []
-            for col in range(COLUMNS):
-                for row in range(self.nrows):
-                    if self.blocks[row * COLUMNS + col] != NOBLOCK:
-                        colskip.append(row)
-                        break
-                else:
-                    colskip.append(self.nrows)
-
         self.colskip = colskip
+        self.busy = busy
+        self.moves = moves
+        self.placed = placed
+        self.grabbed = grabbed
+        self.nrows = len(blocks) // COLUMNS
+
+    @classmethod
+    def detect(cls, board, pad=2):
+        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
+        exa = detect_exa(board)
+        held = detect_held(board, exa)
+        colskip = get_colskip(blocks)
+        busy = get_busy(blocks, colskip)
+        return cls(blocks, exa, held, colskip, busy, (), set(), {})
+
+    def copy(self, deep):
+        mcopy = copy if deep else lambda x: x
+        return self.__class__(mcopy(self.blocks),
+                              self.exa,
+                              self.held,
+                              mcopy(self.colskip),
+                              self.busy,
+                              self.moves,
+                              mcopy(self.placed),
+                              mcopy(self.grabbed))
+
+    def colbusy(self, col):
+        return (self.busy >> col) & 1
 
     def grabbing_or_dropping(self):
         skip = self.colskip[self.exa]
@@ -66,17 +79,6 @@ class State:
         for col in range(COLUMNS):
             yield gen_col(col)
 
-    @classmethod
-    def detect(cls, board, pad=2):
-        blocks = [NOBLOCK] * (COLUMNS * pad) + list(detect_blocks(board))
-        exa = detect_exa(board)
-        held = detect_held(board, exa)
-        return cls(blocks, exa, held)
-
-    def copy(self):
-        return self.__class__(list(self.blocks), self.exa, self.held,
-                              list(self.colskip))
-
     def causes_panic(self):
         return self.max_colsize() >= COLSIZE_PANIC
 
@@ -99,38 +101,48 @@ class State:
                 score += row - start_row + 1
         return score
 
-    def move(self, moves):
-        s = self.copy() if moves else self
-        s.moves = moves
-        s.placed = set()
-        s.grabbed = {}
+    def colrows(self, col):
+        return self.nrows - self.colskip[col]
+
+    def move(self, *moves):
+        deep = any(move in (GRAB, DROP, SWAP) for move in moves)
+        s = self.copy(deep)
+        s.moves += moves
 
         for move in moves:
             if move == LEFT:
                 assert s.exa > 0
                 s.exa -= 1
+
             elif move == RIGHT:
                 assert s.exa < COLUMNS - 1
                 s.exa += 1
+
             elif move == GRAB:
+                assert not s.colbusy(s.exa)
                 assert s.held == NOBLOCK
                 row = s.colskip[s.exa]
                 assert row < s.nrows
                 i = row * COLUMNS + s.exa
-                s.held = s.blocks[i]
+                s.grabbed[i] = s.held = s.blocks[i]
                 s.blocks[i] = NOBLOCK
                 s.grabbed[i] = s.held
                 s.colskip[s.exa] += 1
+
             elif move == DROP:
+                assert not s.colbusy(s.exa)
                 assert s.held != NOBLOCK
                 row = s.colskip[s.exa]
                 assert row > 0
+                # XXX assert s.nrows - row < COLSIZE_MAX
                 i = (row - 1) * COLUMNS + s.exa
                 s.blocks[i] = s.held
                 s.held = NOBLOCK
                 s.placed.add(i)
                 s.colskip[s.exa] -= 1
+
             elif move == SWAP:
+                assert not s.colbusy(s.exa)
                 row = s.colskip[s.exa]
                 i = row * COLUMNS + s.exa
                 j = i + COLUMNS
@@ -145,9 +157,6 @@ class State:
                     s.placed.add(i)
                     s.placed.add(j)
 
-        if moves and self.max_colsize() < COLSIZE_MAX:
-            assert s.max_colsize() <= COLSIZE_MAX
-
         return s
 
     def find_groups(self, depth=POINTS_DEPTH, minsize=2):
@@ -247,66 +256,110 @@ class State:
         return -points
 
     def gen_moves(self):
-        yield ()
-
-        def shift_exa(diff):
-            direction = RIGHT if diff > 0 else LEFT
-            return abs(diff) * (direction,)
-
-        ignore_exa_column = self.grabbing_or_dropping()
-
-        for src in range(COLUMNS):
-            mov1 = shift_exa(src - self.exa)
-            if mov1 or not ignore_exa_column:
-                yield mov1 + (SWAP,)
-                yield mov1 + (GRAB, SWAP, DROP)
-                yield mov1 + (SWAP, GRAB, SWAP, DROP)
-                yield mov1 + (GRAB, SWAP, DROP, SWAP)
-                yield mov1 + (SWAP, GRAB, SWAP, DROP, SWAP)
-
-                for dst in range(COLUMNS):
-                    if dst != src:
-                        mov2 = shift_exa(dst - src)
-                        for get in GET:
-                            for put in PUT:
-                                yield mov1 + get + mov2 + put
-
-    def gen_valid_moves(self):
-        for moves in self.gen_moves():
-            try:
-                yield self.move(moves)
-            except AssertionError:
-                pass
+        yield self
+
+        for src in self.gen_shift(not self.grabbing_or_dropping()):
+            yield from src.gen_stationary()
+
+            for get in src.gen_get():
+                for dst in get.gen_shift(False):
+                    yield from dst.gen_put()
+
+    def gen_shift(self, allow_noshift):
+        if allow_noshift:
+            yield self
+
+        left = self
+        for i in range(self.exa):
+            left = left.move(LEFT)
+            yield left
+
+        right = self
+        for i in range(COLUMNS - self.exa - 1):
+            right = right.move(RIGHT)
+            yield right
+
+    def gen_stationary(self):
+        # SWAP
+        # GRAB, SWAP, DROP
+        # GRAB, SWAP, DROP, SWAP
+        # SWAP, GRAB, SWAP, DROP
+        # SWAP, GRAB, SWAP, DROP, SWAP
+        if not self.colbusy(self.exa):
+            avail = self.colrows(self.exa)
+            if avail >= 2:
+                swap = self.move(SWAP)
+                yield swap
+                if avail >= 3:
+                    grab = self.move(GRAB, SWAP, DROP)
+                    yield grab
+                    yield grab.move(SWAP)
+                    swap = swap.move(GRAB, SWAP, DROP)
+                    yield swap
+                    yield swap.move(SWAP)
+
+    def gen_get(self):
+        # GRAB
+        # SWAP, GRAB
+        # GRAB, SWAP, DROP, SWAP, GRAB
+        if not self.colbusy(self.exa):
+            avail = self.colrows(self.exa)
+            if avail >= 1:
+                grab = self.move(GRAB)
+                yield grab
+                if avail >= 2:
+                    yield self.move(SWAP, GRAB)
+                    if avail >= 3:
+                        yield grab.move(SWAP, DROP, SWAP, GRAB)
+
+    def gen_put(self):
+        # DROP
+        # DROP, SWAP
+        # DROP, SWAP, GRAB, SWAP, DROP
+        if not self.colbusy(self.exa):
+            avail = self.colrows(self.exa)
+            drop = self.move(DROP)
+            yield drop
+            if avail >= 1:
+                swap = drop.move(SWAP)
+                yield swap
+                if avail >= 2:
+                    yield swap.move(GRAB, SWAP, DROP)
+
+    def force(self, *moves):
+        state = self.move(*moves)
+        state.score = ()
+        return state
 
     def solve(self):
         assert self.exa is not None
 
         if self.held != NOBLOCK:
-            return self.move((DROP,))
+            return self.force(DROP)
 
-        valid = deque(self.gen_valid_moves())
+        pool = deque(self.gen_moves())
 
-        if len(valid) == 0:
-            return self.move(())
+        if len(pool) == 0:
+            return self.force()
 
         best_score = ()
 
         for key in self.score_keys():
-            if len(valid) == 1:
+            if len(pool) == 1:
                 break
 
-            for state in valid:
+            for state in pool:
                 state.score = key(state)
 
-            best = min(state.score for state in valid)
+            best = min(state.score for state in pool)
             best_score += (best,)
 
-            for i in range(len(valid)):
-                state = valid.popleft()
+            for i in range(len(pool)):
+                state = pool.popleft()
                 if state.score == best:
-                    valid.append(state)
+                    pool.append(state)
 
-        best = valid.popleft()
+        best = pool.popleft()
         best.score = best_score
         return best
 
@@ -344,9 +397,6 @@ class State:
     def keys(self):
         return moves_to_keys(self.moves)
 
-    def __lt__(self, other):
-        return self.score < other.score
-
     def loops(self, prev):
         return self.moves and \
                self.exa == prev.exa and \
@@ -354,6 +404,25 @@ class State:
                self.score == prev.score
 
 
+def get_colskip(blocks):
+    def colskip(col):
+        for row, block in enumerate(blocks[col::COLUMNS]):
+            if block != NOBLOCK:
+                return row
+        return len(blocks) // COLUMNS
+
+    return list(map(colskip, range(COLUMNS)))
+
+
+def get_busy(blocks, colskip):
+    mask = 0
+    for col, skip in enumerate(colskip):
+        start = (skip + 1) * COLUMNS + col
+        colbusy = NOBLOCK in blocks[start::COLUMNS]
+        mask |= colbusy << col
+    return mask
+
+
 def move_to_key(move):
     return 'jjkadl'[move]
 
@@ -387,3 +456,8 @@ if __name__ == '__main__':
 
     print('target after move:')
     newstate.print()
+    #print()
+
+    #print('generated moves:')
+    #for state in state.gen_moves():
+    #    print(state.keys())