Parsing OpenCTI Threat Intelligence to Produce SBOM-Like Incident Fingerprints
Written by
Vera Crypt
I got tired of treating “threat intelligence” like a blob of text: some feeds said “APT29”, others said “malware family”, and in practice my SOC (security operations center) team still had to manually map incidents back to what our software actually shipped.
So I built a small pipeline that takes OpenCTI (an open-source Threat Intelligence platform) and turns selected indicators into deterministic incident fingerprints that look a lot like the structure of an SBOM (Software Bill of Materials—an inventory of what software components are in a release). The key idea: instead of “artifact labels,” I generate stable IDs derived from the indicator payloads, and I attach those fingerprints to build artifacts in a DevSecOps-friendly way.
This post walks through the exact parsing logic and code.
The niche problem I solved: “Indicator drift” breaks correlation
Threat intelligence feeds change constantly:
- domains are reworded
- IPs get reshuffled between attributes
- hashes are sometimes stored under different OpenCTI fields
- the same campaign gets re-labeled
In my logs and in our CI metadata, that causes “incident correlation drift”: the same underlying indicator effectively becomes a new item.
I wanted a way to create fingerprints that are:
- deterministic (same input → same fingerprint)
- deduplicated (different OpenCTI fields that mean the same thing should map to the same fingerprint)
- auditable (you can trace the fingerprint back to the raw indicator attributes)
What I implemented: deterministic “incident fingerprints” from OpenCTI
Target indicators
I focused on three indicator types that commonly appear in OpenCTI:
- Domain names
- IPv4 addresses
- File hashes (SHA-256 only, for consistency)
Fingerprint format
For each indicator, I build a canonical string like:
```
domain|example.com
ip|203.0.113.10
hash|sha256|<hex>
```
Then I hash that canonical string with SHA-256 again to create the final fingerprint:
```
ti-fp-<first 32 hex chars>
```
This gives a short but collision-resistant token you can store in logs, tickets, and build metadata.
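As a standalone sketch, the two-step scheme (canonical string in, SHA-256 over it, keep the first 32 hex chars) looks like this; it assumes the unkeyed variant without a secret:

```python
import hashlib

def fingerprint(canonical: str) -> str:
    # Hash the canonical string and keep a 32-hex-char prefix as the token.
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"ti-fp-{digest[:32]}"

# Same input always yields the same token; different inputs diverge.
print(fingerprint("domain|example.com"))
```

Truncating to 128 bits keeps the token short for logs while leaving accidental collisions astronomically unlikely.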
Working code: Python parser + OpenCTI query + fingerprinting
Prerequisites
- Python 3.10+
- Requests library:
```bash
pip install requests
```
Step 1: Configure environment variables
I keep OpenCTI connection details out of code:
```bash
export OPENCTI_URL="https://opencti.example.org"
export OPENCTI_TOKEN="YOUR_OPENCTI_API_TOKEN"
export OPENCTI_SCOPE_ORG="Your Organization Name"
```
Step 2: Use OpenCTI GraphQL API
OpenCTI exposes a GraphQL endpoint. The common pattern is:
- send a POST request to `/graphql`
- include `Authorization: Bearer <token>`
- run a query to fetch indicators
Here’s the full working script.
```python
import os
import re
import json
import hmac
import hashlib

import requests

OPENCTI_URL = os.environ["OPENCTI_URL"].rstrip("/")
OPENCTI_TOKEN = os.environ["OPENCTI_TOKEN"]
GRAPHQL_ENDPOINT = f"{OPENCTI_URL}/graphql"

# Optional: a stable secret can be used if you want fingerprints that
# are not directly reversible (still deterministic per secret).
# If not set, fingerprints are plain deterministic hashes of canonical strings.
SECRET = os.environ.get("FINGERPRINT_SECRET", "").encode("utf-8")


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def fingerprint(canonical: str) -> str:
    """
    Build a deterministic fingerprint for a canonical indicator string.
    If SECRET is provided, use HMAC-SHA256 to avoid exposing canonical contents.
    """
    if SECRET:
        digest = hmac.new(SECRET, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
    else:
        digest = sha256_hex(canonical.encode("utf-8"))
    # Short, stable token for logs and metadata:
    return f"ti-fp-{digest[:32]}"


def normalize_indicator_value(indicator_type: str, value: str) -> str | None:
    """
    Normalize values to reduce field drift.
    Returns canonical payload for fingerprinting or None if invalid.
    """
    v = value.strip()

    if indicator_type == "domain":
        # Normalize to lowercase, remove trailing dot.
        v = v.lower()
        if v.endswith("."):
            v = v[:-1]
        # Basic domain sanity check.
        if not re.fullmatch(r"([a-z0-9-]+\.)*[a-z0-9-]+", v):
            return None
        return v

    if indicator_type == "ipv4":
        # Strict IPv4 check; re-rendering each octet drops leading zeros.
        parts = v.split(".")
        if len(parts) != 4:
            return None
        try:
            nums = [int(p) for p in parts]
        except ValueError:
            return None
        if any(n < 0 or n > 255 for n in nums):
            return None
        return ".".join(str(n) for n in nums)

    if indicator_type == "sha256":
        v = v.lower()
        # SHA-256 should be 64 hex chars.
        if not re.fullmatch(r"[0-9a-f]{64}", v):
            return None
        return v

    return None


def canonical_string(kind: str, payload: str, hash_alg: str | None = None) -> str:
    if kind == "hash":
        return f"hash|{hash_alg}|{payload}"
    return f"{kind}|{payload}"


def opencti_graphql(query: str, variables: dict) -> dict:
    headers = {
        "Authorization": f"Bearer {OPENCTI_TOKEN}",
        "Content-Type": "application/json",
    }
    resp = requests.post(
        GRAPHQL_ENDPOINT,
        headers=headers,
        data=json.dumps({"query": query, "variables": variables}),
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    if "errors" in data:
        raise RuntimeError(data["errors"])
    return data["data"]


def fetch_indicators(limit: int = 50) -> list[dict]:
    """
    Fetch indicators from OpenCTI.

    I query for indicators and ask OpenCTI for fields that usually contain
    indicator "value" material. Exact schema can vary by OpenCTI version,
    but 'type' + 'pattern'/'value' + 'indicatorTypes' is common.

    This script intentionally focuses on a narrow set:
    - domain
    - ipv4-addr
    - sha256
    """
    query = """
    query Indicators($first: Int) {
      indicators(first: $first) {
        edges {
          node {
            id
            entity_type
            name
            createdAt
            updatedAt
            pattern
            validFrom
            validUntil
            # Some OpenCTI versions store indicator type strings like:
            # "domain", "ipv4-addr", "sha256"
            indicator_type
            # Some store lists for indicator types:
            indicator_types
            # Some store raw values:
            value
          }
        }
      }
    }
    """
    variables = {"first": limit}
    data = opencti_graphql(query, variables)
    edges = data["indicators"]["edges"]
    return [e["node"] for e in edges]


def classify_and_canonicalize(node: dict) -> list[dict]:
    """
    Convert an OpenCTI indicator node into one or more canonical items.

    OpenCTI might provide different fields; this function extracts and
    normalizes values reliably for the supported kinds.
    """
    results = []

    # Prefer explicit value; fall back to pattern, then name.
    raw_value = node.get("value") or node.get("pattern") or node.get("name")
    if not raw_value:
        return results

    # Determine type:
    # Some versions: node['indicator_type'] (single)
    # Others: node['indicator_types'] (list)
    explicit_type = node.get("indicator_type")
    type_list = node.get("indicator_types")

    candidate_types = []
    if explicit_type:
        candidate_types.append(explicit_type)
    if isinstance(type_list, list):
        candidate_types.extend(type_list)

    # If still empty, heuristically infer from the raw value.
    if not candidate_types:
        if re.fullmatch(r"[0-9a-fA-F]{64}", str(raw_value).strip()):
            candidate_types = ["sha256"]
        elif re.fullmatch(r"(\d{1,3}\.){3}\d{1,3}", str(raw_value).strip()):
            candidate_types = ["ipv4-addr"]
        else:
            candidate_types = ["domain"]

    # Map OpenCTI types to our normalized kind labels.
    # Keys must be lowercase because the lookup lowercases the incoming type.
    type_map = {
        "domain": "domain",
        "domain-name": "domain",
        "ipv4-addr": "ipv4",
        "ip": "ipv4",
        "sha256": "sha256",
        "file:hashes:sha256": "sha256",
    }

    normalized_value_str = str(raw_value)
    source_fields = {
        "value": node.get("value"),
        "pattern": node.get("pattern"),
        "name": node.get("name"),
    }

    for t in candidate_types:
        mapped = type_map.get(str(t).lower())
        if not mapped:
            continue

        if mapped == "domain":
            canon = normalize_indicator_value("domain", normalized_value_str)
            if canon:
                results.append({
                    "opencti_indicator_id": node["id"],
                    "kind": "domain",
                    "canonical": canon,
                    "fingerprint": fingerprint(canonical_string("domain", canon)),
                    "source_fields": source_fields,
                })
        elif mapped == "ipv4":
            canon = normalize_indicator_value("ipv4", normalized_value_str)
            if canon:
                results.append({
                    "opencti_indicator_id": node["id"],
                    "kind": "ipv4",
                    "canonical": canon,
                    "fingerprint": fingerprint(canonical_string("ip", canon)),
                    "source_fields": source_fields,
                })
        elif mapped == "sha256":
            canon = normalize_indicator_value("sha256", normalized_value_str)
            if canon:
                results.append({
                    "opencti_indicator_id": node["id"],
                    "kind": "sha256",
                    "canonical": canon,
                    "fingerprint": fingerprint(
                        canonical_string("hash", canon, hash_alg="sha256")
                    ),
                    "source_fields": source_fields,
                })

    return results


def main():
    indicators = fetch_indicators(limit=25)

    fingerprints = []
    for node in indicators:
        fingerprints.extend(classify_and_canonicalize(node))

    # Deduplicate on (kind, fingerprint).
    uniq = {}
    for item in fingerprints:
        uniq[(item["kind"], item["fingerprint"])] = item

    out = {
        "count_input_indicators": len(indicators),
        "count_fingerprinted_items": len(uniq),
        "items": list(uniq.values()),
    }
    print(json.dumps(out, indent=2))


if __name__ == "__main__":
    main()
```
How to run it
- Set environment variables:
```bash
export OPENCTI_URL="https://opencti.example.org"
export OPENCTI_TOKEN="YOUR_OPENCTI_API_TOKEN"
# Optional:
# export FINGERPRINT_SECRET="some-stable-secret"
```
- Run:
```bash
python opencti_fingerprint.py
```
What happens when you run this (a concrete example)
Given OpenCTI indicator nodes like:
- type: `domain`, value: `Example.com.`
- type: `ipv4-addr`, value: `203.0.113.010`
- type: `sha256`, value: `A3...FF` (64 hex chars)
The script will:
- Normalize domains: `Example.com.` → `example.com`
- Normalize IPv4: `203.0.113.010` → `203.0.113.10`
- Normalize hashes: uppercase hex → lowercase hex
- Build canonical strings: `domain|example.com`, `ip|203.0.113.10`, `hash|sha256|a3...ff`
- Compute deterministic fingerprints and deduplicate.
The output item looks like:
```json
{
  "opencti_indicator_id": "indicator--1234",
  "kind": "domain",
  "canonical": "example.com",
  "fingerprint": "ti-fp-9b2c6d4a1c0f3e7a2f5b8d9c1a4e6f70",
  "source_fields": {
    "value": "Example.com.",
    "pattern": null,
    "name": "Example.com"
  }
}
```
Why this helps in DevSecOps and Zero Trust-style controls
In a Zero Trust model, the “trust” decision is often automated and metadata-driven. When I attach these fingerprints to:
- build logs,
- artifact metadata,
- deployment audits,
- and incident response events,
…correlation becomes resilient against CTI labeling drift.
It’s also naturally compatible with DevSecOps practices: fingerprints are just structured data you can persist and query in pipelines, rather than fragile text from feeds.
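A minimal sketch of what “attaching” looks like in practice, assuming nothing beyond the post itself: the artifact name and the `ti_fingerprints` field here are illustrative choices, not a fixed schema.

```python
import json

def build_artifact_metadata(artifact: str, fingerprints: list[str]) -> str:
    # Sorted + deduplicated so the metadata document itself is deterministic
    # and diff-friendly across pipeline runs.
    doc = {
        "artifact": artifact,
        "ti_fingerprints": sorted(set(fingerprints)),
    }
    return json.dumps(doc, indent=2, sort_keys=True)

print(build_artifact_metadata(
    "app-1.4.2.tar.gz",
    ["ti-fp-bbb", "ti-fp-aaa", "ti-fp-aaa"],
))
```

Because the tokens are plain strings, the same document can be stored as a build annotation, a ticket field, or a SIEM enrichment key without translation.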
Practical notes I learned while building it
- Canonicalization matters more than hashing. Normalizing domains and IPv4 formats eliminated a surprising amount of “duplicate but not equal” fingerprints.
- Lock down the indicator field you treat as truth. I used a `value → pattern → name` fallback to handle real-world schema inconsistency.
- Deduplicate after fingerprinting. OpenCTI can store multiple indicator representations; dedupe ensures you get stable counts.
- Consider HMAC with a secret. If you store canonical values somewhere sensitive, HMAC-based fingerprints prevent direct inference of the canonical string.
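The HMAC variant is worth seeing in two lines; this sketch shows the property that matters, that tokens stay deterministic per secret while the canonical string cannot be read back out without the key:

```python
import hmac
import hashlib

def hmac_fingerprint(secret: bytes, canonical: str) -> str:
    # Keyed hash: without the secret, an observer can't confirm a guess
    # about the canonical value by recomputing the token.
    digest = hmac.new(secret, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"ti-fp-{digest[:32]}"

a = hmac_fingerprint(b"stable-secret", "domain|example.com")
b = hmac_fingerprint(b"stable-secret", "domain|example.com")
print(a == b)  # True: determinism holds as long as the secret is stable
```

The flip side is operational: rotating the secret silently invalidates every previously stored fingerprint, so treat it like any other long-lived key.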
Conclusion
I built a deterministic pipeline that reads selected OpenCTI indicator attributes, normalizes them, and generates SBOM-like “incident fingerprints” that stay stable across indicator drift. The big takeaway: for threat intelligence to be operational, it needs canonical, queryable identifiers—so correlation survives label changes, field reshaping, and feed churn while still remaining traceable back to the original indicator.