Skip to content

Commit d12b418

Browse files
committed
feat: tnc connector implementation
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent bc2e323 commit d12b418

15 files changed

Lines changed: 849 additions & 1 deletion

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
2+
('enrolled-certification', 'tnc', false, false, 'Successful payment purchase of certification enrollment', 'Enrolled in certification'),
3+
('enrolled-training', 'tnc', false, false, 'Successful payment purchase of training enrollment', 'Enrolled in training'),
4+
('issued-certification', 'tnc', false, false, 'User is granted a certification', 'Issued certification'),
5+
('attempted-course', 'tnc', false, false, 'Certification course is completed', 'Attempted course'),
6+
('attempted-exam', 'tnc', false, false, 'Certification exam is completed', 'Attempted exam');

services/apps/snowflake_connectors/src/core/transformerBase.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,17 @@ export abstract class TransformerBase {
3636
try {
3737
return this.transformRow(row)
3838
} catch (err) {
39-
log.warn({ err, platform: this.platform }, 'Failed to transform row, skipping')
39+
const message = err instanceof Error ? err.message : String(err)
40+
const stack = err instanceof Error ? err.stack : undefined
41+
log.warn(
42+
{
43+
errMessage: message,
44+
errStack: stack,
45+
platform: this.platform,
46+
rowKeys: Object.keys(row),
47+
},
48+
'Failed to transform row, skipping',
49+
)
4050
return null
4151
}
4252
}

services/apps/snowflake_connectors/src/integrations/index.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ import { PlatformType } from '@crowd/types'
88

99
import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
1010
import { CventTransformer } from './cvent/event-registrations/transformer'
11+
import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
12+
import { TncCertificatesTransformer } from './tnc/certificates/transformer'
13+
import { buildSourceQuery as tncCoursesBuildQuery } from './tnc/courses/buildSourceQuery'
14+
import { TncCoursesTransformer } from './tnc/courses/transformer'
15+
import { buildSourceQuery as tncEnrollmentsBuildQuery } from './tnc/enrollments/buildSourceQuery'
16+
import { TncEnrollmentsTransformer } from './tnc/enrollments/transformer'
1117
import { DataSource, DataSourceName, PlatformDefinition } from './types'
1218

1319
export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
@@ -23,6 +29,25 @@ const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
2329
},
2430
],
2531
},
32+
[PlatformType.TNC]: {
33+
sources: [
34+
{
35+
name: DataSourceName.TNC_ENROLLMENTS,
36+
buildSourceQuery: tncEnrollmentsBuildQuery,
37+
transformer: new TncEnrollmentsTransformer(),
38+
},
39+
{
40+
name: DataSourceName.TNC_CERTIFICATES,
41+
buildSourceQuery: tncCertificatesBuildQuery,
42+
transformer: new TncCertificatesTransformer(),
43+
},
44+
{
45+
name: DataSourceName.TNC_COURSES,
46+
buildSourceQuery: tncCoursesBuildQuery,
47+
transformer: new TncCoursesTransformer(),
48+
},
49+
],
50+
},
2651
}
2752

