Custom Connectors
Connect any data source to your knowledge graph - not just the 20+ first-class connectors. If it has an API, a webhook, or can export data, it can feed your graph.
Two Integration Models
Graphory supports two ways to bring in custom data. Choose based on your use case:
Push Model (Ingest API)
Your system sends data to Graphory whenever new data is available. Best for webhooks, event-driven systems, and one-time imports.
Pull Model (Custom Collector)
A script fetches data from your API on a schedule. Best for APIs that need polling, batch jobs, and recurring data sync.
Push Model: Ingest API
The fastest way to get custom data in. Send a POST request to the Ingest API from any system that can make HTTP requests. No collector setup, no scheduling - just push and go.
Supported source types
- REST APIs - Fetch data from any REST endpoint, then POST to Ingest
- GraphQL APIs - Query your GraphQL endpoint, transform results, POST to Ingest
- Webhooks - Point any webhook at a relay that forwards to the Ingest API
- CSV / Spreadsheets - Read rows and POST each as an ingest item
- Automation platforms - Zapier, Make.com, n8n all support HTTP POST actions
Example: REST API
Fetch data from any REST API and push it to your graph:
import requests

# Fetch from your internal API
response = requests.get(
    "https://internal.yourcompany.com/api/deals",
    headers={"Authorization": "Bearer your_internal_key"}
)
deals = response.json()

# Push each deal to Graphory
for deal in deals:
    requests.post(
        "https://api.graphory.io/ingest",
        headers={"Authorization": "Bearer gs_ak_your_api_key"},
        json={
            "entity": "your-org",
            "source": "internal-crm",
            "title": deal["name"],
            "body": f"Stage: {deal['stage']}. Value: ${deal['value']}. Contact: {deal['contact']}",
            "type": "deal",
            "date": deal["updated_at"][:10]
        }
    )
    print(f"Ingested: {deal['name']}")
Example: GraphQL API
import requests

# Query your GraphQL endpoint
query = """
query {
  projects(status: ACTIVE) {
    id
    name
    description
    updatedAt
    team { name }
  }
}
"""
result = requests.post(
    "https://api.linear.app/graphql",
    headers={"Authorization": "Bearer lin_your_key"},
    json={"query": query}
).json()

# Push to Graphory
for project in result["data"]["projects"]:
    team_names = ", ".join(m["name"] for m in project["team"])
    requests.post(
        "https://api.graphory.io/ingest",
        headers={"Authorization": "Bearer gs_ak_your_api_key"},
        json={
            "entity": "your-org",
            "source": "linear",
            "title": project["name"],
            "body": f"{project['description']}\nTeam: {team_names}",
            "type": "project",
            "date": project["updatedAt"][:10]
        }
    )
Example: Webhook Relay
For webhook-driven sources (Stripe events, GitHub webhooks, etc.), set up a simple relay:
const express = require('express');

const app = express();
app.use(express.json());

app.post('/webhook/stripe', async (req, res) => {
  const event = req.body;

  // Forward the event to the Ingest API
  await fetch('https://api.graphory.io/ingest', {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer gs_ak_your_api_key',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      entity: 'your-org',
      source: 'stripe',
      title: `${event.type}: ${event.data.object.id}`,
      body: JSON.stringify(event.data.object, null, 2),
      type: event.type,
      date: new Date(event.created * 1000).toISOString().slice(0, 10),
      idempotency_key: event.id
    })
  });

  res.json({ received: true });
});

app.listen(3000);
Example: CSV Import
import csv
import requests

GRAPHORY_KEY = "gs_ak_your_api_key"

# newline="" is what the csv module recommends when opening files for reading
with open("contacts.csv", newline="") as f:
    reader = csv.DictReader(f)
    items = []
    for row in reader:
        items.append({
            "entity": "your-org",
            "source": "csv-import",
            "title": f"{row['first_name']} {row['last_name']}",
            "body": f"Email: {row['email']}\nCompany: {row['company']}\nRole: {row['role']}",
            "type": "contact",
            "idempotency_key": f"csv-{row['email']}"
        })

# Batch ingest (up to 100 items per request)
for i in range(0, len(items), 100):
    batch = items[i:i+100]
    resp = requests.post(
        "https://api.graphory.io/ingest",
        headers={"Authorization": f"Bearer {GRAPHORY_KEY}"},
        json={"items": batch}
    )
    print(f"Batch {i//100 + 1}: {resp.json()}")
Example: Bash / curl
# Quick one-liner to push a note
curl -X POST https://api.graphory.io/ingest \
-H "Authorization: Bearer gs_ak_your_api_key" \
-H "Content-Type: application/json" \
-d '{
"entity": "your-org",
"source": "manual",
"title": "Meeting notes - Product review",
"body": "Decided to launch v2 next month. Key blocker: API docs need updating.",
"type": "note"
}'
Via MCP: Two-Step Register and Configure
If you want Graphory to poll a custom REST or GraphQL API on your schedule - without you running a cron job - register the source through MCP. This is a strict two-call flow, and the source will not collect anything until both calls succeed.
1. connect_custom_source
Stores credentials in your org's encrypted vault and registers the connection. The source is created with ready: false and is skipped by every cron run until step 2 completes. Required arguments include system_name, base_url, auth_type, credentials, and entity. Optional: schedule, test_endpoint, auth_config, extra_headers.
2. configure_custom_collector
Defines which endpoints to pull, pagination strategy, and item extraction rules. Once this succeeds with at least one endpoint, the source flips to ready: true and starts collecting on its schedule.
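As a rough illustration of the two-call flow, the sketch below builds one argument payload per call and mirrors the readiness rule described above. The connect_custom_source argument names come from the list above (which says "include", so it may not be exhaustive). The shape of the configure_custom_collector payload (an endpoints list keyed by system_name), the helper functions, and every value are assumptions made for illustration, not the documented schema.

```python
# Hypothetical argument payloads for the two MCP calls; all values are placeholders.
REQUIRED_CONNECT_ARGS = {"system_name", "base_url", "auth_type", "credentials", "entity"}

connect_args = {
    "system_name": "internal-tickets",
    "base_url": "https://tickets.example.com/api",
    "auth_type": "bearer",
    "credentials": "your_api_token",
    "entity": "your-org",
    "schedule": "standard",  # optional
}

# Assumed shape: the docs only say this call defines endpoints,
# pagination strategy, and item extraction rules.
configure_args = {
    "system_name": "internal-tickets",
    "endpoints": [
        {"path": "/tickets", "item_type": "ticket", "items_path": "tickets"},
    ],
}


def missing_connect_args(args):
    """Return the required connect_custom_source arguments that are absent."""
    return sorted(REQUIRED_CONNECT_ARGS - set(args))


def is_ready(connected, configure):
    """Mirror the documented rule: ready only after both steps, with at least one endpoint."""
    return bool(connected and len(configure.get("endpoints", [])) >= 1)


print(missing_connect_args(connect_args))
print(is_ready(True, configure_args))
```

Checking the required arguments locally before making the first call is a cheap way to avoid registering a source that can never leave ready: false.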
The schedule field
Every custom source has a schedule that controls how often Graphory polls it. Pick one when you call connect_custom_source, or leave it at the default:
| Value | Cadence | Use for |
|---|---|---|
| live | Every 6 hours | Live chat, real-time tickets, fast-moving platforms. |
| standard | Twice daily (default for new sources) | Most business systems: CRMs, accounting, project management. |
| nightly | Once per day (4am) | Stable reference data, company directories, rarely-changing config. |
| manual | On demand only | Experimental integrations, one-shot imports, expensive polls. The AI triggers collection manually with sync_graph. |
| paused | Paused | Broken auth, user paused, pre-launch. Parked, not deleted. |
If a source keeps failing (for example, because its credentials broke), set it to paused so it stops burning retries. Move it back with update_source_schedule.
Dual-header auth (extra_headers)
Some APIs require more than one header - for example, an API that expects both an apikey header and an Authorization header. Pass extra_headers as a JSON object of additional header name/value pairs. They are sent alongside the primary Authorization header on every request.
{
  "system_name": "supabase",
  "base_url": "https://your-project.supabase.co",
  "auth_type": "bearer",
  "credentials": "eyJhbGci...",
  "entity": "acme-corp",
  "schedule": "standard",
  "extra_headers": {
    "apikey": "eyJhbGci..."
  }
}
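To make the effect of extra_headers concrete, here is a minimal sketch of how the headers for a request could be assembled from the configuration above. The build_headers function is hypothetical and is not Graphory's implementation. It covers only the bearer auth type, and applying the extra headers after the primary one is an assumption of this sketch.

```python
def build_headers(auth_type, credentials, extra_headers=None):
    """Combine the primary Authorization header with any extra headers."""
    headers = {}
    if auth_type == "bearer":
        headers["Authorization"] = f"Bearer {credentials}"
    else:
        raise ValueError(f"auth_type {auth_type!r} is not covered by this sketch")
    # Assumption: extra headers are added after the primary header.
    headers.update(extra_headers or {})
    return headers


headers = build_headers("bearer", "eyJhbGci...", {"apikey": "eyJhbGci..."})
print(sorted(headers))
```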
GraphQL endpoints
configure_custom_collector accepts an api_type: "graphql" endpoint alongside the default REST type. GraphQL endpoints require a graphql_query and support cursor-based pagination via pagination.type: "graphql_cursor" - Graphory injects the cursor into the configured variable on each successive page.
{
  "path": "/graphql",
  "api_type": "graphql",
  "graphql_query": "query($cursor: String) { viewer { repositories(first: 50, after: $cursor) { nodes { id name description url updatedAt } pageInfo { hasNextPage endCursor } } } }",
  "graphql_variables": {},
  "item_type": "repository",
  "items_path": "data.viewer.repositories.nodes",
  "pagination": {
    "type": "graphql_cursor",
    "cursor_variable": "cursor",
    "has_next_path": "data.viewer.repositories.pageInfo.hasNextPage",
    "next_cursor_path": "data.viewer.repositories.pageInfo.endCursor"
  }
}
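The cursor injection described above can be pictured as the loop below. It illustrates how the items_path, has_next_path, next_cursor_path, and cursor_variable fields relate to each other, and makes no claim about Graphory's internal code. The get_path, collect_all, and fake_fetch helpers are hypothetical, and fake_fetch stands in for the real HTTP call so the sketch runs offline.

```python
def get_path(data, dotted_path):
    """Resolve a dotted path such as 'data.viewer.repositories.nodes'."""
    for key in dotted_path.split("."):
        data = data[key]
    return data


def collect_all(fetch_page, endpoint):
    """Follow graphql_cursor pagination until has_next_path resolves to false."""
    pagination = endpoint["pagination"]
    variables = dict(endpoint.get("graphql_variables", {}))
    items = []
    while True:
        response = fetch_page(endpoint["graphql_query"], variables)
        items.extend(get_path(response, endpoint["items_path"]))
        if not get_path(response, pagination["has_next_path"]):
            return items
        # Inject the cursor into the configured variable for the next page
        variables[pagination["cursor_variable"]] = get_path(
            response, pagination["next_cursor_path"]
        )


def fake_fetch(query, variables):
    """Stand-in for the HTTP call: serves two pages keyed by cursor."""
    pages = {None: (["repo-a", "repo-b"], True, "c1"), "c1": (["repo-c"], False, None)}
    names, has_next, end_cursor = pages[variables.get("cursor")]
    return {"data": {"viewer": {"repositories": {
        "nodes": [{"name": name} for name in names],
        "pageInfo": {"hasNextPage": has_next, "endCursor": end_cursor},
    }}}}


endpoint = {
    "graphql_query": "query($cursor: String) { ... }",  # placeholder query text
    "graphql_variables": {},
    "items_path": "data.viewer.repositories.nodes",
    "pagination": {
        "type": "graphql_cursor",
        "cursor_variable": "cursor",
        "has_next_path": "data.viewer.repositories.pageInfo.hasNextPage",
        "next_cursor_path": "data.viewer.repositories.pageInfo.endCursor",
    },
}

repos = collect_all(fake_fetch, endpoint)
print([repo["name"] for repo in repos])
```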
Pull Model: Custom Collectors
For APIs that need regular polling, build a collector script and run it on a schedule. This is how Graphory's built-in connectors work internally.
How it works
1. Write a collector script
A script that authenticates with your API, fetches new data, and pushes it to the Ingest API. Can be in any language.
2. Track what you have seen
Use idempotency keys or a local state file to avoid re-ingesting the same data on each run.
3. Schedule it
Run on a cron schedule, a GitHub Action, or any task scheduler. Hourly, daily, or whatever fits your data.
Example: Custom API Collector
#!/usr/bin/env python3
"""Custom collector for an internal ticket system."""
import json
import os
from datetime import datetime, timedelta, timezone

import requests

GRAPHORY_KEY = os.getenv("GRAPHORY_API_KEY")
TICKET_API = os.getenv("TICKET_API_URL")
TICKET_TOKEN = os.getenv("TICKET_API_TOKEN")
STATE_FILE = "last_sync.json"


def load_state():
    """Return the saved sync state, defaulting to 30 days ago on the first run."""
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        start = datetime.now(timezone.utc) - timedelta(days=30)
        return {"last_sync": start.isoformat()}


def save_state(state):
    with open(STATE_FILE, "w") as f:
        json.dump(state, f)


def collect():
    state = load_state()
    since = state["last_sync"]
    # Record the start time first so tickets updated during this run are picked up next time
    run_started = datetime.now(timezone.utc).isoformat()

    # Fetch new tickets since last sync
    resp = requests.get(
        f"{TICKET_API}/tickets",
        headers={"Authorization": f"Bearer {TICKET_TOKEN}"},
        params={"updated_since": since, "limit": 100}
    )
    resp.raise_for_status()
    tickets = resp.json()["tickets"]
    if not tickets:
        print("No new tickets.")
        return

    # Push to Graphory
    items = []
    for t in tickets:
        items.append({
            "entity": "your-org",
            "source": "internal-tickets",
            "title": f"[{t['id']}] {t['subject']}",
            "body": f"Status: {t['status']}\nPriority: {t['priority']}\nAssigned: {t['assignee']}\n\n{t['description']}",
            "type": "ticket",
            "date": t["updated_at"][:10],
            "idempotency_key": f"ticket-{t['id']}-{t['updated_at']}"
        })
    resp = requests.post(
        "https://api.graphory.io/ingest",
        headers={"Authorization": f"Bearer {GRAPHORY_KEY}"},
        json={"items": items}
    )
    # Stop on failure so the state file never advances past tickets that were not ingested
    resp.raise_for_status()
    print(f"Ingested {len(items)} tickets: {resp.json()}")

    # Update state
    save_state({"last_sync": run_started})


if __name__ == "__main__":
    collect()
Schedule with cron:
# Run every hour
0 * * * * cd /path/to/collector && python3 collect_tickets.py >> collect.log 2>&1
Best Practices
Always use idempotency keys
Prevent duplicate data by including an idempotency_key with every ingest call.
Use a combination of the source system's ID and a timestamp or version number.
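A small sketch of that key scheme follows. The idempotency_key and should_ingest helpers are hypothetical, and the local seen set only simulates deduplication on the receiving side. How Graphory matches keys internally is not described here, so treat the exact-match behavior as an assumption.

```python
def idempotency_key(source, record_id, version):
    """Build a key that stays stable for one version of one record."""
    return f"{source}-{record_id}-{version}"


seen = set()


def should_ingest(key):
    """Local stand-in for deduplication: True only the first time a key is seen."""
    if key in seen:
        return False
    seen.add(key)
    return True


first = idempotency_key("internal-tickets", 42, "2026-04-01T10:00:00Z")
retry = idempotency_key("internal-tickets", 42, "2026-04-01T10:00:00Z")
edited = idempotency_key("internal-tickets", 42, "2026-04-02T08:30:00Z")

results = [should_ingest(first), should_ingest(retry), should_ingest(edited)]
print(results)
```

Including the version in the key means a retried request is skipped, while a genuine update to the same record still gets through.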
Batch when possible
The Ingest API accepts up to 100 items per request in the items array.
Batching reduces HTTP overhead and is faster than individual requests.
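One way to respect the 100-item limit is a small chunking helper like the sketch below. The batches name and the sample items are illustrative only. Each yielded payload is shaped for the items array and can be sent as the JSON body of one request.

```python
def batches(items, size=100):
    """Yield request payloads containing at most `size` items each."""
    for start in range(0, len(items), size):
        yield {"items": items[start:start + size]}


sample_items = [{"title": f"Item {n}"} for n in range(250)]
payloads = list(batches(sample_items))
print([len(payload["items"]) for payload in payloads])
```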
Structure your body text
The extractor works best with clear, labeled fields in the body text.
Use a format like Field: Value on separate lines.
Name: Sarah Chen
Company: Beta Industries
Role: VP of Engineering
Email: sarah@beta.io
Met at: SaaStr 2026
Notes: Interested in enterprise plan. Follow up next week.
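If your records start out as dictionaries, a helper along these lines can produce that labeled layout. The format_body name is hypothetical, and skipping empty values is a choice made for this sketch, not a requirement of the extractor.

```python
def format_body(fields):
    """Render a dict as 'Field: Value' lines, skipping empty values."""
    return "\n".join(
        f"{label}: {value}"
        for label, value in fields.items()
        if value not in (None, "")
    )


body = format_body({
    "Name": "Sarah Chen",
    "Company": "Beta Industries",
    "Role": "VP of Engineering",
    "Phone": "",
})
print(body)
```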
Choose meaningful source names
The source field helps you filter and identify where data came from.
Use consistent, descriptive names like "internal-crm", "stripe-webhooks", or "csv-import-2026-04".
Handle errors gracefully
The Ingest API returns standard HTTP status codes. Implement retry logic for 429 (rate limited) and 500 (server error) responses.
400 errors indicate bad data and should be logged for investigation.
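A minimal retry sketch under those rules is shown below. The post_with_retry function, its parameters, and the exponential backoff schedule are assumptions for illustration; the docs do not prescribe a specific backoff. The send argument is any callable that posts a payload and returns the HTTP status code, which keeps the example runnable without a network.

```python
import time

RETRYABLE = {429, 500}


def post_with_retry(send, payload, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call send(payload) and retry 429/500 responses with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        status = send(payload)
        if status not in RETRYABLE:
            return status  # success, or a non-retryable error such as 400
        if attempt < max_attempts:
            sleep(base_delay * 2 ** (attempt - 1))
    return status


# Simulated responses: rate limited, then a server error, then success
responses = iter([429, 500, 200])
delays = []
final = post_with_retry(lambda payload: next(responses), {"items": []}, sleep=delays.append)
print(final, delays)
```

A 400 is returned to the caller on the first attempt, so it can be logged with the offending payload and investigated without being retried.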
What Can You Connect?
If it produces data, it can feed your graph. Common custom sources include:
REST APIs
Any API that returns JSON or XML. Internal tools, SaaS platforms, IoT endpoints.
GraphQL APIs
Query exactly what you need from Linear, GitHub, Hasura, or custom GraphQL servers.
Webhooks
Stripe events, GitHub pushes, Shopify orders, Twilio messages - relay them to Ingest.
CSV / Excel
Bulk import from spreadsheets. CRM exports, financial reports, contact lists.
Databases
Query Postgres, MySQL, MongoDB, or any database and push results to your graph.
AI Conversations
Push insights from Claude, ChatGPT, or other AI conversations back into your graph.
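The database case has no example above, so here is a hedged sketch of turning query results into ingest items. It uses Python's built-in sqlite3 module with an in-memory database. The orders table, its columns, the rows_to_items helper, and the "orders-db" source name are all invented for illustration. The resulting list can be sent in the items array of an Ingest API request.

```python
import sqlite3


def rows_to_items(conn, entity="your-org", source="orders-db"):
    """Query the hypothetical orders table and shape each row as an ingest item."""
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT id, customer, total, updated_at FROM orders ORDER BY id"
    )
    return [
        {
            "entity": entity,
            "source": source,
            "title": f"Order {row['id']}: {row['customer']}",
            "body": f"Customer: {row['customer']}\nTotal: {row['total']}",
            "type": "order",
            "date": row["updated_at"][:10],
            "idempotency_key": f"order-{row['id']}-{row['updated_at']}",
        }
        for row in rows
    ]


# In-memory database with sample data so the sketch runs on its own
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, customer TEXT, total REAL, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [(1, "Acme", 1200.5, "2026-04-01T09:00:00"), (2, "Beta Industries", 300.0, "2026-04-02T14:30:00")],
)

items = rows_to_items(conn)
print([item["title"] for item in items])
```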