The Intelligence Pipeline

NDIC permits → OCR → Extraction → Snowflake → Cortex Search
2,171 wells tracked · 338 documents indexed in Cortex · 9 pipeline stages · Cron: daily 2 AM MT · 2 AI models per well · ● PRODUCTION SYSTEM — RUNNING DAILY
0 NDIC Source → 1 Identify → 2 PDF Fetch → 3 OCR → 4 Grok Extract → 5 Haiku Intel → 6 Snowflake → 7 Cortex Search → 8 Web Delivery

Overview

The complete automation architecture

This platform automates the full intelligence lifecycle for North Dakota oil & gas permit activity. Daily permit filings from the NDIC are ingested, converted from scanned PDFs to structured data using AI, stored in Snowflake for analytics, and made searchable via Snowflake's Cortex Search service. The entire pipeline runs unattended on a cron schedule. This page documents every stage with real code and data shapes — built to be replicable.

Data Ingestion

Stages 0–2: NDIC source, permit identification, PDF download

Text Extraction

Stage 3: Google Cloud Vision OCR on scanned permit PDFs

AI Intelligence

Stages 4–5: Dual-model extraction — Grok + Claude Haiku

Analytics Layer

Stage 6: Snowflake data warehouse, views, and normalization

Semantic Search

Stage 7: Cortex Search index over 338 regulatory documents

Web Delivery

Stage 8: S3 + CloudFront + Lambda for live queries

📁
Main orchestrator: scripts/orchestrate_wellfile_pipeline.py — a single Python script that runs all stages sequentially with full logging, SQLite state tracking, and graceful error handling. Pass --no-haiku to skip Stage 5.
0

Data Source

NDIC daily permit feed → nd_daily.json

The North Dakota Industrial Commission (NDIC) publishes daily oil and gas permit activity. We mirror this as a static nd_daily.json served from S3. The file covers 2,171 wells across 8 permit status sections — everything from newly active wells to confidential permits and plugged holes.

Input
NDIC public web interface / daily updated permit database
Process
Daily sync script parses 8 permit sections, serializes to JSON
Output
website/nd_daily.json — 2,171 wells, ~1.2 MB
nd_daily.json — data structure
{
  "data": {
    "active": [
      {
        "wellNumber": "W41156",
        "operator":   "Slawson Petroleum Company",
        "wellName":   "BEAR DEN FEDERAL 1-19H",
        "county":     "Mountrail",
        "formation":  "Bakken",
        "permitDate": "2025-01-15"
      }
      // ... 2,171 total entries across 8 sections
    ],
    "conf":                  [...], // confidential
    "approved":              [...], // approved, not yet spudded
    "approved_confidential": [...],
    "conf_plugged_or_prod":  [...],
    "released_confidential": [...],
    "plugged":               [...],
    "other":                 [...]
  }
}
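The structure above can be consumed directly. A minimal sketch; the `data` / section layout follows the shape documented above, and `sample` is a toy payload, not the live feed:

```python
import json

# Count wells per permit-status section in nd_daily.json.
# Assumes the structure shown above: {"data": {section_name: [wells...]}}.
def section_counts(nd_daily: dict) -> dict:
    return {name: len(wells) for name, wells in nd_daily['data'].items()}

sample = {"data": {"active": [{"wellNumber": "W41156"}], "conf": []}}
print(section_counts(sample))  # {'active': 1, 'conf': 0}
```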
NDIC Portal · Python requests · JSON · AWS S3
1

Permit Identification

What's new? Cross-reference against local state.

The orchestrator scans nd_daily.json for permits issued in the last 7 days (configurable), then cross-checks against a local SQLite database to skip wells already processed. This makes every run idempotent — re-running the pipeline never double-processes a well. Maximum 100 wells per run to prevent rate-limit issues downstream.

Input
nd_daily.json (all 8 sections)
nd_signals.db — SQLite tracking DB
Process
Filter by permit date (last 7 days)
Dedup against wellfile_fetches table
Cap list at 100 per run
Output
Ordered list of new well numbers
e.g. ["W41156", "W41157", "W41435"]
orchestrate_wellfile_pipeline.py — Stage 1
def identify_new_permits(self):
    cutoff = datetime.now() - timedelta(days=CONFIG['lookback_days'])
    processed = self.db.get_processed_wells()  # set from SQLite
    new_wells = []

    for section in nd_daily['data'].values():
        for well in section:
            permit_dt = datetime.strptime(well['permitDate'], '%Y-%m-%d')
            if permit_dt >= cutoff and well['wellNumber'] not in processed:
                new_wells.append(well['wellNumber'])

    return new_wells[:CONFIG['max_per_run']]  # cap at 100


# Cron schedule — macOS launchd / crontab
# 0 2 * * * /usr/bin/python3 /path/to/orchestrate_wellfile_pipeline.py >> /var/log/pipeline.log 2>&1
Python 3 · SQLite · cron / launchd
2

PDF Acquisition

Download official wellfile PDFs from NDIC.

Each new well number maps to an official wellfile PDF in the NDIC document portal. These are the raw regulatory filings — scanned forms, diagrams, and permit conditions that contain the structured intelligence we need. Downloads are rate-limited to avoid triggering blocks, and all outcomes are logged to SQLite for auditability.

Input
Well number list from Stage 1
e.g. ["W41156", "W41157"]
Process
Construct NDIC PDF URL per well
Download at 0.5 req/sec
3 retries on failure
Log to wellfile_fetches table
Output
ND/wellfiles/pdfs/W{num}.pdf
Typical size: 7–20 MB per well
PDF download — rate-limited
NDIC_PDF_URL = "https://www.dmr.nd.gov/oilgas/Wellfiles/{well_num}.pdf"

for well_num in new_wells:
    url = NDIC_PDF_URL.format(well_num=well_num)
    downloaded = False

    for attempt in range(3):
        try:
            resp = requests.get(url, timeout=30)
            if resp.status_code == 200:
                path = PDFS_DIR / f"{well_num}.pdf"
                path.write_bytes(resp.content)
                db.mark_downloaded(well_num)
                downloaded = True
                break
            logging.warning(f"HTTP {resp.status_code} for {well_num} (attempt {attempt+1})")
        except requests.RequestException as e:
            logging.warning(f"Attempt {attempt+1} failed for {well_num}: {e}")

    if not downloaded:
        logging.error(f"Giving up on {well_num} after 3 attempts")  # outcome logged to SQLite

    time.sleep(2.0)  # 0.5 req/sec to avoid NDIC rate limits
Python requests · SQLite · NDIC Portal
3

OCR Processing

Scanned PDFs → clean machine-readable text

NDIC wellfiles are scanned documents — there is no embedded text layer. Google Cloud Vision API's document_text_detection handles the OCR pass. Each page is submitted as an image; results are reassembled, cleaned of artifacts (headers, page numbers, watermarks), and written as a clean text file ready for AI ingestion. A local fallback using ocrmypdf + pdftotext is available when Cloud Vision is unavailable.

Input
ND/wellfiles/pdfs/W{num}.pdf
Scanned image-based PDF, 7–20 MB
Process
Split PDF → page images
Google Cloud Vision document_text_detection
Reassemble + clean artifacts
Normalize whitespace
Output
ND/wellfiles/cleantxt/W{num}_clean.txt
Clean structured permit text, 50–200 KB
OCR implementation — primary + fallback
from google.cloud import vision

def ocr_page(image_bytes: bytes) -> str:
    client = vision.ImageAnnotatorClient()
    image  = vision.Image(content=image_bytes)
    resp   = client.document_text_detection(image=image)
    return resp.full_text_annotation.text


# Local fallback — requires: brew install ocrmypdf poppler
def ocr_local_fallback(pdf_path: Path) -> str:
    ocr_path = pdf_path.with_suffix('.ocr.pdf')
    subprocess.run(
        ['ocrmypdf', '--force-ocr', str(pdf_path), str(ocr_path)],
        check=True
    )
    result = subprocess.run(
        ['pdftotext', str(ocr_path), '-'],
        capture_output=True, text=True
    )
    return result.stdout
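The reassembly-and-cleaning step after OCR is pure string work. A minimal sketch; the specific artifact patterns (page-number lines, blank-line runs) are illustrative assumptions, not the production regexes:

```python
import re

# Illustrative cleanup pass over reassembled OCR page text. Drops
# page-number artifact lines and collapses runs of blank lines.
PAGE_NUM = re.compile(r'^\s*Page \d+ of \d+\s*$', re.IGNORECASE)

def clean_ocr_text(pages: list[str]) -> str:
    lines = []
    for page in pages:
        for line in page.splitlines():
            if PAGE_NUM.match(line):
                continue  # drop page-number artifacts
            lines.append(line.rstrip())
    # Normalize whitespace: collapse 3+ consecutive newlines to one blank line
    text = '\n'.join(lines)
    return re.sub(r'\n{3,}', '\n\n', text).strip()
```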
Sample OCR output — W41156_clean.txt (excerpt)
NORTH DAKOTA INDUSTRIAL COMMISSION
OIL AND GAS DIVISION

PERMIT TO DRILL

WELL NUMBER:   41156
OPERATOR:      Slawson Petroleum Company, Inc.
WELL NAME:     BEAR DEN FEDERAL 1-19H
LOCATION:      SE/4SE/4 Sec. 19, T153N, R90W
COUNTY:        Mountrail
FORMATION:     Bakken
SURFACE ELEV:  2,212 ft.
TOTAL DEPTH:   21,500 ft.
H2S PRESENT:   Yes — safety equipment required
SETBACK:       Variance granted per Docket OG-22-0471
...
Google Cloud Vision API · ocrmypdf · pdftotext · Pillow · pdf2image
4

Grok Extraction

Primary AI model — structured intelligence from raw text

The first AI pass uses Grok (via xAI API) to extract 20+ structured fields from the clean permit text. The model receives the full document and returns a JSON object with operator details, well geometry, formation data, hazard flags, and non-routine signals. An HTML report is generated for human review. Files exceeding the 200K-token limit (~400 KB) are skipped, and a circuit breaker halts the run after 3 consecutive failures.

Input
W{num}_clean.txt
Max 200K tokens (~400 KB)
Files larger than this are skipped
Process
Full document sent to Grok API
Structured extraction prompt (20+ fields)
JSON response parsed and validated
HTML report rendered via template
Output
ND/wellfiles/llmjson/W{num}.json
website/html_reports/W{num}.html
W41156.json — extracted intelligence schema
{
  "wellNumber":       "W41156",
  "operator":         "Slawson Petroleum Company, Inc.",
  "wellName":         "BEAR DEN FEDERAL 1-19H",
  "county":           "Mountrail",
  "formation":        "Bakken",
  "surfaceElevation": 2212,
  "totalDepth":       21500,
  "spudDate":         "2025-01-20",
  "permitDate":       "2025-01-15",
  "signals": {
    "h2s":               true,
    "flaring":           false,
    "setbackVariance":   true,
    "multipleFormations":false
  },
  "signalCount":  2,
  "tier":         "T1",   // T1 = score 50+, high-value opportunity
  "model":        "grok-2",
  "extractedAt":  "2025-01-22T02:14:33Z"
}
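Parsing and validating the Grok response can be sketched as below. The required-field list and the derivation of `signalCount` from the boolean flags are assumptions based on the schema above, not the production validator:

```python
REQUIRED = ('wellNumber', 'operator', 'wellName', 'county',
            'formation', 'signals')

# Validate a parsed Grok response against the schema above and derive
# signalCount from the boolean signal flags. Illustrative rules only.
def validate_extraction(doc: dict) -> dict:
    missing = [f for f in REQUIRED if f not in doc]
    if missing:
        raise ValueError(f"Grok response missing fields: {missing}")
    doc['signalCount'] = sum(1 for v in doc['signals'].values() if v)
    return doc
```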
⚠️
Token limit: Files exceeding ~400 KB (the 200K-token limit) are automatically skipped. A circuit breaker halts the run after 3 consecutive failures. Credit exhaustion at the API surfaces as a generic 400 error — monitor for this pattern.
Grok API (xAI) · Python · Jinja2
5

Haiku Intelligence

Claude Haiku — validation pass + interpretive depth

A second AI model pass using Claude Haiku validates Grok's extraction and adds a layer of interpretive intelligence: what does this permit actually signal for service companies? Haiku generates an opportunityScore, action window, and a plain-English intelligence summary. API calls are rate-limited to 10–15 seconds apart, and the pipeline has resume capability — already-processed wells are skipped on restart.

Input
W{num}_clean.txt (same source)
W{num}.json from Grok (context)
Intelligence prompt from documents/wellfiles/grok_prompt_intelligence.txt
Process
Claude Haiku API call
10–15s delay between calls
Circuit breaker: stop after 3 failures
Skip already-processed wells
Output
ND/wellfiles/llmjson_i/W{num}_haiku.json
website/html_reports_i/W{num}_i.html
~8–9 KB JSON, ~6 KB HTML
W41156_haiku.json — intelligence output
{
  "wellNumber":  "W41156",
  "model":       "claude-haiku-3-5",
  "operator":    "Slawson Petroleum Company",
  "formation":   "Bakken",
  "intelligenceSummary": "High-value Bakken horizontal with H2S
    present and setback variance — likely targeting Three Forks
    secondary bench. Setback variance suggests proximity to
    residential or public land boundary. Prioritize H2S-rated
    equipment proposal.",
  "hazardFlags":     ["H2S", "SETBACK_VARIANCE"],
  "opportunityScore": 72,
  "tier":            "T1",
  "actionWindow":    "24-48 hours",
  "comparesWithGrok": {
    "operatorMatch":  true,
    "formationMatch": true,
    "signalMatch":    true
  },
  "processedAt": "2025-01-22T02:15:01Z"
}
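The pacing and circuit-breaker behavior described above can be sketched independently of the API client. `call_haiku` is a stand-in for the real Anthropic SDK call, and `delay_s` approximates the 10-15 s spacing:

```python
import time

# Illustrative circuit-breaker loop around the Haiku pass. Skips
# already-processed wells (resume capability), paces calls, and halts
# after 3 consecutive failures.
def run_haiku_pass(wells, processed, call_haiku, delay_s=12.0,
                   max_consecutive_failures=3):
    results, failures = {}, 0
    for well in wells:
        if well in processed:
            continue  # resume capability: skip already-processed wells
        try:
            results[well] = call_haiku(well)
            failures = 0
        except Exception:
            failures += 1
            if failures >= max_consecutive_failures:
                break  # circuit breaker: halt the run
        time.sleep(delay_s)  # stay inside the 10-15 s pacing
    return results
```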
📊
Current coverage: 42 of 137 standard JSON files (30.7% complete). The pipeline grows this daily. Processing time is approximately 28 seconds per well including API call and mandatory rate-limit delay.
Claude Haiku (Anthropic) · Python · Anthropic SDK
6

Snowflake Load

Structured data into the analytics warehouse

Both AI extracts and raw OCR text are loaded into Snowflake. The raw tables hold the source text; a CANONICAL_WELL_SIGNALS view applies operator name normalization, signal scoring, and tier assignment. Analytics views downstream compute sentiment scores, hazard flag distributions, and model-comparison diffs — all queryable via standard SQL or Snowflake's Cortex functions.

Input
W{num}.json — Grok extraction
W{num}_haiku.json — Haiku intelligence
W{num}_clean.txt — OCR text
Process
Load to raw tables via Snowflake Python connector
Transform via CANONICAL_WELL_SIGNALS view
Operator normalization CASE statement
Analytics views auto-refresh
Output
RAW_WELLFILE_TEXT — 198 wells
RAW_ORDERFILES_TEXT — 71 orders
RAW_CASEFILES_TEXT — 69 cases
CANONICAL_WELL_SIGNALS — 218 wells
E_canonical_signals.sql — operator normalization
-- snowflake/02_analytics/E_canonical_signals.sql
-- Normalize inconsistent NDIC operator name casing at the view level.
-- Source data is never modified — normalization lives here only.

CREATE OR REPLACE VIEW CANONICAL_WELL_SIGNALS AS
SELECT
  well_number,
  permit_date,
  CASE
    WHEN UPPER(operator) LIKE '%KODA RESOURCES%'
      THEN 'Koda Resources Operating, LLC'
    WHEN UPPER(operator) LIKE '%OASIS PETROLEUM%'
      THEN 'Oasis Petroleum North America, LLC'
    WHEN UPPER(operator) LIKE '%DEVON ENERGY%'
      THEN 'Devon Energy Williston, LLC'
    WHEN UPPER(operator) LIKE '%ENERPLUS%'
      THEN 'Enerplus Resources USA Corporation'
    WHEN UPPER(operator) LIKE '%GRAYSON MILL%'
      THEN 'Grayson Mill Operating, LLC'
    ELSE operator
  END AS operator_normalized,
  signal_count,
  tier,
  h2s_flag,
  flaring_flag,
  setback_variance_flag
FROM RAW_WELLFILE_TEXT;
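Before the connector INSERT, each Grok extract is flattened into a row. A sketch under the assumption that the raw table's columns mirror those the view selects; the actual column order and the connector call are not shown in the source:

```python
# Flatten a Grok extract (schema from Stage 4) into a row matching the
# columns CANONICAL_WELL_SIGNALS reads from RAW_WELLFILE_TEXT.
# Column order here is an illustrative assumption.
def to_raw_row(doc: dict) -> tuple:
    signals = doc.get('signals', {})
    return (
        doc['wellNumber'],
        doc.get('permitDate'),
        doc.get('operator'),
        doc.get('signalCount', 0),
        doc.get('tier'),
        signals.get('h2s', False),
        signals.get('flaring', False),
        signals.get('setbackVariance', False),
    )
```

Rows built this way would then be bulk-loaded via the Snowflake Python connector, e.g. `cursor.executemany("INSERT INTO RAW_WELLFILE_TEXT VALUES (%s, ...)", rows)` with a placeholder per column.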
💡
Design decision: Normalization lives in the Snowflake view, not in the source data. This keeps nd_daily.json as a faithful mirror of the NDIC feed and makes normalization logic easy to audit and update in one place.
Snowflake · SQL · Python Snowflake Connector
7

Cortex Search Indexing

338 documents. Semantic search. Live queries via Lambda.

Snowflake's Cortex Search service builds a semantic embedding index over all loaded documents — wellfiles, regulatory orders, and case files. Unlike keyword search, Cortex Search understands meaning: a query for "H2S hazard near residential" finds relevant documents even when those exact words don't appear. Results with a relevance score below 0.3 are filtered out to eliminate low-quality padding. Live queries are served via an AWS Lambda proxy authenticated with a Snowflake Personal Access Token (PAT).

Input
338 documents in Snowflake:
198 wellfiles + 71 orders + 69 cases
Indexed on: document_text column
Process
Cortex Search auto-embeds all documents
Builds semantic similarity index
Lambda proxy authenticates via PAT
Results filtered at relevance < 0.3
Output
Live search endpoint (Lambda URL)
Returns ranked results + snippets
Linked to HTML reports per well
Search API — request and response shape
// POST request to Lambda endpoint
{
  "query": "H2S hazard Mountrail setback variance",
  "limit": 10
}

// Response from Snowflake Cortex Search via Lambda
{
  "results": [
    {
      "wellNumber":     "W41156",
      "documentType":  "wellfile",
      "relevanceScore": 0.87,
      "snippet": "...H2S present at 15 ppm. Setback variance
                  granted per Docket OG-22-0471. Safety equipment
                  required within 500ft buffer...",
      "reportUrl": "/html_reports/W41156.html"
    }
  ],
  "totalDocs":  338,
  "searchedAt": "2026-03-04T02:14:22Z"
}
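The relevance cutoff is a one-line filter over the response shape above; `filter_results` and `MIN_RELEVANCE` are illustrative names, not identifiers from the production code:

```python
MIN_RELEVANCE = 0.3

# Drop low-quality padding before returning results to the client,
# per the 0.3 relevance cutoff described above.
def filter_results(results: list[dict]) -> list[dict]:
    return [r for r in results if r['relevanceScore'] >= MIN_RELEVANCE]
```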
scripts/search_handler.py — Lambda authentication
import json
import os

import snowflake.connector

def lambda_handler(event, context):
    # Function URL / API Gateway delivers the body as a JSON string
    body  = json.loads(event['body'])
    query = body['query']
    limit = body.get('limit', 10)

    # PAT auth — no username/password stored in Lambda env
    conn = snowflake.connector.connect(
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        token=os.environ['SNOWFLAKE_PAT'],
        authenticator='oauth'
    )

    cursor = conn.cursor().execute("""
        SELECT * FROM TABLE(
          CORTEX_SEARCH_RESULT(
            service => 'WELLFILE_SEARCH',
            query   => %s,
            limit   => %s
          )
        )
    """, (query, limit))

    return {
        "statusCode": 200,
        "body": json.dumps({"results": cursor.fetchall()}, default=str)
    }
Snowflake Cortex Search · AWS Lambda · Python · PAT Auth
8

Web Delivery

S3 + CloudFront + Lambda — static files with live search

Website files, HTML reports, and JSON data are synced to an S3 bucket and served via CloudFront CDN. Static files are cached globally at the edge; the Lambda endpoint handles live Cortex Search queries. The cortex_insights.json summary file is regenerated manually via generate_cortex_insights.py and re-deployed.

Input
All HTML reports, JSON extracts
nd_daily.json, site files
Lambda function ZIP package
Process
aws s3 sync to S3 bucket
CloudFront invalidation on update
Lambda serves live search queries
Output
Live at oilgasorgrass.com
CDN-served globally
Live search via Lambda endpoint
Live site endpoints
https://oilgasorgrass.com/
  /                           → index.html (landing + well search)
  /signal_coverage.html       → live intelligence dashboard (6 tabs)
  /wellfile_updates.html      → per-well AI report index
  /daily_permits.html         → daily NDIC permit feed
  /pipeline.html              → this page

Per-well reports:
  /html_reports/W{num}.html   → Grok extraction report
  /html_reports_i/W{num}_i.html → Haiku intelligence report

Data files (S3, CloudFront-cached):
  /nd_daily.json              → 2,171 wells, updated daily
  /html_files_manifest.json   → index of all reports
AWS S3 · CloudFront · AWS Lambda · Python
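The sync-and-invalidate step can be sketched as a small Python wrapper over the AWS CLI. The bucket name and distribution ID below are placeholders, not the production values:

```python
import subprocess

# Build the AWS CLI invocations for a deploy: sync site files to S3,
# then invalidate the CloudFront cache so the CDN picks up changes.
def build_deploy_commands(bucket: str, distribution_id: str) -> list[list[str]]:
    return [
        ['aws', 's3', 'sync', 'website/', f's3://{bucket}/', '--delete'],
        ['aws', 'cloudfront', 'create-invalidation',
         '--distribution-id', distribution_id, '--paths', '/*'],
    ]

def deploy(bucket: str, distribution_id: str) -> None:
    for cmd in build_deploy_commands(bucket, distribution_id):
        subprocess.run(cmd, check=True)
```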

Tools & Stack

Every component in production

The full technology inventory, organized by pipeline layer.

🐍 Python 3 (Core): Orchestration, all pipeline stages, API clients

cron / launchd (Core): Daily scheduling at 2 AM MT, macOS launchd in production

🗄️ SQLite (Core): Local state tracking for idempotency — nd_signals.db

📜 NDIC Portal (Data In): Source permit data and wellfile PDFs (public regulatory feed)

👁️ Google Cloud Vision (Data In): PDF OCR — document_text_detection on scanned wellfiles

📝 ocrmypdf + pdftotext (Data In): Local OCR fallback when Cloud Vision is unavailable

🧠 Grok API (xAI) (Intelligence): Primary AI extraction — 20+ structured fields per permit

Claude Haiku (Anthropic) (Intelligence): Intelligence validation, opportunity scoring, action summaries

❄️ Snowflake (Analytics): Data warehouse — raw tables, analytics views, operator normalization

🔍 Snowflake Cortex Search (Analytics): Semantic search index over 338 regulatory documents

AWS Lambda (Delivery): Live search API proxy — authenticates to Snowflake Cortex via PAT

☁️ AWS S3 + CloudFront (Delivery): Static hosting and global CDN delivery for all site files