site-web-targo/supabase/functions/import-csv-url/index.ts
louispaulb 88dc3714a1 Initial deploy: gigafibre.ca website with self-hosted address search
React/Vite/shadcn-ui site for Gigafibre ISP.
Address qualification via PostgreSQL (5.2M AQ addresses, pg_trgm fuzzy search).
No Supabase dependency for address search.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 14:37:50 -04:00

420 lines
15 KiB
TypeScript

import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
// CORS headers attached to every response of this function: any origin is
// allowed, and the listed request headers may be sent by the browser.
const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers": [
    "authorization",
    "x-client-info",
    "apikey",
    "content-type",
    "x-supabase-client-platform",
    "x-supabase-client-platform-version",
    "x-supabase-client-runtime",
    "x-supabase-client-runtime-version",
  ].join(", "),
};
// Number of mapped rows buffered before they are sent in one upsert.
const BATCH_SIZE = 200;
// Maximum number of mapped rows handled by a single invocation; the caller
// resumes from the returned `next_skip`.
const MAX_ROWS_PER_CALL = 50 * 1000;
// Maximum number of upsert attempts per batch (statement timeouts only).
const MAX_RETRIES = 3;
// Base pause between upsert attempts; multiplied by the attempt number.
const RETRY_DELAY_MS = 2 * 1000;
// Wall-clock budget of the read loop for one invocation.
const TIME_LIMIT_MS = 120 * 1000;
// Log lines accumulated during a request so they can be returned to the
// frontend. Module-level and reassigned by the request handler.
let logLines: string[] = [];

/**
 * Records a message in the in-memory log buffer (prefixed with the current
 * UTC time as `[HH:MM:SS]`) and mirrors it to the console with the
 * `[import-csv-url]` tag.
 *
 * @param msg Text to record.
 */
function log(msg: string) {
  // ISO timestamps look like 2026-03-27T14:37:50.000Z; keep only HH:MM:SS.
  const clock = new Date().toISOString().substring(11, 19);
  logLines.push(`[${clock}] ${msg}`);
  console.log(`[import-csv-url] ${msg}`);
}
/**
 * Upserts one batch of rows, retrying when the database reports a statement
 * timeout.
 *
 * @param supabase Client exposing `from(table).upsert(rows, options)`.
 * @param table Target table name.
 * @param batch Rows to upsert.
 * @param onConflict Column(s) passed as the upsert `onConflict` option.
 * @param ignoreDuplicates When true, `ignoreDuplicates: true` is added to the options.
 * @returns `{ error: null }` on success, otherwise `{ error }` with the last
 *   error returned by the client.
 */
async function upsertWithRetry(
  supabase: any,
  table: string,
  batch: any[],
  onConflict: string,
  ignoreDuplicates = false
) {
  const upsertOptions: any = ignoreDuplicates
    ? { onConflict, ignoreDuplicates: true }
    : { onConflict };

  let attempt = 0;
  while (attempt < MAX_RETRIES) {
    const result = await supabase.from(table).upsert(batch, upsertOptions);
    const failure = result.error;
    if (!failure) {
      return { error: null };
    }

    // Only statement timeouts are retried, and never after the final attempt.
    const timedOut = failure.message?.includes("statement timeout");
    const isLastAttempt = attempt >= MAX_RETRIES - 1;
    if (!timedOut || isLastAttempt) {
      return { error: failure };
    }

    log(`⚠️ Statement timeout on batch (attempt ${attempt + 1}/${MAX_RETRIES}), retrying...`);
    // Linear back-off: the pause grows with the attempt number.
    const pauseMs = RETRY_DELAY_MS * (attempt + 1);
    await new Promise((resolve) => setTimeout(resolve, pauseMs));
    attempt += 1;
  }

  // Reached only if MAX_RETRIES is not positive.
  return { error: { message: "Max retries exceeded" } };
}
/**
 * Splits one CSV/TSV line into trimmed field values.
 *
 * Comma-delimited lines use a quote-aware scan in which every `"` toggles the
 * quoted state and `""` inside quotes is an escaped quote.
 *
 * Tab- and semicolon-delimited lines are handled more leniently:
 * - a line without any `"` is split directly;
 * - otherwise a `"` opens a quoted field only at the start of a field, so a
 *   quoted field may contain the delimiter and `""` escapes, while a stray
 *   quote in the middle of an unquoted field stays literal;
 * - if the line ends inside an open quote, the line is split naively and one
 *   leading/trailing quote is stripped from each field.
 *
 * @param line A single line without its line terminator.
 * @param delimiter The field separator (",", ";" or "\t").
 * @returns The field values, in order.
 */
