Hatagawa - BlackhatMEA 2025 CTF (Quals)

Authors
blackhat

Hello everyone! I’m a Web/Cryptography CTF player from Team HasHash, and I’ll be sharing my solutions for the crypto challenges I solved in this CTF. I hope you find them interesting!

Hatagawa

Category: Cryptography
Description: It is said that once in a while, flags might float by when watching over the Hatagawa.

First, let’s look at the code provided to us.

Challenge Script:

#!/usr/bin/env python3
#
# BlackHat MEA CTF 2025 Qualifiers :: Hatagawa I
#
#

# Documentation imports
from __future__ import annotations
from typing import Tuple, List, Dict, NewType, Union

# Native imports
from secrets import randbelow
import os

# External dependencies
# None

# Flag import
FLAG = os.environ.get('DYN_FLAG', 'BHFlagY{this_is_a_fake_flag}')
if isinstance(FLAG, str):
    FLAG = FLAG.encode()


# Helper functions
# None


# Challenge classes
class Kawa:
    """ 川 """
    def __init__(self, par: Tuple[int], seed: int) -> None:
        self.a, self.c, self.m = par
        self.x = seed

    def Get(self) -> bytes:
        """ Generates and outputs the next internal state as bytes. """
        self.x = (self.a * self.x + self.c) & self.m
        return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')

class Hata:
    """ 旗 """
    def __init__(self, entropy: object) -> None:
        self.entropy = entropy

    def Encrypt(self, msg: bytes) -> bytes:
        """ Encrypts a message using a one-time pad generated from entropy source. """
        otp = b''
        while len(otp) < len(msg):
            otp += self.entropy.Get()
        return bytes([x ^ y for x,y in zip(msg, otp)])


# Main loop
if __name__ == "__main__":

    # Challenge parameters
    MOD = 2**64 - 1
    MUL = (randbelow(MOD >> 3) << 3) | 5
    ADD = randbelow(MOD) | 1


    # Challenge setup
    hatagawa = Hata(Kawa((MUL, ADD, MOD), randbelow(MOD)))

    RIVER = r"""|
|  ////\\\,,\///,,,,,\,/,\\,//////,\,,\\\,,\\\/,,,\,,//,\\\,
|   ~ ~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~~ ~  ~~~    ~~~ ~~~ ~~   ~~
|    ~~~~~~~  ~~~~~~   ~~~ ~ ~~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~
|   ~~~ {} ~
|    ~~~  ~   ~~~~~  ~~~~ ~ ~~~~   ~~~~~ ~~~~   ~~~ ~ ~~~~~
|   ~~~ ~  ~~~~~  ~  ~~  ~  ~~~~ ~~~ ~~   ~~ ~~~~~~~ ~ ~~
|  \\\\\'''\\'////'//'\''\\\/'''\''//'\\\''\///''''\'/'\\'//"""
    print(RIVER.format(' '*21 + '旗    川' + ' '*21))


    # Main loop
    userOptions = ['Stay a while...']
    TUI = "|\n|  Menu:\n|    " + "\n|    ".join('[' + i[0] + ']' + i[1:] for i in userOptions) + "\n|    [W]alk away\n|"

    while True:
        try:

            print(TUI)
            choice = input('|  > ').lower()

            # [W]alk away
            if choice == 'w':
                print("|\n|  [~] You turn your back to the river...\n|")
                break

            # [S]tay a while...
            elif choice == 's':

                print("|\n|  [~] Look! Is that a flag floating by?")
                print(RIVER.format(hatagawa.Encrypt(FLAG).hex()))

            else:
                print("|\n|  [!] Invalid choice.")

        except KeyboardInterrupt:
            print("\n|\n|  [~] Goodbye ~ !\n|")
            break

        except Exception as e:
            print('|\n|  [!] {}'.format(e))

The code may seem long, but it’s quite straightforward. Let’s break it down into the crucial snippets.

Crucial Snippets

  1. 64-bit LCG update (wraps at 2^64)
self.x = (self.a * self.x + self.c) & self.m   # self.m = 2**64 - 1
  • This is a linear congruential generator (LCG).
  • & (2**64 - 1) keeps only the lowest 64 bits ⇒ arithmetic is effectively mod 2^64.
  • Mod 2^64 is convenient for an attacker: every odd value is invertible, so you can strip common powers of two and then invert the odd part.
  2. The keystream is the full state
return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')  # 8 bytes
  • Every block leaks the entire internal 64-bit state as 8 bytes.
  • If you know 8 bytes of plaintext at that position, XOR gives you the exact state used there.
  3. Parameter shapes help the attack
MUL = (randbelow(MOD >> 3) << 3) | 5   # ⇒ a ≡ 5 (mod 8)  (also a ≡ 1 (mod 4))
ADD = randbelow(MOD) | 1               # ⇒ c is odd
  • These fixed properties (a mod 8, c odd) shrink the search and make lifting consistent.
  4. OTP = concatenated states; PRNG never resets
otp = b''
while len(otp) < len(msg):
    otp += self.entropy.Get()  # 8 bytes per call; advance LCG each time
  • Press S again and again: the generator continues; it does not reseed.
  • Each press encrypts the same flag but with the next states.
  5. Known prefix reveals start state of each slice
# first 8 ciphertext bytes of slice i:
# ct_i[:8] = b"BHFlagY{" ^ p64(z_i)
z_i = int.from_bytes(bytes(ci ^ pi for ci, pi in zip(ct_i[:8], b"BHFlagY{")), "big")
  • Flags start with BHFlagY{ (8 bytes). XORing the first 8 ciphertext bytes with this prefix gives z_i, the state at the start of slice i.
  6. Slice starts follow a step-n LCG

Let one slice use n = ceil(len(flag)/8) states. Then the starts z_i obey:

z_{i+1} = A*z_i + c*S   (mod 2^64)
A = a^n (mod 2^64)
S = 1 + a + a^2 + ... + a^{n-1} (mod 2^64)
  • With ≥ 3 slices (z_0, z_1, z_2), we can recover A from differences.

  • With A we lift all valid a (respecting a ≡ 5 (mod 8)), then solve for c (odd), and decrypt.
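
To make the step-n relation concrete, here is a minimal sketch that simulates the generator locally and checks z_{i+1} = A*z_i + c*S (mod 2^64). The parameter values, the seed, and the helper names lcg_next and slice_starts are made up for illustration; only their shape (a ≡ 5 (mod 8), c odd) follows the challenge.

```python
MASK64 = (1 << 64) - 1

def lcg_next(x: int, a: int, c: int) -> int:
    """One update of the challenge LCG; the mask reduces mod 2**64."""
    return (a * x + c) & MASK64

def slice_starts(seed: int, a: int, c: int, n: int, slices: int) -> list:
    """First state emitted in each slice when every slice consumes n states."""
    starts, x = [], seed
    for _ in range(slices):
        for j in range(n):
            x = lcg_next(x, a, c)
            if j == 0:
                starts.append(x)
    return starts

# Hypothetical parameters shaped like the challenge's: a ≡ 5 (mod 8), c odd.
a = (0x0123456789ABCDE << 3) | 5
c = 0xDEADBEEFCAFEBABF
seed = 0x1122334455667788
n = 5  # number of 8-byte blocks per slice (assumed flag length of 33..40 bytes)

A = pow(a, n, 1 << 64)
S = sum(pow(a, j, 1 << 64) for j in range(n)) & MASK64

z = slice_starts(seed, a, c, n, slices=4)
relation_holds = all(
    ((A * z[i] + c * S) & MASK64) == z[i + 1] for i in range(len(z) - 1)
)
print(relation_holds)
```

The check works because n plain LCG steps compose into a single affine map with multiplier a^n and increment c*(1 + a + ... + a^{n-1}).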

Vulnerability / Bug

  • Wrong modulus assumption: Using & (2**64-1) is effectively mod 2^64, not mod 2^64-1.

  • State leak: The keystream block is the state (8 bytes) → maximum leakage with any known plaintext.

  • No reseed/reset: Each S uses the same PRNG instance, advancing forward. Slices form a step-n LCG you can solve.

  • Predictable prefix: Flag starts with BHFlagY{ (8 bytes) → gives you each slice’s start state instantly.

  • Parameter constraints: a ≡ 5 (mod 8), c odd → recovery becomes deterministic and fast.
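
The modulus point is easy to confirm with a few lines. This is only a sketch with made-up values for a, c and x; it compares the masked update against reduction mod 2^64 and mod 2^64 - 1.

```python
MOD = 2**64 - 1  # the value the challenge names MOD and uses as a bit mask

# Hypothetical values, chosen only to have the challenge's shape.
a = (0x0123456789ABCDE << 3) | 5
c = 0xDEADBEEFCAFEBABF
x = 0x1122334455667788

raw = a * x + c
masked = raw & MOD

same_as_mod_2_64 = masked == raw % 2**64      # masking keeps the low 64 bits
same_as_mod_2_64_minus_1 = masked == raw % MOD  # a true "mod MOD" would differ
print(same_as_mod_2_64, same_as_mod_2_64_minus_1)
```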

My Approach

Goal: press S many times, recover the start states z_i, solve the step-n LCG, decrypt.

Step 1 — Collect slices

  • Open a netcat connection to the challenge instance and press S around 10–14 times.
  • Each time, parse the big hex line to get the ciphertext slice.
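
As a small illustration of the parsing step, the sketch below pulls the longest hex run out of a chunk of server output. The sample text and the helper name extract_ciphertext are invented for the example; the regular expression mirrors the one used in the final script.

```python
import re

# Longest run of at least 32 hex characters is assumed to be the ciphertext.
HEX_RE = re.compile(r"\b([0-9a-f]{32,})\b", re.I)

def extract_ciphertext(output: str) -> bytes:
    """Return the ciphertext bytes found in one menu round of server output."""
    matches = HEX_RE.findall(output)
    if not matches:
        raise ValueError("no ciphertext hex found")
    return bytes.fromhex(max(matches, key=len))

# Made-up output fragment shaped like the river banner line.
sample = (
    "|\n|  [~] Look! Is that a flag floating by?\n"
    "|   ~~~ 00112233445566778899aabbccddeeff0123 ~\n"
    "|  Menu:\n"
)
ct = extract_ciphertext(sample)
print(len(ct))
```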

Step 2 — Recover start states z_i

  • Use the known prefix P = b"BHFlagY{":
    • z_i = u64( ct_i[:8] ^ P )
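
A quick local check of this step, under the assumption that the flag really starts with BHFlagY{: the sketch encrypts a placeholder flag three times with a simulated generator and confirms that the XOR trick returns the first state of every slice. The flag text, parameters, and helper names are hypothetical.

```python
MASK64 = (1 << 64) - 1
PREFIX = b"BHFlagY{"

def xor_bytes(x: bytes, y: bytes) -> bytes:
    return bytes(p ^ q for p, q in zip(x, y))

def start_state(ct: bytes, prefix: bytes = PREFIX) -> int:
    """State behind the first keystream block, given the 8-byte known prefix."""
    return int.from_bytes(xor_bytes(ct[:8], prefix), "big")

# Local simulation with made-up parameters and a placeholder flag.
flag = b"BHFlagY{example_placeholder_flag}"
a = (0x0123456789ABCDE << 3) | 5
c = 0xDEADBEEFCAFEBABF
x = 0x1122334455667788

ciphertexts, true_starts = [], []
for _ in range(3):
    otp = bytearray()
    while len(otp) < len(flag):
        x = (a * x + c) & MASK64
        if not otp:               # first block of this slice
            true_starts.append(x)
        otp += x.to_bytes(8, "big")
    ciphertexts.append(xor_bytes(flag, otp))

recovered = [start_state(ct) for ct in ciphertexts]
print(recovered == true_starts)
```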

Step 3 — Recover the step multiplier A = a^n (mod 2^64)

  • For triples (z0, z1, z2):

    • d1 = z1 - z0 and d2 = z2 - z1 (mod 2^64)
    • Let v = min(trailing_zeros(d1), trailing_zeros(d2)). Because d2 = A*d1 and A is odd, d1 and d2 actually have the same number of trailing zeros.
    • Divide both by 2^v so they become odd, then compute A ≡ d2' * inv(d1') (mod 2^(64 - v))
    • This yields a small set of candidates for A (the top v bits are free).
  • Do this for several triples and intersect the candidate sets ⇒ usually one A.
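
The recovery of A can be sketched on simulated data as follows. The values A_true, B_true and the starting state are made up, and the function name step_multiplier_candidates is only illustrative. The sketch relies on pow(x, -1, m) for modular inverses, which needs Python 3.8 or newer, and it enumerates 2^v candidates, so it assumes v is small.

```python
MASK64 = (1 << 64) - 1

def tz(x: int) -> int:
    """Number of trailing zero bits of a nonzero integer."""
    return (x & -x).bit_length() - 1

def step_multiplier_candidates(z0: int, z1: int, z2: int) -> set:
    """All 64-bit A with z2 - z1 == A * (z1 - z0) (mod 2**64)."""
    d1 = (z1 - z0) & MASK64
    d2 = (z2 - z1) & MASK64
    if d1 == 0 or d2 == 0:
        return set()
    v = min(tz(d1), tz(d2))
    k = 64 - v
    d1p, d2p = d1 >> v, d2 >> v
    if d1p % 2 == 0:
        return set()  # cannot happen for an odd A; guards against bad input
    base = d2p * pow(d1p, -1, 1 << k) % (1 << k)
    # A is only determined mod 2**k, so the top v bits are free.
    return {base + (t << k) for t in range(1 << v)}

# Simulated slice starts: z_{i+1} = A_true * z_i + B_true (mod 2**64).
A_true = pow((0x0123456789ABCDE << 3) | 5, 5, 1 << 64)
B_true = 0x0F1E2D3C4B5A6979  # stands in for c*S
z = [0x1122334455667788]
for _ in range(3):
    z.append((A_true * z[-1] + B_true) & MASK64)

candidates = (
    step_multiplier_candidates(z[0], z[1], z[2])
    & step_multiplier_candidates(z[1], z[2], z[3])
)
print(A_true in candidates, len(candidates))
```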

Step 4 — Lift all valid a from A

  • We need a^n ≡ A (mod 2^64) and a ≡ 5 (mod 8).

  • Start with candidates {5} modulo 8.

  • For each bit position t = 3..63:

    • Work modulo 2^(t+1).
    • Try adding 0 or 1<<t to each candidate; keep only those where (a_try^n) ≡ A (mod 2^(t+1)).
  • The survivors are the full 64-bit a values allowed by the constraints.
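
The bit-by-bit lift can be tried in isolation. In this sketch a_true is a made-up multiplier of the right shape and lift_multipliers is an illustrative name; the check is simply that the true multiplier survives and that every survivor satisfies both constraints.

```python
def lift_multipliers(A: int, n: int, bits: int = 64) -> set:
    """All a < 2**bits with a ≡ 5 (mod 8) and a**n ≡ A (mod 2**bits)."""
    cands = {5}  # the three low bits are fixed by MUL = (... << 3) | 5
    for t in range(3, bits):
        M = 1 << (t + 1)
        target = A % M
        # Extend every candidate with bit t cleared or set, keep the matches.
        cands = {
            a_try
            for a in cands
            for a_try in (a, a | (1 << t))
            if pow(a_try, n, M) == target
        }
        if not cands:
            break
    return cands

# Hypothetical multiplier and block count.
a_true = (0x0123456789ABCDE << 3) | 5
n = 5
A = pow(a_true, n, 1 << 64)

survivors = lift_multipliers(A, n)
print(a_true in survivors, len(survivors))
```

When n is even, several multipliers can share the same a^n, which is why the approach keeps every survivor and lets the later consistency checks decide.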

Step 5 — Solve for c (must be odd)

  • Compute S = 1 + a + a^2 + ... + a^{n-1} (mod 2^64).

  • Let vS = trailing_zeros(S). If B = z_{i+1} − A*z_i is not divisible by 2^{vS}, discard this a.

  • Otherwise:

    • Reduce: B' = B / 2^{vS}, S' = S / 2^{vS} (now S' is odd).
    • Invert: c ≡ B' * inv(S') (mod 2^(64 - vS))
    • Lift to 64 bits: there are 2^{vS} possible c; keep only odd ones.
  • Verify the candidate (a, c) across all pairs (z_i, z_{i+1}).
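
Here is a small sketch of the c recovery on simulated values, using an even n so that S is not invertible and the lift to 64 bits actually matters. The parameters and the name solve_increment are hypothetical, and pow(x, -1, m) again assumes Python 3.8 or newer.

```python
M = 1 << 64

def tz(x: int) -> int:
    """Number of trailing zero bits of a nonzero integer."""
    return (x & -x).bit_length() - 1

def solve_increment(a: int, n: int, z0: int, z1: int) -> list:
    """All odd 64-bit c with z1 == a**n * z0 + c * S (mod 2**64)."""
    A = pow(a, n, M)
    S = sum(pow(a, j, M) for j in range(n)) % M
    B = (z1 - A * z0) % M
    if S == 0:
        return []  # degenerate case, nothing to invert
    vS = tz(S)
    if B % (1 << vS) != 0:
        return []  # this a is inconsistent with the observed pair
    k = 64 - vS
    base = (B >> vS) * pow(S >> vS, -1, 1 << k) % (1 << k)
    lifts = (base + (t << k) for t in range(1 << vS))
    return [c for c in lifts if c & 1]  # ADD is always odd

# Simulated pair of slice starts with made-up parameters.
a = (0x0123456789ABCDE << 3) | 5
c_true = 0xDEADBEEFCAFEBABF
n = 6
z0 = 0x1122334455667788
S = sum(pow(a, j, M) for j in range(n)) % M
z1 = (pow(a, n, M) * z0 + c_true * S) % M

c_candidates = solve_increment(a, n, z0, z1)
print(c_true in c_candidates, len(c_candidates))
```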

Step 6 — Decrypt

  • For any slice, regenerate its OTP from start state and (a, c) and XOR.
  • You should see BHFlagY{...}.
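
The decryption step can be checked end to end against a local simulation. Everything below (flag text, parameters, helper names) is a placeholder; the one detail worth noticing is that the slice's start state is itself the first keystream block, so the OTP is rebuilt by emitting the state first and updating afterwards.

```python
MASK64 = (1 << 64) - 1

def encrypt_like_challenge(msg: bytes, state: int, a: int, c: int):
    """Mimics Hata.Encrypt on top of Kawa.Get; returns (ciphertext, new state)."""
    otp = bytearray()
    while len(otp) < len(msg):
        state = (a * state + c) & MASK64
        otp += state.to_bytes(8, "big")
    return bytes(m ^ k for m, k in zip(msg, otp)), state

def decrypt_slice(ct: bytes, z_start: int, a: int, c: int) -> bytes:
    """Rebuild the OTP from the slice's first state and XOR it off."""
    otp = bytearray()
    s = z_start
    while len(otp) < len(ct):
        otp += s.to_bytes(8, "big")   # emit first, then advance
        s = (a * s + c) & MASK64
    return bytes(x ^ k for x, k in zip(ct, otp))

# Made-up parameters and placeholder flag.
flag = b"BHFlagY{example_placeholder_flag}"
a = (0x0123456789ABCDE << 3) | 5
c = 0xDEADBEEFCAFEBABF
state = 0x1122334455667788

ct0, state = encrypt_like_challenge(flag, state, a, c)
ct1, state = encrypt_like_challenge(flag, state, a, c)

# Start state of the second slice from the known prefix, then decrypt it.
z1 = int.from_bytes(bytes(x ^ p for x, p in zip(ct1[:8], b"BHFlagY{")), "big")
recovered = decrypt_slice(ct1, z1, a, c)
print(recovered)
```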

Why this works reliably: Everything happens mod 2^64 with tiny candidate sets. The constraints (prefix, a ≡ 5 (mod 8), c odd) cut the space down hard. Cross-pair checks remove false positives.

Final Solution

Let's construct the final solution script:

#!/usr/bin/env python3

import re
import socket
import sys
from typing import List, Set

HOST_DEFAULT = "34.252.33.37"
PORT_DEFAULT = 30706
TAKE_SLICES = 14  # grab plenty of consecutive slices in one session
DEFAULT_PREFIX = b"BHFlagY{"  # 8-byte known prefix from challenge

MASK64 = (1 << 64) - 1
HEX_RE = re.compile(rb"\b([0-9a-f]{32,})\b", re.I)

# -------------------- small helpers --------------------

def tz(x: int) -> int:
    if x == 0:
        return 64
    return (x & -x).bit_length() - 1

def inv_odd_mod_2k(a: int, k: int) -> int:
    assert a & 1
    x = 1
    for i in range(1, k):
        mod = 1 << (i + 1)
        x = (x * (2 - (a * x) % mod)) % mod
    return x % (1 << k)

def u64(b: bytes) -> int:
    return int.from_bytes(b, "big")

def p64(x: int) -> bytes:
    return (x & MASK64).to_bytes(8, "big")

def geom_sum_mod(a: int, n: int, M: int) -> int:
    s, term = 0, 1 % M
    for _ in range(n):
        s = (s + term) % M
        term = (term * a) % M
    return s

# -------------------- I/O with server --------------------

def recv_until(sock: socket.socket, needle: bytes, max_bytes: int = 1_000_000) -> bytes:
    buf = bytearray()
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break
        buf += chunk
        if needle in buf or len(buf) >= max_bytes:
            break
    return bytes(buf)

def get_ciphertexts(host: str, port: int, count: int) -> List[bytes]:
    s = socket.create_connection((host, port))
    recv_until(s, b"Menu:")
    ctexts = []
    for _ in range(count):
        s.sendall(b"S\n")
        data = recv_until(s, b"Menu:")
        m_all = HEX_RE.findall(data)
        if not m_all:
            s.sendall(b"W\n")
            s.close()
            raise RuntimeError("No ciphertext hex found in server output.")
        hex_blob = max(m_all, key=len)
        ctexts.append(bytes.fromhex(hex_blob.decode()))
    s.sendall(b"W\n")
    s.close()
    return ctexts

# -------------------- core recovery --------------------

def recover_stepn_A_candidates(z0: int, z1: int, z2: int) -> Set[int]:
    """Return all 64-bit A candidates from z0->z1->z2 under modulus 2^64."""
    d1 = (z1 - z0) & MASK64
    d2 = (z2 - z1) & MASK64
    if d1 == 0 or d2 == 0:
        return set()
    v = min(tz(d1), tz(d2))
    k = 64 - v
    d1p = d1 >> v
    d2p = d2 >> v
    inv = inv_odd_mod_2k(d1p, k)
    A_mod = (d2p * inv) % (1 << k)
    return { (A_mod + (t << k)) & MASK64 for t in range(1 << v) }

def lift_all_a_from_A(A: int, n: int) -> Set[int]:
    """
    All a (64-bit) such that a^n == A (mod 2^64), with a ≡ 5 (mod 8).
    Lift bit-by-bit: keep the extensions that match pow(a,n,M) at each M=2^(t+1).
    """
    cands = {5}  # per challenge MUL = (..<<3)|5 => a ≡ 5 (mod 8)
    for t in range(3, 64):
        M = 1 << (t + 1)
        tgt = A & (M - 1)
        new = set()
        for a in cands:
            a0 = a % M
            # next bit can be 0 or 1
            for bit in (0, 1):
                a_try = (a0 + (bit << t)) % M
                if pow(a_try, n, M) == tgt:
                    new.add(a_try)
        if not new:
            return set()
        cands = new
    return cands  # full 64-bit values

def recover_c_lifts(a: int, z0: int, z1: int, n: int) -> List[int]:
    """All odd 64-bit c consistent with z1 = a^n*z0 + c*S (mod 2^64)."""
    M = 1 << 64
    A = pow(a, n, M)
    S = geom_sum_mod(a, n, M)
    vS = tz(S)
    B = (z1 - (A * z0) % M) % M
    if vS > 0 and (B & ((1 << vS) - 1)) != 0:
        return []
    k = 64 - vS
    if k == 0:
        return []
    S_red = (S >> vS) % (1 << k)
    if (S_red & 1) == 0:
        return []  # should be odd after removing all 2s
    invS = inv_odd_mod_2k(S_red, k)
    c_base = ((B >> vS) * invS) % (1 << k)
    cands = []
    for t in range(1 << vS):
        c = (c_base + (t << k)) & MASK64
        if c & 1:  # ADD was chosen odd
            cands.append(c)
    return cands

def stepn_from_z(z: int, a: int, c: int, n: int) -> int:
    """Compute z' = A*z + c*S (mod 2^64) using fast pow/sum."""
    M = 1 << 64
    A = pow(a, n, M)
    S = geom_sum_mod(a, n, M)
    return (A * z + (c * S) % M) & MASK64

def decrypt_first_slice(ct: bytes, a: int, c: int, z0: int) -> bytes:
    L = len(ct)
    n = (L + 7) // 8
    s = z0
    otp = bytearray()
    for _ in range(n):
        otp += p64(s)
        s = (a * s + c) & MASK64
    return bytes(ci ^ oi for ci, oi in zip(ct, otp[:L]))

def solve_instance(host: str, port: int, count: int, prefix: bytes):
    if len(prefix) != 8:
        raise RuntimeError("Flag prefix must be exactly 8 bytes (e.g., BHFlagY{).")

    ctexts = get_ciphertexts(host, port, count=count)
    L = len(ctexts[0])
    n = (L + 7) // 8

    # Recover start-of-slice states z_i from first 8 bytes using known prefix.
    z = [ u64(bytes(ci ^ pi for ci, pi in zip(ct[:8], prefix))) for ct in ctexts ]
    if len(z) < 3:
        raise RuntimeError("Need at least 3 slices to solve.")

    # Intersect A candidates from multiple triples to prune hard.
    A_set = None
    triples = min(len(z) - 2, 6)  # use up to 6 triples
    for i in range(triples):
        Ai = recover_stepn_A_candidates(z[i], z[i+1], z[i+2])
        if not Ai:
            continue
        A_set = Ai if A_set is None else (A_set & Ai)
        if A_set and len(A_set) == 1:
            break
    if not A_set:
        raise RuntimeError("Failed to recover step-n multiplier A.")

    # Try each A, lift all 'a', solve 'c', verify across all pairs, decrypt.
    for A in A_set:
        a_candidates = lift_all_a_from_A(A, n)
        for a in a_candidates:
            # c must be consistent across ALL pairs modulo 2^(64 - v2(S))
            M = 1 << 64
            S = geom_sum_mod(a, n, M)
            vS = tz(S)
            k = 64 - vS
            if k == 0:
                continue
            S_red = (S >> vS) % (1 << k)
            if (S_red & 1) == 0:
                continue
            invS = inv_odd_mod_2k(S_red, k)
            Achk = pow(a, n, M)

            # derive c_hat from each pair and require equality
            c_hat = None
            consistent = True
            for i in range(len(z) - 1):
                B = ((z[i+1] - (Achk * z[i]) % M) % M)
                if vS > 0 and (B & ((1 << vS) - 1)) != 0:
                    consistent = False
                    break
                cur = ((B >> vS) * invS) % (1 << k)
                if c_hat is None:
                    c_hat = cur
                elif cur != c_hat:
                    consistent = False
                    break
            if not consistent:
                continue
            if k >= 1 and (c_hat & 1) == 0:
                continue  # c must be odd

            # enumerate the 2^vS lifts of c and fully verify transitions
            c_cands = [ ((c_hat + (t << k)) & MASK64) for t in range(1 << vS) if ((c_hat + (t << k)) & 1) ]
            for c in c_cands:
                ok = True
                for i in range(len(z) - 1):
                    if stepn_from_z(z[i], a, c, n) != z[i+1]:
                        ok = False
                        break
                if not ok:
                    continue

                pt = decrypt_first_slice(ctexts[0], a, c, z[0])
                # Accept any plaintext that starts with the prefix and contains a closing brace
                # (this also covers the case where the flag ends with "}").
                if pt.startswith(prefix) and b"}" in pt:
                    print(pt.decode(errors="replace"))
                    return

    raise RuntimeError("Parameters found, but no candidate produced a valid-looking flag. "
                       "Try increasing TAKE_SLICES or confirm the 8-byte prefix.")

# -------------------- entry --------------------

if __name__ == "__main__":
    host = HOST_DEFAULT
    port = PORT_DEFAULT
    if len(sys.argv) >= 3:
        host = sys.argv[1]
        port = int(sys.argv[2])
    prefix = DEFAULT_PREFIX
    if len(sys.argv) >= 4:
        prefix = sys.argv[3].encode()

    try:
        solve_instance(host, port, TAKE_SLICES, prefix)
    except Exception as e:
        print(f"[!] {e}")
        sys.exit(1)

Happy Hacking!
