Salesforce CDC Integration
Real-time Salesforce Change Data Capture (CDC) pipeline that turns CRM record changes into ad platform conversion events.
Architecture Overview
Salesforce Org (Client's)
└── CDC Events (gRPC Pub/Sub API)
    └── pubsub-worker.ts (separate process)
        ├── ChangeEventHeader extraction
        ├── Trigger field matching (Phase 1 gate)
        ├── Condition evaluation (AND/OR groups)
        └── BullMQ queue (salesforce-cdc)
            ├── Full record fetch (jsforce)
            ├── Click ID extraction (configurable field map)
            └── Platform fanout
                ├── Google Ads (GCLID)
                ├── Meta (FBCLID/FBP)
                ├── LinkedIn (li_fat_id)
                ├── TikTok (TTCLID)
                ├── Reddit (rdt_cid)
                └── Bing (MSCLKID)
How It Works
1. OAuth Connection
A growth manager clicks "Connect Salesforce" in the dashboard. This starts a standard OAuth 2.0 flow against TNT's single Connected App. Per-client tokens are stored in the SalesforceAccess table.
During the OAuth callback, the system attempts to enable CDC on the Opportunity and Lead objects via the Metadata API.
2. gRPC Streaming (pubsub-worker)
A separate background worker (pubsub-worker.ts) runs as its own Render service. It:
- Polls the DB every 60 seconds for active SalesforceAccess records with mappings
- Opens one gRPC stream per (clientId, topic) pair to Salesforce's Pub/Sub API
- Decodes Avro-encoded CDC payloads
- Persists replay IDs for exactly-once resume on restart
- Listens on the Redis salesforce:disconnect channel for immediate stream teardown
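The polling step above amounts to reconciling the streams that should exist against the streams that are already open. A minimal sketch of that comparison follows, assuming each stream is identified by its (clientId, topic) pair; the `StreamKey` type and the `diffStreams` helper are hypothetical names used for illustration, not the worker's actual code.

```typescript
// Hypothetical shape; the real worker's types are not shown in this document.
interface StreamKey {
  clientId: string;
  topic: string;
}

// Serialize a (clientId, topic) pair into a stable lookup key.
function streamId(key: StreamKey): string {
  return `${key.clientId}::${key.topic}`;
}

// Compare the streams that should exist (from the latest DB poll) with the
// streams currently open, and report which to open and which to close.
function diffStreams(
  desired: StreamKey[],
  open: StreamKey[],
): { toOpen: StreamKey[]; toClose: StreamKey[] } {
  const desiredIds = new Set(desired.map(streamId));
  const openIds = new Set(open.map(streamId));
  return {
    toOpen: desired.filter((k) => !openIds.has(streamId(k))),
    toClose: open.filter((k) => !desiredIds.has(streamId(k))),
  };
}
```

Running this on every poll keeps the open-stream set converging on the database state, so a paused or deleted SalesforceAccess record would eventually lose its streams even if a Redis signal were missed.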
3. Event Matching (cdc-event-handler)
When a CDC event arrives, the handler:
- Extracts the ChangeEventHeader (entityName, recordIds, changeType, changedFields)
- Filters to only UPDATE and CREATE events (skips DELETE/UNDELETE)
- For each mapping on the topic:
  - Checks if triggerField was changed (skipped for CREATE events where all fields are "changed")
  - Checks if the new value matches triggerValue (case-insensitive)
  - Evaluates additional conditions against the CDC payload
- Queues a conversion job for each matching mapping + each record ID
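The matching steps above can be sketched as a pure function over the header, the decoded payload, and a list of mappings. This is an illustration only: the type shapes and the names `matchesTrigger` and `buildJobs` are assumptions, and the additional condition evaluation is left out to keep the sketch short.

```typescript
// Assumed header shape, based on the fields named in the text.
interface ChangeEventHeader {
  entityName: string;
  recordIds: string[];
  changeType: string; // e.g. "CREATE", "UPDATE", "DELETE", "UNDELETE"
  changedFields: string[];
}

// Hypothetical subset of a mapping, limited to what trigger matching needs.
interface TriggerMapping {
  eventName: string;
  triggerField: string;
  triggerValue: string;
}

function matchesTrigger(
  header: ChangeEventHeader,
  payload: Record<string, unknown>,
  mapping: TriggerMapping,
): boolean {
  // Only CREATE and UPDATE events are considered.
  if (header.changeType !== "CREATE" && header.changeType !== "UPDATE") return false;
  // On UPDATE the trigger field must be among the changed fields; CREATE skips this check.
  if (header.changeType === "UPDATE" && header.changedFields.indexOf(mapping.triggerField) === -1) {
    return false;
  }
  const value = payload[mapping.triggerField];
  if (value === null || value === undefined) return false;
  // Case-insensitive comparison against the configured trigger value.
  return String(value).toLowerCase() === mapping.triggerValue.toLowerCase();
}

// One job per matching mapping per record ID.
function buildJobs(
  header: ChangeEventHeader,
  payload: Record<string, unknown>,
  mappings: TriggerMapping[],
): Array<{ recordId: string; eventName: string }> {
  const jobs: Array<{ recordId: string; eventName: string }> = [];
  for (const mapping of mappings) {
    if (!matchesTrigger(header, payload, mapping)) continue;
    for (const recordId of header.recordIds) {
      jobs.push({ recordId, eventName: mapping.eventName });
    }
  }
  return jobs;
}
```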
4. Conversion Fanout (salesforce-cdc queue)
The BullMQ worker:
- Loads the full Client record with CRM mappings and platform IDs
- Fetches the complete Salesforce record via jsforce for click ID extraction
- Extracts click IDs using the mapping's clickIdFieldMap (or defaults)
- Optionally fetches a related record if clickIdSourceObject is set
- Fans out to per-platform queues (Google, Meta, LinkedIn, TikTok, Reddit, Bing)
Configuration
SalesforcePubSubMapping
Each mapping defines a CDC trigger and its conversion routing:
| Field | Description | Example |
|---|---|---|
| topic | CDC event channel | /data/OpportunityChangeEvent |
| triggerField | Field to watch for changes | StageName |
| triggerValue | Value that triggers a conversion | Admitted |
| eventName | Links to CrmEventMapping for platform routing | sf_opportunity_admitted |
| conditions | Additional filter conditions (AND/OR groups) | See Conditions section |
| clickIdFieldMap | Custom Salesforce field names for click IDs | {"gclid": ["GCLID__c"]} |
| clickIdSourceObject | Related object to fetch click IDs from | Contact |
| platformTargets | Limit which platforms receive conversions | ["google", "meta"] |
| isActive | Enable/disable this mapping | true |
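For orientation, the fields in the table could be typed roughly as follows. This is a sketch rather than the actual schema: the interface name, which fields are optional, and the exact value types are assumptions. The example values come from the table.

```typescript
type Platform = "google" | "meta" | "linkedin" | "tiktok" | "reddit" | "bing";

// Hypothetical type mirroring the table; optionality is assumed.
interface SalesforcePubSubMappingSketch {
  topic: string;
  triggerField: string;
  triggerValue: string;
  eventName: string;
  conditions?: unknown; // flat array or nested group
  clickIdFieldMap?: Record<string, string[]>;
  clickIdSourceObject?: string;
  platformTargets?: Platform[];
  isActive: boolean;
}

// A mapping assembled from the example column of the table.
const exampleMapping: SalesforcePubSubMappingSketch = {
  topic: "/data/OpportunityChangeEvent",
  triggerField: "StageName",
  triggerValue: "Admitted",
  eventName: "sf_opportunity_admitted",
  clickIdFieldMap: { gclid: ["GCLID__c"] },
  clickIdSourceObject: "Contact",
  platformTargets: ["google", "meta"],
  isActive: true,
};
```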
Enabled Objects
Each SalesforceAccess record has an enabledObjects field (e.g., ["Opportunity", "Lead"]) that controls which CDC topics get gRPC streams. The worker only opens streams for enabled objects. If not set, all objects with active mappings get streams.
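A minimal sketch of how enabledObjects might narrow the set of streamed topics, assuming topics follow the Salesforce channel naming of /data/<Object>ChangeEvent for standard objects. The `MappingRef` type and both helper names are hypothetical.

```typescript
// Hypothetical minimal mapping shape; only the fields used here are included.
interface MappingRef {
  topic: string;
  isActive: boolean;
}

// Build the CDC channel for an object API name, e.g. "Lead" -> "/data/LeadChangeEvent".
// Custom objects drop the trailing "c": "Intake__c" -> "/data/Intake__ChangeEvent".
function topicForObject(objectName: string): string {
  const base = objectName.endsWith("__c") ? objectName.slice(0, -1) : objectName;
  return `/data/${base}ChangeEvent`;
}

// Return the distinct topics that should get a gRPC stream for one client.
function topicsToStream(mappings: MappingRef[], enabledObjects?: string[] | null): string[] {
  const activeTopics = Array.from(
    new Set(mappings.filter((m) => m.isActive).map((m) => m.topic)),
  );
  // Not set: every topic with an active mapping gets a stream.
  if (!enabledObjects) return activeTopics;
  const allowed = new Set(enabledObjects.map(topicForObject));
  return activeTopics.filter((t) => allowed.has(t));
}
```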
Click ID Field Mapping
Salesforce custom field names vary per org. The clickIdFieldMap lets you configure which fields to check for each click ID type:
{
  "gclid": ["Google_Click_ID__c", "GCLID__c"],
  "fbclid": ["Facebook_Click__c"],
  "msclkid": ["Bing_CID__c"]
}
If not set, the system falls back to common field name patterns (GCLID__c, gclid__c, Google_Click_ID__c, etc.).
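The lookup described above can be sketched as a first-match scan over the candidate field names for each click ID type. The function name, the default map contents beyond the gclid names listed in the text, and the choice to let a custom map fully replace the defaults are all assumptions.

```typescript
type ClickIdFieldMap = Record<string, string[]>;

// Assumed defaults: only the gclid patterns named in the text are included here.
const DEFAULT_FIELD_MAP: ClickIdFieldMap = {
  gclid: ["GCLID__c", "gclid__c", "Google_Click_ID__c"],
};

// For each click ID type, return the first non-empty string found among its
// candidate fields. Types with no populated field are omitted from the result.
function extractClickIds(
  record: Record<string, unknown>,
  fieldMap?: ClickIdFieldMap | null,
): Record<string, string> {
  const map = fieldMap ?? DEFAULT_FIELD_MAP; // a custom map replaces the defaults (assumption)
  const found: Record<string, string> = {};
  for (const clickIdType of Object.keys(map)) {
    for (const field of map[clickIdType] ?? []) {
      const value = record[field];
      if (typeof value === "string" && value.trim() !== "") {
        found[clickIdType] = value.trim();
        break; // first populated candidate wins
      }
    }
  }
  return found;
}
```

Candidate order matters in this sketch: listing the most reliable field first means a stale value in a legacy field is only used when the preferred field is empty.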
Click ID Source Object
Some clients store click IDs on the Lead or Contact record, not the Opportunity. Set clickIdSourceObject to Contact or Lead and the worker will fetch that related record via the ${sourceObject}Id relationship field.
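As a small illustration of the relationship lookup, the helper below derives which related record to fetch from the already-loaded primary record. The function name and return shape are hypothetical; the actual fetch (via jsforce in the worker) is not shown.

```typescript
// Hypothetical helper: given the primary record and the mapping's
// clickIdSourceObject, decide which related record to fetch next.
function relatedRecordRef(
  record: Record<string, unknown>,
  sourceObject?: string | null,
): { objectName: string; id: string } | null {
  if (!sourceObject) return null; // no source object configured
  // The relationship field is named `${sourceObject}Id`, e.g. "ContactId".
  const id = record[`${sourceObject}Id`];
  return typeof id === "string" && id !== "" ? { objectName: sourceObject, id } : null;
}
```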
Platform Targets
By default, matched events fan out to all ad platforms that have click IDs. Set platformTargets to limit routing:
["google", "meta", "linkedin"]
Valid values: google, meta, linkedin, tiktok, reddit, bing.
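A sketch of the routing decision, assuming click IDs are keyed by lowercase type names and that each type maps to one platform as in the architecture overview. The `CLICK_ID_PLATFORM` table and the `selectPlatforms` name are illustrative, not taken from the codebase.

```typescript
type Platform = "google" | "meta" | "linkedin" | "tiktok" | "reddit" | "bing";

// Assumed click ID key names and their owning platforms.
const CLICK_ID_PLATFORM: Record<string, Platform> = {
  gclid: "google",
  fbclid: "meta",
  fbp: "meta",
  li_fat_id: "linkedin",
  ttclid: "tiktok",
  rdt_cid: "reddit",
  msclkid: "bing",
};

// Platforms that have at least one click ID, optionally limited to platformTargets.
function selectPlatforms(
  clickIds: Record<string, string>,
  platformTargets?: Platform[] | null,
): Platform[] {
  const available = new Set<Platform>();
  for (const key of Object.keys(clickIds)) {
    const platform = CLICK_ID_PLATFORM[key];
    if (platform !== undefined) available.add(platform);
  }
  const all = Array.from(available);
  // Not set: fan out to every platform that has a click ID.
  if (!platformTargets) return all;
  // An empty list is treated like any other list and yields no platforms (assumption).
  return all.filter((p) => platformTargets.indexOf(p) !== -1);
}
```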
Condition Evaluator
Conditions support AND/OR logic with nesting. Two input formats are supported:
Flat Array (AND-only, backward-compatible)
[
  { "field": "Amount", "operator": "greater_than", "value": "10000" },
  { "field": "RecordType", "operator": "equals", "value": "Treatment" }
]
Nested Groups (AND/OR)
{
  "logic": "AND",
  "rules": [
    { "field": "StageName", "operator": "equals", "value": "Admitted" },
    {
      "logic": "OR",
      "rules": [
        { "field": "RecordType", "operator": "equals", "value": "Treatment" },
        { "field": "RecordType", "operator": "equals", "value": "Detox" }
      ]
    }
  ]
}
The dashboard uses a recursive tree builder (ConditionTreeBuilder in apps/flex-dashboard/src/components/) that supports nested AND/OR groups up to 3 levels deep. Groups can be collapsed/expanded, and the nesting is fully preserved on save. The same component is available as a shared building block for other integration forms (HubSpot, CRM event mappings, etc.).
Supported Operators
| Operator | Description | Value Required |
|---|---|---|
| equals | Exact match (case-insensitive) | Yes |
| not_equals | Not equal | Yes |
| contains | Substring match | Yes |
| not_contains | Does not contain substring | Yes |
| greater_than | Numeric comparison | Yes |
| less_than | Numeric comparison | Yes |
| includes | Value exists in semicolon-delimited list (multi-select picklists) | Yes |
| not_includes | Value not in semicolon-delimited list | Yes |
| is_empty | Field is null or empty string | No |
| is_not_empty | Field has a value | No |
| is_true | Boolean field is true | No |
| is_false | Boolean field is false | No |
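To make the two input formats and the operator table concrete, here is a minimal evaluator sketch. It is not the production evaluator: the type names are hypothetical, all string operators are treated as case-insensitive (the table only states this for equals), and a missing condition set is assumed to pass.

```typescript
type Operator =
  | "equals" | "not_equals" | "contains" | "not_contains"
  | "greater_than" | "less_than" | "includes" | "not_includes"
  | "is_empty" | "is_not_empty" | "is_true" | "is_false";

interface Rule { field: string; operator: Operator; value?: string }
interface Group { logic: "AND" | "OR"; rules: Array<Rule | Group> }
type Conditions = Rule[] | Group;

function isGroup(node: Rule | Group): node is Group {
  return "logic" in node;
}

function evaluateRule(rule: Rule, payload: Record<string, unknown>): boolean {
  const raw = payload[rule.field];
  const actual = raw === null || raw === undefined ? "" : String(raw);
  const expected = rule.value ?? "";
  const a = actual.toLowerCase();
  const e = expected.toLowerCase();
  // Multi-select picklist values arrive as a semicolon-delimited string.
  const items = a.split(";").map((s) => s.trim());
  switch (rule.operator) {
    case "equals": return a === e;
    case "not_equals": return a !== e;
    case "contains": return a.indexOf(e) !== -1;
    case "not_contains": return a.indexOf(e) === -1;
    case "greater_than": return actual !== "" && Number(actual) > Number(expected);
    case "less_than": return actual !== "" && Number(actual) < Number(expected);
    case "includes": return items.indexOf(e) !== -1;
    case "not_includes": return items.indexOf(e) === -1;
    case "is_empty": return actual === "";
    case "is_not_empty": return actual !== "";
    case "is_true": return raw === true;
    case "is_false": return raw === false;
    default: return false;
  }
}

// Recursively evaluate a rule or a nested AND/OR group.
function evaluateNode(node: Rule | Group, payload: Record<string, unknown>): boolean {
  if (!isGroup(node)) return evaluateRule(node, payload);
  const results = node.rules.map((child) => evaluateNode(child, payload));
  return node.logic === "AND" ? results.every(Boolean) : results.some(Boolean);
}

// Accept either format: a flat array is wrapped in an implicit AND group.
function evaluateConditions(
  conditions: Conditions | null | undefined,
  payload: Record<string, unknown>,
): boolean {
  if (!conditions) return true; // no conditions configured (assumed to pass)
  const root: Group = Array.isArray(conditions)
    ? { logic: "AND", rules: conditions }
    : conditions;
  return evaluateNode(root, payload);
}
```

Wrapping the flat array in an AND group keeps a single evaluation path for both formats, which is one way to stay backward-compatible without branching in the evaluator itself.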
Event Name Bridge
The eventName on a mapping is the vocabulary key that connects Salesforce events to platform-specific conversion actions:
SalesforcePubSubMapping.eventName = "sf_opportunity_admitted"
    │
    ▼
CrmEventMapping.eventName = "sf_opportunity_admitted"
    ├── linkedInConversionId = "12345"
    ├── redditConversionId = "reddit_conversion_abc"
    └── fallbackEventName = "opportunity_admitted"
    │
    ▼
GoogleConversionMapping.eventName = "sf_opportunity_admitted"
    └── conversionId = "AW-123/AbCdEfG"
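The bridge is essentially a pair of lookups keyed by eventName. The sketch below assumes simplified record shapes and a hypothetical `resolveConversionTargets` helper; how fallbackEventName is used is not described here, so it is deliberately left out of the logic.

```typescript
// Simplified, assumed shapes for the two mapping tables.
interface CrmEventMapping {
  eventName: string;
  linkedInConversionId?: string;
  redditConversionId?: string;
  fallbackEventName?: string; // carried on the record but not modeled in this sketch
}

interface GoogleConversionMapping {
  eventName: string;
  conversionId: string;
}

// Look up platform-specific conversion IDs for one event name.
function resolveConversionTargets(
  eventName: string,
  crmMappings: CrmEventMapping[],
  googleMappings: GoogleConversionMapping[],
): { linkedIn?: string; reddit?: string; google?: string } {
  const crm = crmMappings.filter((m) => m.eventName === eventName)[0];
  const google = googleMappings.filter((m) => m.eventName === eventName)[0];
  return {
    linkedIn: crm?.linkedInConversionId,
    reddit: crm?.redditConversionId,
    google: google?.conversionId,
  };
}
```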
Kill Switch
- Pause: Sets SalesforceAccess.isActive = false. Streams close immediately via Redis pub/sub. OAuth tokens preserved.
- Resume: Sets isActive = true. Next polling cycle (within 60s) reopens streams.
- Disconnect: Revokes OAuth token, deletes SalesforceAccess, deactivates all mappings. Immediate via Redis signal.
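The immediate teardown could be handled by a small in-memory registry of open streams that the Redis subscriber calls into. This sketch assumes the disconnect message identifies a client by ID; the message format, the `StreamRegistry` class, and the `OpenStream` shape are hypothetical, and the Redis subscription itself is omitted.

```typescript
// Hypothetical handle for one open gRPC stream.
interface OpenStream {
  clientId: string;
  topic: string;
  close: () => void;
}

class StreamRegistry {
  private streams = new Map<string, OpenStream>();

  add(stream: OpenStream): void {
    this.streams.set(`${stream.clientId}::${stream.topic}`, stream);
  }

  // Close and forget every stream belonging to one client.
  // Intended to be called when a disconnect message for that client arrives.
  closeClient(clientId: string): number {
    let closed = 0;
    this.streams.forEach((stream, key) => {
      if (stream.clientId !== clientId) return;
      stream.close();
      this.streams.delete(key);
      closed += 1;
    });
    return closed;
  }

  size(): number {
    return this.streams.size;
  }
}
```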
Troubleshooting
No events arriving
- Check SalesforceAccess.isActive is true
- Check enabledObjects includes the expected object type
- Verify CDC is enabled in the client's Salesforce org (Setup > Change Data Capture)
- Check the mapping's triggerField matches the Salesforce API name (not the label)
Click IDs not extracted
- Check clickIdFieldMap field names match the client's org
- If click IDs are on a related object, set clickIdSourceObject
- Verify the Salesforce user has read access to the click ID fields
Conversions not firing to a platform
- Check platformTargets: if set, only listed platforms receive conversions
- Verify the eventName matches a CrmEventMapping on the client
- For Google, verify a GoogleConversionMapping exists with the same eventName
- Check the client has the platform's access token/pixel ID configured
Stream disconnections
The worker reconnects automatically with exponential backoff. Check logs for the [SALESFORCE_PUBSUB] prefix. If streams keep disconnecting, the OAuth token may need a refresh; try pausing and resuming.
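For reference, a capped exponential backoff delay can be computed as below. The base delay, the cap, and the function name are assumptions chosen for illustration; the worker's actual values are not documented here.

```typescript
// Delay before reconnect attempt N (0-based): base * 2^N, capped at maxMs.
// baseMs and maxMs are illustrative defaults, not the worker's real settings.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 60000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, exponent));
}
```

With these defaults the delays grow 1s, 2s, 4s, 8s and so on until they reach the 60s cap, so a stream that keeps failing is retried about once a minute.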