import cv2 import numpy as np import hashlib import argparse ID_HEX_LEN = 16 ID_BITS = 64 CRC_BITS = 8 TOTAL_BITS = ID_BITS + CRC_BITS REDUNDANT_ROWS = 3 def sha256_file(path: str) -> str: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def short_id_from_file(path: str) -> str: return sha256_file(path)[-ID_HEX_LEN:] def crc8_bytes(data: bytes, poly: int = 0x07, init: int = 0x00) -> int: crc = init for b in data: crc ^= b for _ in range(8): if crc & 0x80: crc = ((crc << 1) ^ poly) & 0xFF else: crc = (crc << 1) & 0xFF return crc & 0xFF def id_hex_to_bits_with_crc(id_hex: str) -> list[int]: if len(id_hex) != ID_HEX_LEN: raise ValueError(f"ID muss {ID_HEX_LEN} Hex-Zeichen lang sein.") payload = bytes.fromhex(id_hex) crc = crc8_bytes(payload) full_hex = id_hex + f"{crc:02x}" bits = [] for ch in full_hex: v = int(ch, 16) bits.extend([(v >> 3) & 1, (v >> 2) & 1, (v >> 1) & 1, v & 1]) return bits def bits_to_hex(bits: list[int]) -> str: if len(bits) % 4 != 0: raise ValueError("Bitanzahl muss durch 4 teilbar sein.") out = [] for i in range(0, len(bits), 4): v = (bits[i] << 3) | (bits[i + 1] << 2) | (bits[i + 2] << 1) | bits[i + 3] out.append(format(v, "x")) return "".join(out) def verify_crc_from_bits(bits: list[int]) -> tuple[bool, str, str]: full_hex = bits_to_hex(bits) id_hex = full_hex[:ID_HEX_LEN] crc_hex = full_hex[ID_HEX_LEN:ID_HEX_LEN + 2] expected_crc = crc8_bytes(bytes.fromhex(id_hex)) ok = crc_hex.lower() == f"{expected_crc:02x}" return ok, id_hex, crc_hex.lower() def get_embed_region(width: int, height: int) -> tuple[int, int, int, int]: # etwas breiter als die 32-Bit-Version x0 = int(round(width * 0.50)) x1 = int(round(width * 0.92)) y0 = int(round(height * 0.46)) y1 = int(round(height * 0.80)) if x1 <= x0 or y1 <= y0: raise RuntimeError("Ungültige Einbettungsregion.") return x0, y0, x1, y1 def get_grid_geometry(width: int, height: int): x0, y0, x1, y1 = get_embed_region(width, height) region_w = x1 - x0 region_h = y1 - y0 row_h = region_h / REDUNDANT_ROWS col_w = region_w / TOTAL_BITS pair_dx = max(1, int(round(col_w * 0.18))) patch_rx = max(2, int(round(col_w * 0.16))) patch_ry = max(2, int(round(row_h * 0.16))) return { "x0": x0, "y0": y0, "x1": x1, "y1": y1, "region_w": region_w, "region_h": region_h, "row_h": row_h, "col_w": col_w, "pair_dx": pair_dx, "patch_rx": patch_rx, "patch_ry": patch_ry, } def pair_positions(bit_index: int, row_index: int, width: int, height: int): g = get_grid_geometry(width, height) cx = g["x0"] + (bit_index + 0.5) * g["col_w"] cy = g["y0"] + (row_index + 0.5) * g["row_h"] xl = int(round(cx - g["pair_dx"])) xr = int(round(cx + g["pair_dx"])) y = int(round(cy)) return xl, xr, y def clamp_patch(x: int, y: int, rx: int, ry: int, w: int, h: int): x0 = max(0, x - rx) x1 = min(w, x + rx + 1) y0 = max(0, y - ry) y1 = min(h, y + ry + 1) return x0, y0, x1, y1 def patch_mean(ch: np.ndarray, x: int, y: int, rx: int, ry: int) -> float: h, w = ch.shape x0, y0, x1, y1 = clamp_patch(x, y, rx, ry, w, h) roi = ch[y0:y1, x0:x1] if roi.size == 0: return 0.0 return float(np.mean(roi)) def write_patch_delta(ch: np.ndarray, x: int, y: int, rx: int, ry: int, delta: int): h, w = ch.shape x0, y0, x1, y1 = clamp_patch(x, y, rx, ry, w, h) roi = ch[y0:y1, x0:x1].astype(np.int16) roi = np.clip(roi + delta, 0, 255).astype(np.uint8) ch[y0:y1, x0:x1] = roi def encode_image(input_path: str, output_path: str, delta: int = 10): bgr = cv2.imread(input_path, cv2.IMREAD_COLOR) if bgr is None: raise RuntimeError(f"Bild konnte nicht geladen werden: {input_path}") h, w = bgr.shape[:2] short_id = short_id_from_file(input_path) bits = id_hex_to_bits_with_crc(short_id) ycrcb = cv2.cvtColor(bgr, cv2.COLOR_BGR2YCrCb) y, cr, cb = cv2.split(ycrcb) g = get_grid_geometry(w, h) rx = g["patch_rx"] ry = g["patch_ry"] for row in range(REDUNDANT_ROWS): for bit_index, bit in enumerate(bits): xl, xr, yy = pair_positions(bit_index, row, w, h) if bit == 1: write_patch_delta(cb, xl, yy, rx, ry, +delta) write_patch_delta(cb, xr, yy, rx, ry, -delta) else: write_patch_delta(cb, xl, yy, rx, ry, -delta) write_patch_delta(cb, xr, yy, rx, ry, +delta) merged = cv2.merge([y, cr, cb]) out = cv2.cvtColor(merged, cv2.COLOR_YCrCb2BGR) ok = cv2.imwrite(output_path, out) if not ok: raise RuntimeError(f"Bild konnte nicht gespeichert werden: {output_path}") print("SHA256 Originalbild :", sha256_file(input_path)) print("Eingebettete ID :", short_id) print("Bits+CRC :", bits_to_hex(bits)) print("Region :", get_embed_region(w, h)) print("Gespeichert :", output_path) def decode_image(input_path: str, debug_path: str | None = None): bgr = cv2.imread(input_path, cv2.IMREAD_COLOR) if bgr is None: raise RuntimeError(f"Bild konnte nicht geladen werden: {input_path}") h, w = bgr.shape[:2] ycrcb = cv2.cvtColor(bgr, cv2.COLOR_BGR2YCrCb) _, _, cb = cv2.split(ycrcb) cb = cv2.GaussianBlur(cb, (3, 3), 0) g = get_grid_geometry(w, h) rx = g["patch_rx"] ry = g["patch_ry"] y_search = max(1, int(round(g["row_h"] * 0.08))) row_bits = [] row_strengths = [] row_positions = [] for row in range(REDUNDANT_ROWS): bits = [] strengths = [] positions = [] for bit_index in range(TOTAL_BITS): xl, xr, yy = pair_positions(bit_index, row, w, h) best_diff = None best_abs = -1.0 best_y = yy for ys in range(yy - y_search, yy + y_search + 1): lv = patch_mean(cb, xl, ys, rx, ry) rv = patch_mean(cb, xr, ys, rx, ry) diff = lv - rv strength = abs(diff) if strength > best_abs: best_abs = strength best_diff = diff best_y = ys bit = 1 if best_diff > 0 else 0 bits.append(bit) strengths.append(best_abs) positions.append((xl, xr, best_y)) row_bits.append(bits) row_strengths.append(strengths) row_positions.append(positions) final_bits = [] confidences = [] for i in range(TOTAL_BITS): votes_1 = 0 votes_0 = 0 strength_1 = 0.0 strength_0 = 0.0 for row in range(REDUNDANT_ROWS): b = row_bits[row][i] s = row_strengths[row][i] if b == 1: votes_1 += 1 strength_1 += s else: votes_0 += 1 strength_0 += s if votes_1 > votes_0: final_bits.append(1) confidences.append(strength_1 - strength_0) elif votes_0 > votes_1: final_bits.append(0) confidences.append(strength_0 - strength_1) else: if strength_1 >= strength_0: final_bits.append(1) confidences.append(strength_1 - strength_0) else: final_bits.append(0) confidences.append(strength_0 - strength_1) ok, id_hex, crc_hex = verify_crc_from_bits(final_bits) print("Row 1 :", bits_to_hex(row_bits[0])) print("Row 2 :", bits_to_hex(row_bits[1])) print("Row 3 :", bits_to_hex(row_bits[2])) print("Final :", bits_to_hex(final_bits)) print("Ausgelesene ID :", id_hex) print("CRC :", crc_hex) print("CRC ok :", ok) print("Confidence :", [round(c, 2) for c in confidences]) if debug_path: dbg = bgr.copy() x0, y0, x1, y1 = get_embed_region(w, h) cv2.rectangle(dbg, (x0, y0), (x1, y1), (0, 255, 255), 1) for row in range(REDUNDANT_ROWS): for bit_index in range(TOTAL_BITS): xl, xr, yy = row_positions[row][bit_index] cv2.circle(dbg, (xl, yy), 2, (0, 255, 0), -1) cv2.circle(dbg, (xr, yy), 2, (0, 0, 255), -1) cv2.imwrite(debug_path, dbg) print("Debug-Bild :", debug_path) return ok, id_hex def verify_image(marked_image_path: str, original_image_path: str, debug_path: str | None = None): ok_crc, decoded_id = decode_image(marked_image_path, debug_path=debug_path) expected_id = short_id_from_file(original_image_path) print("Erwartete ID :", expected_id) print("ID Match :", decoded_id.lower() == expected_id.lower()) print("Gesamt ok :", ok_crc and decoded_id.lower() == expected_id.lower()) return ok_crc and decoded_id.lower() == expected_id.lower() def main(): parser = argparse.ArgumentParser() sub = parser.add_subparsers(dest="cmd", required=True) enc = sub.add_parser("encode") enc.add_argument("input") enc.add_argument("output") enc.add_argument("--delta", type=int, default=10) dec = sub.add_parser("decode") dec.add_argument("input") dec.add_argument("--debug", default=None) ver = sub.add_parser("verify") ver.add_argument("marked") ver.add_argument("original") ver.add_argument("--debug", default=None) args = parser.parse_args() if args.cmd == "encode": encode_image(args.input, args.output, delta=args.delta) elif args.cmd == "decode": decode_image(args.input, debug_path=args.debug) elif args.cmd == "verify": verify_image(args.marked, args.original, debug_path=args.debug) if __name__ == "__main__": main()