function parseLine(line: string, delimiter: string): string[] {
  const lenient = delimiter === "\t" || delimiter === ";";

  // Fast path: nothing to unquote, a plain split is exact.
  if (lenient && !line.includes('"')) {
    return line.split(delimiter).map((v) => v.trim());
  }

  const values: string[] = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const char = line[i];
    if (char === '"') {
      if (inQuotes && line[i + 1] === '"') {
        // Doubled quote inside a quoted field is an escaped quote.
        current += '"';
        i++;
      } else if (inQuotes || !lenient || current.trim() === "") {
        // Closing quote, or an opening quote (lenient mode: field start only).
        inQuotes = !inQuotes;
      } else {
        // Lenient mode: a quote in the middle of an unquoted field is data.
        current += char;
      }
    } else if (char === delimiter && !inQuotes) {
      values.push(current.trim());
      current = "";
    } else {
      current += char;
    }
  }

  if (lenient && inQuotes) {
    // Unterminated quote: malformed line, use the naive split instead of
    // swallowing the remaining delimiters.
    return line.split(delimiter).map((v) => v.trim().replace(/^"|"$/g, ""));
  }

  values.push(current.trim());
  return values;
}
/**
 * Normalizes a CSV header name into a lookup key: surrounding whitespace is
 * removed, the text is lower-cased, diacritics are stripped, runs of
 * whitespace/underscores collapse to a single `_`, and every remaining
 * character outside `[a-z0-9_]` is dropped.
 *
 * Trimming happens first: once whitespace has been turned into underscores a
 * later trim can no longer remove it.
 *
 * @param name Raw header text.
 * @returns The normalized key (possibly empty).
 */
function normalizeHeader(name: string): string {
  return name
    .trim()
    .toLowerCase()
    .normalize("NFD")
    // After NFD decomposition, accents are separate combining marks.
    .replace(/[\u0300-\u036f]/g, "")
    .replace(/[_\s]+/g, "_")
    .replace(/[^a-z0-9_]/g, "");
}
// Lookup from a normalized fiber CSV header to the target column name.
// Each entry lists a target column followed by the header spellings accepted
// for it; the special target "_skip" marks headers that are recognised but
// deliberately not imported.
const FIBER_HEADER_MAP: Record<string, string> = (() => {
  const aliasesByColumn: Array<[string, string[]]> = [
    ["id_placemark", ["id_placemark", "idplacemark"]],
    ["id_appart", ["id_appart", "idappart"]],
    ["uuidadresse", ["uuidadresse", "idgouvqc", "id_gouv_qc"]],
    ["nom", ["nom"]],
    ["civique", ["civique"]],
    ["appartement", ["appartement", "appt", "apt"]],
    ["rue", ["rue"]],
    ["ville", ["ville"]],
    ["code_postal", ["code_postal", "codepostal", "cp"]],
    ["lien_googlemap", ["lien_googlemap", "liengooglemap", "googlemap"]],
    ["zone_tarifaire", ["zone_tarifaire", "zonetarifaire", "zone"]],
    ["max_speed", ["max_speed", "maxspeed", "vitesse", "vitesse_max"]],
    ["commentaire", ["commentaire", "commentaires", "comment"]],
    ["_skip", ["lien_map_targo", "lienmaptargo"]],
  ];

  const lookup: Record<string, string> = {};
  for (const [column, aliases] of aliasesByColumn) {
    for (const alias of aliases) {
      lookup[alias] = column;
    }
  }
  return lookup;
})();
/**
 * Converts one parsed address CSV record (keyed by header name) into the row
 * object sent to the `addresses` upsert.
 *
 * `identifiant_unique_adresse` is passed through untouched. Every other text
 * field becomes `null` when empty or missing. `nombre_unite` is parsed with
 * `parseInt`, `longitude`/`latitude` with `parseFloat`; each is `null` when
 * the cell is empty or missing.
 *
 * @param r Record of raw string cells keyed by column name.
 * @returns The row object with 45 fields.
 */