2853
const enabled = (process.env.CROWD_SNOWFLAKE_ENABLED_PLATFORMS || '')
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { IS_PROD_ENV } from '@crowd/common'
2+
3+
// Main: analytics.silver_fact.certificates (certificate data)
4+
// Joins:
5+
// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
6+
// - analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user (LFID)
7+
// - analytics.silver_dim.users (LFID fallback)
8+
// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)
9+
10+
const CDP_MATCHED_SEGMENTS = `
11+
cdp_matched_segments AS (
12+
SELECT DISTINCT
13+
s.SOURCE_ID AS sourceId,
14+
s.slug
15+
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
16+
WHERE s.PARENT_SLUG IS NOT NULL
17+
AND s.GRANDPARENTS_SLUG IS NOT NULL
18+
AND s.SOURCE_ID IS NOT NULL
19+
)`
20+
21+
const ORG_ACCOUNTS = `
22+
org_accounts AS (
23+
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
24+
FROM analytics.bronze_fivetran_salesforce.accounts
25+
WHERE website IS NOT NULL
26+
UNION ALL
27+
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
28+
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
29+
WHERE website IS NOT NULL
30+
)`
31+
32+
const LFID_COALESCE = `COALESCE(mu.user_name, u.lf_username)`
33+
34+
export const buildSourceQuery = (sinceTimestamp?: string): string => {
35+
let select = `
36+
SELECT
37+
c.*,
38+
cms.slug AS PROJECT_SLUG,
39+
org.account_name AS ORGANIZATION_NAME,
40+
org.website AS ORG_WEBSITE,
41+
org.domain_aliases AS ORG_DOMAIN_ALIASES,
42+
org.logo_url AS LOGO_URL,
43+
org.industry AS ORGANIZATION_INDUSTRY,
44+
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE,
45+
${LFID_COALESCE} AS LFID
46+
FROM analytics.silver_fact.certificates c
47+
INNER JOIN cdp_matched_segments cms
48+
ON cms.sourceId = c.project_id
49+
LEFT JOIN analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user mu
50+
ON c.user_id = mu.user_id
51+
AND mu.user_name IS NOT NULL
52+
LEFT JOIN analytics.silver_dim.users u
53+
ON LOWER(c.user_email) = LOWER(u.email)
54+
AND u.lf_username IS NOT NULL
55+
LEFT JOIN org_accounts org
56+
ON c.account_id = org.account_id
57+
WHERE c.user_email IS NOT NULL`
58+
59+
// Limit to a single project in non-prod to avoid exporting all projects data
60+
if (!IS_PROD_ENV) {
61+
select += ` AND cms.slug = 'cncf'`
62+
}
63+
64+
const dedup = `
65+
QUALIFY ROW_NUMBER() OVER (PARTITION BY c.certificate_id ORDER BY org.website DESC) = 1`
66+
67+
if (!sinceTimestamp) {
68+
return `
69+
WITH ${ORG_ACCOUNTS},
70+
${CDP_MATCHED_SEGMENTS}
71+
${select}
72+
${dedup}`.trim()
73+
}
74+
75+
return `
76+
WITH ${ORG_ACCOUNTS},
77+
${CDP_MATCHED_SEGMENTS},
78+
new_cdp_segments AS (
79+
SELECT DISTINCT
80+
s.SOURCE_ID AS sourceId,
81+
s.slug
82+
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
83+
WHERE s.CREATED_TS >= '${sinceTimestamp}'
84+
AND s.PARENT_SLUG IS NOT NULL
85+
AND s.GRANDPARENTS_SLUG IS NOT NULL
86+
AND s.SOURCE_ID IS NOT NULL
87+
)
88+
89+
-- New certificates since last export
90+
${select}
91+
AND c.issued_ts >= '${sinceTimestamp}'
92+
${dedup}
93+
94+
UNION
95+
96+
-- All certificates in newly created segments
97+
${select}
98+
AND EXISTS (
99+
SELECT 1 FROM new_cdp_segments ncs
100+
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
101+
)
102+
${dedup}`.trim()
103+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import { TNC_GRID, TncActivityType } from '@crowd/integrations'
2+
import { getServiceChildLogger } from '@crowd/logging'
3+
import {
4+
IActivityData,
5+
IMemberData,
6+
MemberAttributeName,
7+
MemberIdentityType,
8+
PlatformType,
9+
} from '@crowd/types'
10+
11+
import { TransformedActivity } from '../../../core/transformerBase'
12+
import { TncTransformerBase } from '../tncTransformerBase'
13+
14+
const log = getServiceChildLogger('tncCertificatesTransformer')
15+
16+
export class TncCertificatesTransformer extends TncTransformerBase {
17+
transformRow(row: Record<string, unknown>): TransformedActivity | null {
18+
const email = (row.USER_EMAIL as string | null)?.trim() || null
19+
if (!email) {
20+
log.debug({ certificateId: row.CERTIFICATE_ID }, 'Skipping row: missing email')
21+
return null
22+
}
23+
24+
const certificateId = (row.CERTIFICATE_ID as string)?.trim()
25+
const learnerName = (row.LEARNER_NAME as string | null)?.trim() || null
26+
const lfUsername = (row.LFID as string | null)?.trim() || null
27+
28+
const identities: IMemberData['identities'] = []
29+
const sourceId = (row.USER_ID as string | null) || undefined
30+
31+
if (lfUsername) {
32+
identities.push(
33+
{
34+
platform: PlatformType.TNC,
35+
value: email,
36+
type: MemberIdentityType.EMAIL,
37+
verified: true,
38+
sourceId,
39+
},
40+
{
41+
platform: PlatformType.TNC,
42+
value: lfUsername,
43+
type: MemberIdentityType.USERNAME,
44+
verified: true,
45+
sourceId,
46+
},
47+
{
48+
platform: PlatformType.LFID,
49+
value: lfUsername,
50+
type: MemberIdentityType.USERNAME,
51+
verified: true,
52+
sourceId,
53+
},
54+
)
55+
} else {
56+
identities.push({
57+
platform: PlatformType.TNC,
58+
value: email,
59+
type: MemberIdentityType.USERNAME,
60+
verified: true,
61+
sourceId,
62+
})
63+
}
64+
65+
const activity: IActivityData = {
66+
type: TncActivityType.ISSUED_CERTIFICATION,
67+
platform: PlatformType.TNC,
68+
timestamp: (row.ISSUED_TS as string | null) || null,
69+
score: TNC_GRID[TncActivityType.ISSUED_CERTIFICATION].score,
70+
sourceId: certificateId,
71+
sourceParentId: (row.COURSE_ID as string | null) || undefined,
72+
member: {
73+
displayName: learnerName || email.split('@')[0],
74+
identities,
75+
organizations: this.buildOrganizations(row),
76+
attributes: {
77+
...((row.JOB_TITLE as string | null) && {
78+
[MemberAttributeName.JOB_TITLE]: { [PlatformType.TNC]: row.JOB_TITLE as string },
79+
}),
80+
...((row.USER_COUNTRY as string | null) && {
81+
[MemberAttributeName.COUNTRY]: { [PlatformType.TNC]: row.USER_COUNTRY as string },
82+
}),
83+
},
84+
},
85+
attributes: {
86+
productName: (row.COURSE_NAME as string | null) || null,
87+
technology: (row.TECHNOLOGIES_LIST as string | null) || null,
88+
didExpire: row.DID_EXPIRE as boolean | null,
89+
expirationDate: (row.EXPIRATION_DATE as string | null) || null,
90+
},
91+
}
92+
93+
const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
94+
const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
95+
if (!segmentSlug || !segmentSourceId) {
96+
return null
97+
}
98+
99+
return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
100+
}
101+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { IS_PROD_ENV } from '@crowd/common'
2+
3+
// Main: analytics.bronze_census_ti.course_actions (course action data)
4+
// Joins:
5+
// - analytics.bronze_census_ti.users (user resolution via internal_ti_user_id)
6+
// - analytics.bronze_census_ti.courses (course metadata)
7+
// - analytics.silver_fact.enrollments (segment + org resolution via email + course_id)
8+
// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
9+
// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)
10+
11+
const CDP_MATCHED_SEGMENTS = `
12+
cdp_matched_segments AS (
13+
SELECT DISTINCT
14+
s.SOURCE_ID AS sourceId,
15+
s.slug
16+
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
17+
WHERE s.PARENT_SLUG IS NOT NULL
18+
AND s.GRANDPARENTS_SLUG IS NOT NULL
19+
AND s.SOURCE_ID IS NOT NULL
20+
)`
21+
22+
const ORG_ACCOUNTS = `
23+
org_accounts AS (
24+
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
25+
FROM analytics.bronze_fivetran_salesforce.accounts
26+
WHERE website IS NOT NULL
27+
UNION ALL
28+
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
29+
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
30+
WHERE website IS NOT NULL
31+
)`
32+
33+
export const buildSourceQuery = (sinceTimestamp?: string): string => {
34+
let select = `
35+
SELECT
36+
ca.*,
37+
co.*,
38+
tu.user_email,
39+
tu.lfid,
40+
tu.learner_name,
41+
tu.user_country,
42+
tu.job_title,
43+
e.project_slug AS PROJECT_SLUG,
44+
e.project_id AS PROJECT_ID,
45+
e.account_id,
46+
org.account_name AS ORGANIZATION_NAME,
47+
org.website AS ORG_WEBSITE,
48+
org.domain_aliases AS ORG_DOMAIN_ALIASES,
49+
org.logo_url AS LOGO_URL,
50+
org.industry AS ORGANIZATION_INDUSTRY,
51+
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
52+
FROM analytics.bronze_census_ti.course_actions ca
53+
INNER JOIN analytics.bronze_census_ti.users tu
54+
ON ca.internal_ti_user_id = tu.internal_ti_user_id
55+
INNER JOIN analytics.bronze_census_ti.courses co
56+
ON ca.course_id = co.course_id
57+
INNER JOIN analytics.silver_fact.enrollments e
58+
ON e.course_id = ca.course_id
59+
AND LOWER(e.user_email) = LOWER(tu.user_email)
60+
INNER JOIN cdp_matched_segments cms
61+
ON cms.slug = e.project_slug
62+
AND cms.sourceId = e.project_id
63+
LEFT JOIN org_accounts org
64+
ON e.account_id = org.account_id
65+
WHERE ca.type = 'status_change'
66+
AND ca.source = 'course_started'
67+
AND co.is_test_or_archived = false
68+
AND tu.user_email IS NOT NULL`
69+
70+
// Limit to a single project in non-prod to avoid exporting all projects data
71+
if (!IS_PROD_ENV) {
72+
select += ` AND e.project_slug = 'cncf'`
73+
}
74+
75+
const dedup = `
76+
QUALIFY ROW_NUMBER() OVER (PARTITION BY ca.course_action_id ORDER BY org.website DESC) = 1`
77+
78+
if (!sinceTimestamp) {
79+
return `
80+
WITH ${ORG_ACCOUNTS},
81+
${CDP_MATCHED_SEGMENTS}
82+
${select}
83+
${dedup}`.trim()
84+
}
85+
86+
return `
87+
WITH ${ORG_ACCOUNTS},
88+
${CDP_MATCHED_SEGMENTS},
89+
new_cdp_segments AS (
90+
SELECT DISTINCT
91+
s.SOURCE_ID AS sourceId,
92+
s.slug
93+
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
94+
WHERE s.CREATED_TS >= '${sinceTimestamp}'
95+
AND s.PARENT_SLUG IS NOT NULL
96+
AND s.GRANDPARENTS_SLUG IS NOT NULL
97+
AND s.SOURCE_ID IS NOT NULL
98+
)
99+
100+
-- New course actions since last export
101+
${select}
102+
AND ca.timestamp >= '${sinceTimestamp}'
103+
${dedup}
104+
105+
UNION
106+
107+
-- All course actions in newly created segments
108+
${select}
109+
AND EXISTS (
110+
SELECT 1 FROM new_cdp_segments ncs
111+
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
112+
)
113+
${dedup}`.trim()
114+
}

0 commit comments

Comments
 (0)