Published on

Hatagawa II - BlackHat MEA 2025 CTF (Quals)

Authors
blackhat

Hello everyone! I'm a Web/Cryptography CTF player from Team HasHash. In this post I'll be sharing my solutions for the crypto challenges I solved in this CTF. I hope you find them interesting!

Hatagawa II

Category: Cryptography

Description: It is said that once in a while, flags might float by when watching over the Hatagawa — but this time, the river disguises them in a slightly trickier way.

First look at the code they provided to us.

Challenge Script:

#!/usr/bin/env python3
#
# BlackHat MEA CTF 2025 Qualifiers :: Hatagawa II
#

# Documentation imports
from __future__ import annotations
from typing import Tuple, List, Dict, NewType, Union

# Native imports
from secrets import randbelow
import os

# External dependencies
# None

# Flag import
# Read the flag from the DYN_FLAG environment variable; the fallback is a debug
# flag whose braces wrap the hex encoding of b'D3BUGG1NG_1S_FUN'.
FLAG = os.environ.get('DYN_FLAG', 'BHFlagY{' + b'D3BUGG1NG_1S_FUN'.hex() + '}')
if isinstance(FLAG, str):
    FLAG = FLAG.encode()
    # FFMT = (everything up to and including the first '{', the closing '}').
    # NOTE(review): FFMT is bound only inside this branch. os.environ.get with a
    # str default yields a str, so the branch is expected to always run.
    FFMT = (FLAG[:FLAG.index(b'{') + 1], b'}')


# Helper functions
# None


# Challenge classes
class Kawa:
    """ Kawa ("river"): stateful generator x -> (a*x + c) & m that emits each new state as bytes. """
    def __init__(self, par: Tuple[int, int, int], seed: int) -> None:
        # par is unpacked as (multiplier a, increment c, mask m).
        self.a, self.c, self.m = par
        # Current internal state; overwritten on every Get() call.
        self.x = seed

    def Get(self) -> bytes:
        """ Generates and outputs the next internal state as bytes. """
        # The reduction is a bitwise AND with m, not a modulo by m.
        self.x = (self.a * self.x + self.c) & self.m
        # -(-n // 8) is ceiling division: the byte width needed for m's bit length.
        return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')

class Hata:
    """ Hata ("flag"): XOR-encrypts the inner part of a formatted flag with bytes drawn from an entropy object. """
    def __init__(self, entropy: object) -> None:
        # The entropy object only needs a Get() method returning bytes (see Encrypt).
        self.entropy = entropy

    def Strip(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
        """ Strips a message from given bytestring formatting. """
        # bytes.replace removes every occurrence of fmt[0] and fmt[1]; what is
        # left is decoded and parsed as a hex string into raw bytes.
        return bytes.fromhex(msg.replace(fmt[0], b'').replace(fmt[1], b'').decode())

    def Unstrip(self, cip: str, fmt: Tuple[bytes, bytes]) -> str:
        """ Add given string formatting to a ciphertext. """
        return fmt[0].decode() + cip + fmt[1].decode()

    def Encrypt(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> str:
        """ Encrypts a message (except its formatting) using a one-time pad generated from entropy source. """
        msg = self.Strip(msg, fmt)
        otp = b''
        # Note the <=: the loop stops only once the pad is strictly longer than
        # the message, so one more Get() is issued whenever the pad length lands
        # exactly on len(msg).
        while len(otp) <= len(msg):
            otp += self.entropy.Get()
        # zip() stops at the shorter input, so surplus pad bytes are discarded.
        cip = bytes([x ^ y for x,y in zip(msg, otp)])
        # Unstrip returns a str: prefix + hex ciphertext + suffix.
        return self.Unstrip(cip.hex(), fmt)


# Main loop
if __name__ == "__main__":

    # Challenge parameters
    # MOD is 64 one-bits; Kawa.Get applies it with a bitwise AND.
    MOD = 2**64 - 1
    # Random multiplier whose three lowest bits are forced to 0b101.
    MUL = (randbelow(MOD >> 3) << 3) | 5
    # Random increment whose lowest bit is forced to 1 (always odd).
    ADD = randbelow(MOD) | 1


    # Challenge setup
    # One Hata/Kawa pair is created here and reused by every menu choice below,
    # so the generator state carries over from one encryption to the next.
    hatagawa = Hata(Kawa((MUL, ADD, MOD), randbelow(MOD)))

    # Raw-string banner template; its single {} is filled in through str.format.
    RIVER = r"""|
|  ////\\\,,\///,,,,,\,/,\\,//////,\,,\\\,,\\\/,,,\,,//,\\\,
|   ~ ~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~~ ~  ~~~    ~~~ ~~~ ~~   ~~
|    ~~~~~~~  ~~~~~~   ~~~ ~ ~~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~
|   ~~~     {}      ~
|    ~~~  ~   ~~~~~  ~~~~ ~ ~~~~   ~~~~~ ~~~~   ~~~ ~ ~~~~~
|   ~~~ ~  ~~~~~  ~  ~~  ~  ~~~~ ~~~ ~~   ~~ ~~~~~~~ ~ ~~
|  \\\\\'''\\'////'//'\''\\\/'''\''//'\\\''\///''''\'/'\\'//"""
    print(RIVER.format(' '*17 + '旗    川' + ' '*16))


    # Main loop
    userOptions = ['Stay a while...']
    # Each option is rendered with its first character in brackets, e.g. "[S]tay a while...".
    TUI = "|\n|  Menu:\n|    " + "\n|    ".join('[' + i[0] + ']' + i[1:] for i in userOptions) + "\n|    [W]alk away\n|"

    while True:
        try:

            print(TUI)
            # Input is lower-cased, so 'S'/'s' and 'W'/'w' are both accepted.
            choice = input('|  > ').lower()

            # [W]alk away
            if choice == 'w':
                print("|\n|  [~] You turn your back to the river...\n|")
                break

            # [S]tay a while...
            elif choice == 's':

                print("|\n|  [~] Look! A flag..? For FREE?!")
                # Every visit encrypts the same FLAG with the next pad bytes
                # drawn from the shared generator.
                print(RIVER.format(hatagawa.Encrypt(FLAG, FFMT)))

            else:
                print("|\n|  [!] Invalid choice.")

        except KeyboardInterrupt:
            print("\n|\n|  [~] Goodbye ~ !\n|")
            break

        # Any other error is printed and the menu loop continues.
        except Exception as e:
            print('|\n|  [!] {}'.format(e))

The code may seem big, but it's quite straightforward. Let's break it down into crucial snippets of the code.


Crucial Snippets

1. 64-bit LCG Core

self.x = (self.a * self.x + self.c) & self.m   # m = 2**64 - 1
return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')  # 8 bytes

This is a Linear Congruential Generator (LCG):

  • & (2**64 - 1) effectively performs arithmetic modulo 2^64 (because m = 2^64 − 1 and masking keeps the low 64 bits)
  • The keystream block is the full internal 64-bit state rendered as 8 bytes (big endian)

Each call to Get() leaks the entire state of the LCG — extremely fragile if any plaintext is known or related.


2. Parameter Structure

MOD = 2**64 - 1
MUL = (randbelow(MOD >> 3) << 3) | 5   # ⇒ a ≡ 5 (mod 8)
ADD = randbelow(MOD) | 1               # ⇒ c is odd
  • a is constrained to 5 mod 8 (and thus odd)
  • c is forced odd
  • These structural constraints heavily shrink the search space for (a, c) and make the LCG "nice" to reason about modulo 2^64

3. Strip / Unstrip Logic (Format Handling)

def Strip(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
    # Drop the prefix/suffix bytes, then parse the remaining text as hex.
    return bytes.fromhex(msg.replace(fmt[0], b'').replace(fmt[1], b'').decode())

def Unstrip(self, cip: str, fmt: Tuple[bytes, bytes]) -> str:
    # Wrap the given string between the decoded prefix and suffix.
    return fmt[0].decode() + cip + fmt[1].decode()
  • FFMT is (b"BHFlagY{", b"}")
  • Strip:
    • Removes the prefix and suffix from msg (the flag)
    • The inner part is hex: for the default flag, b"D3BUGG1NG_1S_FUN".hex()
    • bytes.fromhex(...) converts that hex string into the raw binary plaintext
  • Encrypt works on that stripped binary, not on the ASCII "BHFlagY{...}" directly
  • Unstrip:
    • Takes the resulting ciphertext bytes, hex-encodes them to a string
    • Rewraps as BHFlagY{<hex_ciphertext>}

Visually, each time we press S, we see:

BHFlagY{<hex of (plaintext XOR keystream)>}

The outer formatting is constant and known; only the inner hex changes between samples.


4. OTP Generation and Reuse

def Encrypt(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
    # Operate on the stripped message, not on the formatted flag string.
    msg = self.Strip(msg, fmt)
    otp = b''
    # Keep drawing pad blocks until the pad is strictly longer than msg (note the <=).
    while len(otp) <= len(msg):
        otp += self.entropy.Get()
    # Byte-wise XOR; zip() truncates to the message length.
    cip = bytes([x ^ y for x,y in zip(msg, otp)])
    # Hex-encode the ciphertext and re-wrap it in the prefix/suffix.
    return self.Unstrip(cip.hex(), fmt)
  • The OTP is built by concatenating successive PRNG states (Get() calls)
  • The PRNG is not reset between user choices:
    • First press: uses states S[0], S[1], S[2], ...
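    • Second press: uses later states S[3], S[4], S[5], ... — because the loop condition is len(otp) <= len(msg), each press consumes len(msg) // 8 + 1 states (3 for the 16-byte default flag: two to cover the message plus one extra caused by the <=)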
  • Every press encrypts the same underlying plaintext bytes (the stripped flag), but with different, sequential chunks of the LCG stream

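This is closely related to classic "many-time pad" behavior, but mirrored: instead of one keystream being reused across different messages, the same message is encrypted over and over under successive chunks of a single deterministic keystream.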


Vulnerability / Bug

The core cryptographic issues:

1. LCG as a Keystream Generator

An LCG over 64 bits with known structure is not secure. Its outputs are linearly related, and with enough observations you can reconstruct the parameters.

2. Full State Leak per Block

Each 8-byte keystream block is literally the internal state x in big-endian form. Any relation between plaintext and ciphertext gives a direct relation to state.

3. No Reseeding / Global PRNG

The same global Kawa instance is used across all Stay presses. Every use consumes more states from the same chain, giving multiple ciphertexts of the same underlying message under different consecutive states.

4. Plaintext Invariance Across Samples

The plaintext being encrypted (after Strip) is the same bytes every time:

P (binary) = bytes.fromhex(inner_flag_hex)

For each block index b and sample k:

P[b] = S[3k + b] XOR C[k,b]

So for a fixed block index b, all S[3k + b] XOR C[k,b] must be equal across k. This gives a set of strong algebraic constraints linking the states and ciphertext blocks.

5. Strong Parameter Constraints

The multiplier and increment are restricted (a mod 8 = 5, c odd). That fixes the low three bits of a and the low bit of c, which trims the solution space for the LCG parameters and gives the SMT solver (Z3) concrete constraints to work with, keeping the solve very feasible.

The combination of:

  • Deterministic LCG keystream
  • Reuse of OTP across multiple encryptions of the same plaintext
  • Full-state leakage
  • Strong algebraic constraints from plaintext invariance

makes the scheme completely breakable with a few samples.


My Approach

Step 1: Collect Multiple Ciphertext Samples

We exploit the network interface:

  • Connect to the remote service with pwntools.remote
  • Repeatedly send S to trigger new encryptions
  • Each response prints an ASCII art river with a formatted flag inside: ... BHFlagY{<hex_ciphertext>} ...
  • We use a regex to extract {[0-9a-f]+} and store each inner hex string as a ciphertext sample C_k

Code fragment:

def parse_flag_block(text: bytes) -> Tuple[str, bytes]:
    """Extract the flag prefix and the raw ciphertext bytes from a response.

    Looks for ``<prefix>{<lowercase hex>}`` first. If no prefix is attached to
    the braces, falls back to a bare ``{<hex>}`` and assumes the prefix
    ``BHFlagY``.

    Raises:
        ValueError: if no ``{hex}`` group is present, or the hex is malformed.
    """
    tagged = re.search(rb'([^\s{}]+)\{([0-9a-f]+)\}', text)
    if tagged is not None:
        label, payload = tagged.groups()
        return label.decode(), bytes.fromhex(payload.decode())

    bare = re.search(rb'\{([0-9a-f]+)\}', text)
    if bare is None:
        raise ValueError("Could not find {hex} in response.")
    return "BHFlagY", bytes.fromhex(bare.group(1).decode())

def collect_ciphertexts(host: str, port: int, num_samples: int):
    """Connect to the service and collect ``num_samples`` encrypted flags.

    Sends ``S`` once per sample, parses the ``prefix{hex}`` block out of each
    response, and finally sends ``W`` to leave the menu.

    Returns:
        (prefix, cblocks): the prefix seen in the first sample (``None`` if no
        samples were requested) and the list of raw ciphertext byte strings.
    """
    r = remote(host, port)
    try:
        # Skip the banner and the first menu, up to the input prompt.
        r.recvuntil(b'> ')
        cblocks = []
        prefix_seen = None
        for _ in range(num_samples):
            r.sendline(b'S')
            chunk = r.recvuntil(b'|  > ', drop=False)
            pref, cbytes = parse_flag_block(chunk)
            if prefix_seen is None:
                prefix_seen = pref
            cblocks.append(cbytes)
        r.sendline(b'W')
    finally:
        # Close the connection even if receiving or parsing failed part-way.
        r.close()
    return prefix_seen, cblocks

We grab ~8 samples to have enough constraints.


Step 2: Split Ciphertexts into Aligned Blocks

The plaintext after Strip is arbitrary bytes; we only know that the same bytes are used for every encryption. We split each ciphertext byte string into blocks corresponding to 64-bit keystream outputs:

def split_blocks(cblocks: List[bytes]):
    """Split every ciphertext into 8-byte blocks aligned with the keystream words.

    Returns:
        (N, B, last_len, obs_blocks) where N is the ciphertext length in bytes,
        B the number of blocks per ciphertext, last_len the length of the final
        (possibly partial) block, and obs_blocks[k][b] the b-th block of
        sample k.

    Raises:
        ValueError: if no samples are given or their lengths differ.
    """
    if not cblocks:
        raise ValueError("No ciphertext samples to split.")
    N = len(cblocks[0])
    # All samples encrypt the same plaintext, so their lengths must agree;
    # otherwise slicing below would silently yield short or missing blocks.
    if any(len(cb) != N for cb in cblocks):
        raise ValueError("Inconsistent ciphertext lengths.")
    B = ceil(N / 8)          # number of blocks
    last_len = N - 8 * (B - 1) if B > 0 else 0   # length of last partial block
    # Stepping by 8 yields B slices; the final slice is naturally last_len long.
    obs_blocks = [[cb[off:off + 8] for off in range(0, N, 8)] for cb in cblocks]
    return N, B, last_len, obs_blocks

Each obs_blocks[k][b] is the b-th ciphertext block for sample k.

We then "top-align" each block into a 64-bit word:

def top_aligned_u64(block_bytes: bytes) -> int:
    """Return the block as a big-endian integer left-justified in a 64-bit word."""
    # Number of zero bits appended below the block so that it fills the high bytes.
    pad_bits = (8 - len(block_bytes)) * 8
    return (int.from_bytes(block_bytes, 'big') << pad_bits) & MASK64

So the bytes occupy the high bytes of a 64-bit integer, making it easy to compare "top bits" across samples even when the last block is shorter than 8 bytes.


Step 3: Build Z3 Model for LCG + Plaintext Invariance

We construct a bit-vector SMT system:

Unknowns:

  • a, c as 64-bit BitVecs
  • S[i] for each emitted state (3 per sample)

Constraints:

  1. Parameter shape

    s.add((a & 7) == 5)  # a ≡ 5 (mod 8)
    s.add((c & 1) == 1)  # c odd
    
  2. LCG transitions

    for i in range(total_states - 1):
        s.add(S[i+1] == a * S[i] + c)  # modulo 2^64 via bit-vector wrap
    
  3. Plaintext invariance across samples / blocks

    For each block index b and sample k:

    Let:

    • C[0,b] be the top-aligned ciphertext block from sample 0
    • C[k,b] from sample k
    • Plaintext P_b is the same for all k, so:
    P_b = S[b]       XOR C[0,b]
        = S[3k + b] XOR C[k,b]
    

    Implemented as:

    for b in range(B):
        L = 8 if b < B-1 else last_len
        top_hi = 63
        top_lo = 64 - 8*L
    
        C0b = BitVecVal(top_aligned_u64(obs[0][b]), W)
        for k in range(T):
            Ckb = BitVecVal(top_aligned_u64(obs[k][b]), W)
            left  = S[3*k + b] ^ Ckb
            right = S[b]       ^ C0b
            if L == 8:
                s.add(left == right)
            else:
                s.add(Extract(top_hi, top_lo, left) == Extract(top_hi, top_lo, right))
    

    For full blocks we equate all bits; for the shorter last block we equate only the top 8*L bits.


Step 4: Ask Z3 to Solve and Recover (a, c) and Plaintext

Once constraints are set:

if s.check() != sat:
    return None

m = s.model()
a_val = m[a].as_long() & MASK64
c_val = m[c].as_long() & MASK64

Then we recover the plaintext bytes by reusing:

plain = bytearray()
for b in range(B):
    L = 8 if b < B-1 else last_len
    Sb  = (m[S[b]].as_long() & MASK64)
    C0b = top_aligned_u64(obs[0][b])
    Pb64 = Sb ^ C0b
    pb = Pb64.to_bytes(8, 'big')[:L]  # take TOP L bytes
    plain += pb

This plain corresponds to the binary message produced by Strip (the pre-OTP bytes). The original flag format is:

prefix + "{" + plain.hex() + "}"

So we reconstruct the final flag string as:

true_flag = f"{prefix}{{{plain.hex()}}}"

Which matches the original FLAG.


Final Solution Script

Here is the full solver:

#!/usr/bin/env python3
"""
BlackHat MEA CTF 2025 Quals :: Hatagawa II
             Advanced LCG Attack with Z3
"""

import sys
import re
from math import ceil
from typing import List, Tuple

from pwn import remote
from z3 import Solver, BitVec, BitVecVal, Extract, sat

W = 64
MASK64 = (1 << 64) - 1

# ---------------- I/O helpers ----------------

def parse_flag_block(text: bytes) -> Tuple[str, bytes]:
    """Extract the flag prefix and the raw ciphertext bytes from a response.

    Looks for ``<prefix>{<lowercase hex>}`` first. If no prefix is attached to
    the braces, falls back to a bare ``{<hex>}`` and assumes the prefix
    ``BHFlagY``.

    Raises:
        ValueError: if no ``{hex}`` group is present, or the hex is malformed.
    """
    tagged = re.search(rb'([^\s{}]+)\{([0-9a-f]+)\}', text)
    if tagged is not None:
        label, payload = tagged.groups()
        return label.decode(), bytes.fromhex(payload.decode())

    bare = re.search(rb'\{([0-9a-f]+)\}', text)
    if bare is None:
        raise ValueError("Could not find {hex} in response.")
    return "BHFlagY", bytes.fromhex(bare.group(1).decode())

def collect_ciphertexts(host: str, port: int, num_samples: int) -> Tuple[str, List[bytes]]:
    """Connect to the service and collect ``num_samples`` encrypted flags.

    Sends ``S`` once per sample, parses the ``prefix{hex}`` block out of each
    response, and finally sends ``W`` to leave the menu.

    Returns:
        (prefix, cblocks): the prefix seen in the first sample (``None`` if no
        samples were requested) and the list of raw ciphertext byte strings.
    """
    r = remote(host, port)
    try:
        # Skip the banner and the first menu, up to the input prompt.
        r.recvuntil(b'> ')
        cblocks = []
        prefix_seen = None
        for _ in range(num_samples):
            r.sendline(b'S')
            chunk = r.recvuntil(b'|  > ', drop=False)
            pref, cbytes = parse_flag_block(chunk)
            if prefix_seen is None:
                prefix_seen = pref
            cblocks.append(cbytes)
        r.sendline(b'W')
        try:
            # Best-effort drain of the farewell text; a failure here is
            # harmless because every sample has already been collected.
            r.recv(timeout=0.2)
        except Exception:
            pass
    finally:
        # Close the connection even if receiving or parsing failed part-way.
        r.close()
    return prefix_seen, cblocks

# ---------------- Z3 construction ----------------

def split_blocks(cblocks: List[bytes]):
    """Split every ciphertext into 8-byte blocks aligned with the keystream words.

    Returns:
        (N, B, last_len, obs_blocks): ciphertext length in bytes, number of
        blocks per ciphertext, length of the final (possibly partial) block,
        and obs_blocks[k][b] = the b-th block of sample k.

    Raises:
        ValueError: if the samples do not all have the same length.
    """
    total_len = len(cblocks[0])
    for sample in cblocks:
        if len(sample) != total_len:
            raise ValueError("Inconsistent ciphertext lengths.")

    n_blocks = ceil(total_len / 8)
    tail_len = 0 if n_blocks == 0 else total_len - 8 * (n_blocks - 1)

    # Stepping through the block offsets gives full 8-byte slices, and the final
    # slice is cut off by the end of the sample at tail_len bytes.
    obs_blocks = [
        [sample[start:start + 8] for start in range(0, 8 * n_blocks, 8)]
        for sample in cblocks
    ]
    return total_len, n_blocks, tail_len, obs_blocks

def top_aligned_u64(block_bytes: bytes) -> int:
    """Return the block as a big-endian integer left-justified in a 64-bit word."""
    # Number of zero bits appended below the block so that it fills the high bytes.
    pad_bits = (8 - len(block_bytes)) * 8
    return (int.from_bytes(block_bytes, 'big') << pad_bits) & MASK64

def solve(cblocks: List[bytes]):
    """Recover the LCG parameters and the plaintext from ciphertext samples.

    Models the generator states as 64-bit bit-vectors, constrains consecutive
    states by S[i+1] = a*S[i] + c (mod 2^64), and requires that every sample
    decrypts to the same plaintext as sample 0.

    Args:
        cblocks: equal-length ciphertexts, in the order they were received
            from one connection.

    Returns:
        (a, c, plaintext) taken from a satisfying model, or None if the
        constraints are unsatisfiable.
    """
    T = len(cblocks)
    N, B, last_len, obs = split_blocks(cblocks)

    # The service keeps drawing 8-byte states while len(otp) <= len(msg), so
    # each press consumes N // 8 + 1 states (3 for a 16..23-byte plaintext).
    # Deriving the stride from N keeps the state indexing right for any length.
    per_press = N // 8 + 1
    total_states = per_press * T

    s = Solver()
    a = BitVec('a', W)
    c = BitVec('c', W)
    S = [BitVec(f's_{i}', W) for i in range(total_states)]

    # Parameter shape: a ≡ 5 (mod 8); c odd
    s.add((a & 7) == 5)
    s.add((c & 1) == 1)

    # LCG transitions across all emitted states
    for i in range(total_states - 1):
        s.add(S[i+1] == a * S[i] + c)  # modulo 2^64 via bit-vector wrap

    # Plaintext invariance across samples:
    # For block b, top bytes of (S[per_press*k + b] XOR C[k,b]) equal those of
    # (S[b] XOR C[0,b]). Sample 0 is the reference, so k starts at 1.
    for b in range(B):
        L = 8 if b < B-1 else last_len
        top_hi = 63
        top_lo = 64 - 8*L

        C0b = BitVecVal(top_aligned_u64(obs[0][b]), W)
        right = S[b] ^ C0b
        for k in range(1, T):
            Ckb = BitVecVal(top_aligned_u64(obs[k][b]), W)
            left = S[per_press*k + b] ^ Ckb
            if L == 8:
                s.add(left == right)
            else:
                # Partial last block: only its top 8*L bits were observed.
                s.add(Extract(top_hi, top_lo, left) == Extract(top_hi, top_lo, right))

    if s.check() != sat:
        return None

    m = s.model()
    a_val = m[a].as_long() & MASK64
    c_val = m[c].as_long() & MASK64

    # Recover plaintext bytes from the base sample k=0
    plain = bytearray()
    for b in range(B):
        L = 8 if b < B-1 else last_len
        Sb  = (m[S[b]].as_long() & MASK64)
        C0b = top_aligned_u64(obs[0][b])
        Pb64 = Sb ^ C0b
        pb = Pb64.to_bytes(8, 'big')[:L]  # take TOP L bytes
        plain += pb

    return a_val, c_val, bytes(plain)

# ---------------- main ----------------

def main():
    """Collect samples from the service, run the solver, and print the flag.

    Optional positional arguments: host, port, number of samples.
    Exits with status 2 when the solver reports the constraints as unsatisfiable.
    """
    cli = sys.argv[1:]
    host = cli[0] if cli else "34.252.33.37"
    port = int(cli[1]) if len(cli) >= 2 else 32303
    samples = int(cli[2]) if len(cli) >= 3 else 8

    print(f"[*] Connecting to {host}:{port} and collecting {samples} samples...")
    prefix, cblocks = collect_ciphertexts(host, port, samples)
    print(f"[*] Prefix detected: {prefix}")
    print(f"[*] Ciphertext length: {len(cblocks[0])} bytes ({len(cblocks[0])*2} hex chars)")

    outcome = solve(cblocks)
    if outcome is None:
        print(f"[!] UNSAT. Try increasing samples (e.g., 10–12).")
        sys.exit(2)

    a_val, c_val, plain = outcome
    # The original flag is the prefix wrapped around the hex of the plaintext.
    true_flag = f"{prefix}{{{plain.hex()}}}"
    print(f"[+] a = 0x{a_val:016x}, c = 0x{c_val:016x}")
    print(f"[+] True flag recovered:\n{true_flag}")

if __name__ == "__main__":
    main()

References

Happy hacking!