function mapAddressRow(r: Record<string, string>) {
  // Empty or missing cells collapse to null.
  const textOrNull = (cell: string): string | null => (cell ? cell : null);
  const intOrNull = (cell: string): number | null => {
    if (!cell) return null;
    return parseInt(cell);
  };
  const floatOrNull = (cell: string): number | null => {
    if (!cell) return null;
    return parseFloat(cell);
  };

  return {
    // Identity and version dates
    identifiant_unique_adresse: r.identifiant_unique_adresse,
    date_diffusion_version: textOrNull(r.date_diffusion_version),
    date_creation: textOrNull(r.date_creation),
    date_modification: textOrNull(r.date_modification),
    date_fin: textOrNull(r.date_fin),
    // Civic number, unit and postal code
    numero_municipal: textOrNull(r.numero_municipal),
    numero_municipal_suffixe: textOrNull(r.numero_municipal_suffixe),
    numero_unite: textOrNull(r.numero_unite),
    type_unite: textOrNull(r.type_unite),
    code_postal: textOrNull(r.code_postal),
    // Street name (odonyme) parts
    seqodo: textOrNull(r.seqodo),
    generique_odonyme: textOrNull(r.generique_odonyme),
    particule_odonyme: textOrNull(r.particule_odonyme),
    specifique_odonyme: textOrNull(r.specifique_odonyme),
    point_cardinal_odonyme: textOrNull(r.point_cardinal_odonyme),
    odonyme_recompose_normal: textOrNull(r.odonyme_recompose_normal),
    odonyme_recompose_court: textOrNull(r.odonyme_recompose_court),
    odonyme_recompose_long: textOrNull(r.odonyme_recompose_long),
    discriminant_odonyme: textOrNull(r.discriminant_odonyme),
    odonyme_officiel_ctop: textOrNull(r.odonyme_officiel_ctop),
    // Formatted address and classification
    adresse_formatee: textOrNull(r.adresse_formatee),
    qualite_positionnement_geometrique: textOrNull(r.qualite_positionnement_geometrique),
    etat: textOrNull(r.etat),
    code_utilisation: textOrNull(r.code_utilisation),
    categorie: textOrNull(r.categorie),
    nombre_unite: intOrNull(r.nombre_unite),
    numero_lot: textOrNull(r.numero_lot),
    matricule_unite_evaluation: textOrNull(r.matricule_unite_evaluation),
    nosqnocivq: textOrNull(r.nosqnocivq),
    caracteristique_adresse: textOrNull(r.caracteristique_adresse),
    // Administrative divisions
    code_region_administrative: textOrNull(r.code_region_administrative),
    nom_region_administrative: textOrNull(r.nom_region_administrative),
    code_mrc: textOrNull(r.code_mrc),
    nom_mrc: textOrNull(r.nom_mrc),
    code_municipalite: textOrNull(r.code_municipalite),
    nom_municipalite: textOrNull(r.nom_municipalite),
    nom_municipalite_complet: textOrNull(r.nom_municipalite_complet),
    code_arrondissement: textOrNull(r.code_arrondissement),
    nom_arrondissement: textOrNull(r.nom_arrondissement),
    code_communaute_metropolitaine: textOrNull(r.code_communaute_metropolitaine),
    nom_communaute_metropolitaine: textOrNull(r.nom_communaute_metropolitaine),
    // Road network reference and coordinates
    identifiant_reseau_routier: textOrNull(r.identifiant_reseau_routier),
    cote_route: textOrNull(r.cote_route),
    longitude: floatOrNull(r.longitude),
    latitude: floatOrNull(r.latitude),
  };
}
/**
 * Converts one fiber CSV record (already keyed by target column name) into
 * the row object sent to the `fiber_availability` upsert.
 *
 * Text fields become `null` when empty or missing. `zone_tarifaire` defaults
 * to 1 and `max_speed` to 0 when the cell is empty, missing, or does not
 * start with a base-10 integer (an unparseable cell would otherwise yield
 * NaN, which serializes to JSON `null`).
 *
 * @param r Record of raw string cells keyed by column name.
 * @returns The row object, or `null` when `uuidadresse` is empty or missing
 *   (such rows are not imported).
 */
function mapFiberRow(r: Record<string, string>) {
  if (!r.uuidadresse) return null;

  // Parses a base-10 integer, substituting `fallback` for blank or
  // non-numeric cells.
  const intOrDefault = (cell: string | undefined, fallback: number): number => {
    if (!cell) return fallback;
    const parsed = Number.parseInt(cell, 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  };

  return {
    id_placemark: r.id_placemark || null,
    id_appart: r.id_appart || null,
    uuidadresse: r.uuidadresse,
    nom: r.nom || null,
    civique: r.civique || null,
    appartement: r.appartement || null,
    rue: r.rue || null,
    ville: r.ville || null,
    code_postal: r.code_postal || null,
    lien_googlemap: r.lien_googlemap || null,
    zone_tarifaire: intOrDefault(r.zone_tarifaire, 1),
    max_speed: intOrDefault(r.max_speed, 0),
    commentaire: r.commentaire || null,
  };
}
/**
 * HTTP entry point: imports one chunk of a CSV into `addresses` or
 * `fiber_availability`.
 *
 * Request: JSON body with
 * - `type`: "addresses" or "fiber" (required);
 * - `csv_text`: the CSV content as a string, or `url`: a location to fetch it from;
 * - `skip_rows`: number of data rows (header and blank lines excluded) to
 *   skip before importing; defaults to 0.
 * The caller must send an `Authorization` header whose token resolves to a
 * user holding the `admin` role in `user_roles`.
 *
 * Response: JSON with `processed`, `inserted`, `errors`, `errorMessages`,
 * `logs`, `done`, and `next_skip` — the `skip_rows` value to send on the next
 * call when `done` is false. Processing stops early once MAX_ROWS_PER_CALL
 * rows were mapped or TIME_LIMIT_MS elapsed.
 */
Deno.serve(async (req) => {
  // The log buffer is module-level; start every request with an empty one.
  logLines = [];

  if (req.method === "OPTIONS") {
    return new Response(null, { headers: corsHeaders });
  }

  // Builds a JSON response carrying the CORS headers.
  const json = (body: unknown, status = 200) =>
    new Response(JSON.stringify(body), {
      status,
      headers: { ...corsHeaders, "Content-Type": "application/json" },
    });

  try {
    // --- Auth check ---
    const authHeader = req.headers.get("Authorization");
    if (!authHeader) {
      return json({ error: "Unauthorized" }, 401);
    }
    const authClient = createClient(
      Deno.env.get("SUPABASE_URL")!,
      Deno.env.get("SUPABASE_ANON_KEY")!,
      { global: { headers: { Authorization: authHeader } } }
    );
    const token = authHeader.replace("Bearer ", "");
    const { data: claimsData, error: authError } = await authClient.auth.getClaims(token);
    if (authError || !claimsData?.claims?.sub) {
      return json({ error: "Unauthorized: invalid token" }, 401);
    }

    // --- Admin role check (done with the service-role client) ---
    const serviceClient = createClient(
      Deno.env.get("SUPABASE_URL")!,
      Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!
    );
    const { data: roleRow } = await serviceClient
      .from("user_roles")
      .select("role")
      .eq("user_id", claimsData.claims.sub)
      .eq("role", "admin")
      .maybeSingle();
    // A failed lookup also leaves roleRow empty, so errors deny access.
    if (!roleRow) {
      return json({ error: "Forbidden: admin role required" }, 403);
    }

    // --- Request validation ---
    const { url, type, skip_rows = 0, csv_text } = await req.json();
    if (!type || !["addresses", "fiber"].includes(type)) {
      return json({ error: "Missing 'type' (addresses|fiber)" }, 400);
    }
    // Coerce skip_rows to a non-negative integer: a string such as "5000"
    // would otherwise turn the next_skip computation into a concatenation.
    const requestedSkip = Number(skip_rows);
    const skipRows =
      Number.isFinite(requestedSkip) && requestedSkip > 0 ? Math.floor(requestedSkip) : 0;
    const isFiber = type === "fiber";
    log(`🚀 Import ${type} — skip_rows=${skipRows}`);

    // --- Open the CSV as a byte stream ---
    let csvStream: ReadableStream<Uint8Array>;
    if (typeof csv_text === "string" && csv_text.length > 0) {
      // CSV text sent directly from browser (e.g. after client-side ZIP decompression)
      log(`📄 Receiving CSV text from client (${(csv_text.length / 1024 / 1024).toFixed(1)} MB)`);
      const encoded = new TextEncoder().encode(csv_text);
      csvStream = new ReadableStream({
        start(controller) {
          // Enqueue 512KB views over the encoded buffer (no per-chunk copy).
          const chunkSize = 512 * 1024;
          for (let offset = 0; offset < encoded.length; offset += chunkSize) {
            controller.enqueue(encoded.subarray(offset, offset + chunkSize));
          }
          controller.close();
        },
      });
    } else if (typeof url === "string" && url.length > 0) {
      // Direct CSV URL (no ZIP support server-side)
      log(`📥 Downloading from URL: ${url.slice(0, 80)}...`);
      const fetchResponse = await fetch(url);
      if (!fetchResponse.ok || !fetchResponse.body) {
        const errMsg = `Failed to fetch URL: ${fetchResponse.status} ${fetchResponse.statusText}`;
        log(errMsg);
        return json({ error: errMsg, logs: logLines }, 400);
      }
      csvStream = fetchResponse.body;
    } else {
      return json({ error: "Missing 'url' or 'csv_text'" }, 400);
    }

    const streamReader = csvStream.getReader();
    const decoder = new TextDecoder("utf-8", { fatal: false });
    const startTime = Date.now();

    let headers: string[] = [];
    // Fiber only: target column for each header position, resolved once.
    let fiberColumns: Array<string | undefined> = [];
    let delimiter = ",";
    let leftover = ""; // trailing partial line carried over to the next chunk
    let totalRowsRead = 0; // data rows seen, including skipped and rejected ones
    let processedRows = 0; // rows mapped and queued for upsert in this call
    let inserted = 0;
    let errors = 0;
    const errorMessages: string[] = [];
    let pendingRows: any[] = [];
    let done = false;
    let stoppedByLimit = false;

    const table = type === "addresses" ? "addresses" : "fiber_availability";
    const onConflict = type === "addresses" ? "identifiant_unique_adresse" : "uuidadresse";

    // Upserts one batch and updates the counters; a failed batch counts all
    // of its rows as errors.
    const flushBatch = async (batch: any[]) => {
      if (batch.length === 0) return;
      const { error } = await upsertWithRetry(serviceClient, table, batch, onConflict, isFiber);
      if (error) {
        const msg = error.message || "Unknown error";
        log(`❌ Batch error (${batch.length} rows): ${msg}`);
        errors += batch.length;
        if (errorMessages.length < 10) errorMessages.push(msg);
      } else {
        inserted += batch.length;
      }
    };

    while (true) {
      if (Date.now() - startTime > TIME_LIMIT_MS) { stoppedByLimit = true; break; }
      if (processedRows >= MAX_ROWS_PER_CALL) { stoppedByLimit = true; break; }

      const { done: streamDone, value } = await streamReader.read();
      const chunk = decoder.decode(value, { stream: !streamDone });
      const lines = (leftover + chunk).split(/\r?\n/);
      // The last element may be an incomplete line unless the stream ended.
      leftover = streamDone ? "" : (lines.pop() || "");

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed) continue;

        if (headers.length === 0) {
          const cleanLine = trimmed.charCodeAt(0) === 0xfeff ? trimmed.slice(1) : trimmed;
          // Pick the most frequent candidate delimiter; ties favour tab, then semicolon.
          const tabCount = (cleanLine.match(/\t/g) || []).length;
          const semiCount = (cleanLine.match(/;/g) || []).length;
          const commaCount = (cleanLine.match(/,/g) || []).length;
          delimiter = tabCount >= semiCount && tabCount >= commaCount ? "\t"
            : semiCount >= commaCount ? ";" : ",";
          const candidateHeaders = parseLine(cleanLine, delimiter);
          // Lines with fewer than 3 fields are not accepted as the header row.
          if (candidateHeaders.length < 3) continue;
          headers = candidateHeaders;
          log(`📋 Headers (${headers.length}): ${headers.slice(0, 5).join(", ")}...`);
          if (isFiber) {
            fiberColumns = headers.map((h) => FIBER_HEADER_MAP[normalizeHeader(h)]);
            const mappingSummary = headers.map((h, idx) => {
              const col = fiberColumns[idx];
              return col ? `${h}→${col}` : `${h}→❌`;
            });
            log(`🔗 Mapping: ${mappingSummary.join(" | ")}`);
          }
          continue;
        }

        totalRowsRead++;
        if (totalRowsRead <= skipRows) {
          if (totalRowsRead % 100000 === 0) {
            log(`⏩ Skipping... ${totalRowsRead.toLocaleString()} / ${skipRows.toLocaleString()}`);
          }
          continue;
        }

        const values = parseLine(trimmed, delimiter);
        const row: Record<string, string> = {};
        if (isFiber) {
          fiberColumns.forEach((dbCol, idx) => {
            if (dbCol && dbCol !== "_skip") {
              row[dbCol] = values[idx] || "";
            }
          });
        } else {
          headers.forEach((h, idx) => {
            row[h] = values[idx] || "";
          });
        }

        const mapped = isFiber ? mapFiberRow(row) : mapAddressRow(row);
        if (mapped) {
          pendingRows.push(mapped);
          processedRows++;
        }

        if (pendingRows.length >= BATCH_SIZE) {
          await flushBatch(pendingRows);
          pendingRows = [];
          // Log progress every 10k rows
          if (processedRows % 10000 === 0) {
            const elapsed = ((Date.now() - startTime) / 1000).toFixed(0);
            log(`📊 Progress: ${processedRows.toLocaleString()} rows processed, ${inserted.toLocaleString()} inserted (${elapsed}s)`);
          }
        }

        if (processedRows >= MAX_ROWS_PER_CALL || Date.now() - startTime > TIME_LIMIT_MS) {
          stoppedByLimit = true;
          break;
        }
      }

      if (stoppedByLimit) break;
      if (streamDone) { done = true; break; }
    }

    if (pendingRows.length > 0) {
      await flushBatch(pendingRows);
    }
    if (!done) {
      // Release the source; a failure to cancel must not fail the import.
      await streamReader.cancel().catch(() => {});
    }

    // Resume after every data row consumed so far. Rows rejected by the
    // mapper count too, otherwise the next call would re-read them. Never go
    // below the requested skip (the limit can hit while still skipping).
    const nextSkip = Math.max(skipRows, totalRowsRead);
    const isComplete = done && !stoppedByLimit;
    const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
    if (isComplete) {
      log(`✅ Import COMPLETE: ${processedRows.toLocaleString()} rows, ${inserted.toLocaleString()} inserted, ${errors} errors (${elapsed}s)`);
    } else {
      log(`⏸️ Chunk done: ${processedRows.toLocaleString()} rows this chunk, next_skip=${nextSkip} (${elapsed}s)`);
    }

    return json({
      success: true,
      type,
      processed: processedRows,
      inserted,
      errors,
      errorMessages,
      next_skip: nextSkip,
      done: isComplete,
      logs: logLines,
    });
  } catch (err: unknown) {
    // Anything can be thrown; only Error instances carry a message.
    const message = err instanceof Error ? err.message : String(err);
    log(`💥 Fatal error: ${message}`);
    console.error("[import-csv-url] Error:", err);
    return json({ error: message, logs: logLines }, 500);
  }
});