""" Alation Exporter - Export to Alation Data Catalog format. Alation is an enterprise data catalog and data governance platform. https://www.alation.com/ """ from typing import Dict, Any, List from datetime import datetime import uuid from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge class AlationExporter(LineageExporter): """Export lineage to Alation format.""" def __init__(self, graph: LineageGraph, datasource_id: int = 1, datasource_name: str = "Lineage Accelerator"): super().__init__(graph) self.datasource_id = datasource_id self.datasource_name = datasource_name @property def format_name(self) -> str: return "Alation" @property def file_extension(self) -> str: return ".json" def _node_type_to_alation_otype(self, node_type: str) -> str: """Map internal node types to Alation object types.""" type_mapping = { "table": "table", "view": "view", "model": "table", "source": "datasource", "destination": "table", "column": "attribute", "database": "schema", "schema": "schema", "report": "bi_report", "dimension": "table", "fact": "table", "feature_set": "table", "semantic_model": "bi_datasource", "external_api": "datasource", "extract": "table" } return type_mapping.get(node_type.lower(), "table") def _create_table_object(self, node: LineageNode) -> Dict[str, Any]: """Create an Alation table object from a node.""" obj = { "key": self._get_key(node), "title": node.name, "description": node.description or "", "ds_id": self.datasource_id, "schema_name": node.schema or "default", "table_name": node.name, "table_type": node.type.upper() if node.type else "TABLE" } # Add custom fields custom_fields = [] if node.category: custom_fields.append({ "field_name": "Data Layer", "value": node.category }) if node.owner: custom_fields.append({ "field_name": "Data Owner", "value": node.owner }) if node.tags: custom_fields.append({ "field_name": "Tags", "value": ", ".join(node.tags) }) if node.database: custom_fields.append({ "field_name": "Database", "value": node.database }) if custom_fields: obj["custom_fields"] = custom_fields return obj def _get_key(self, node: LineageNode) -> str: """Get Alation-style key for a node.""" parts = [str(self.datasource_id)] if node.schema: parts.append(node.schema) else: parts.append("default") parts.append(node.name) return ".".join(parts) def _create_column_objects(self, node: LineageNode) -> List[Dict[str, Any]]: """Create Alation column objects from a node's columns.""" if not node.columns: return [] column_objects = [] table_key = self._get_key(node) for idx, col in enumerate(node.columns): col_obj = { "key": f"{table_key}.{col.get('name')}", "column_name": col.get("name"), "column_type": col.get("type") or col.get("data_type", "string"), "description": col.get("description", ""), "table_key": table_key, "position": idx + 1 } # Check for primary key if col.get("isPrimaryKey"): col_obj["is_primary_key"] = True # Check for foreign key if col.get("isForeignKey"): col_obj["is_foreign_key"] = True if col.get("references"): col_obj["fk_reference"] = col.get("references") column_objects.append(col_obj) return column_objects def _create_lineage_object(self, edge: LineageEdge) -> Dict[str, Any]: """Create an Alation lineage object from an edge.""" source_node = self.graph.get_node(edge.source) target_node = self.graph.get_node(edge.target) lineage = { "source_key": self._get_key(source_node) if source_node else edge.source, "target_key": self._get_key(target_node) if target_node else edge.target, "lineage_type": edge.type or "DIRECT" } # Add job information if available if edge.job_name: lineage["dataflow_name"] = edge.job_name if edge.job_id: lineage["dataflow_id"] = edge.job_id # Add transformation description if edge.transformation: lineage["transformation_description"] = edge.transformation return lineage def _create_dataflow(self, edge: LineageEdge) -> Dict[str, Any]: """Create an Alation dataflow object from an edge.""" dataflow_name = edge.job_name or f"dataflow_{edge.source}_to_{edge.target}" dataflow = { "external_id": edge.job_id or str(uuid.uuid4()), "title": dataflow_name, "description": f"Data transformation: {edge.type}", "dataflow_type": edge.type.upper() if edge.type else "ETL" } return dataflow def export(self) -> str: """Export to Alation JSON format.""" return self.to_json(indent=2) def _to_dict(self) -> Dict[str, Any]: """Convert to Alation bulk import dictionary.""" # Collect tables tables = [] columns = [] for node in self.graph.nodes: tables.append(self._create_table_object(node)) columns.extend(self._create_column_objects(node)) # Collect lineage lineage_objects = [self._create_lineage_object(edge) for edge in self.graph.edges] # Collect unique dataflows dataflows = [] seen_dataflows = set() for edge in self.graph.edges: dataflow_name = edge.job_name or f"dataflow_{edge.source}_to_{edge.target}" if dataflow_name not in seen_dataflows: dataflows.append(self._create_dataflow(edge)) seen_dataflows.add(dataflow_name) return { "exportInfo": { "producer": "Lineage Graph Accelerator", "exportedAt": self.graph.generated_at, "sourceLineageName": self.graph.name, "format": "Alation Bulk API", "version": "1.0" }, "datasource": { "id": self.datasource_id, "title": self.datasource_name, "ds_type": "custom" }, "schemas": self._extract_schemas(), "tables": tables, "columns": columns, "lineage": lineage_objects, "dataflows": dataflows, "summary": { "totalTables": len(tables), "totalColumns": len(columns), "totalLineageEdges": len(lineage_objects), "totalDataflows": len(dataflows), "schemas": list(set(t.get("schema_name", "default") for t in tables)) } } def _extract_schemas(self) -> List[Dict[str, Any]]: """Extract unique schemas from nodes.""" schemas = {} for node in self.graph.nodes: schema_name = node.schema or "default" if schema_name not in schemas: schemas[schema_name] = { "key": f"{self.datasource_id}.{schema_name}", "schema_name": schema_name, "ds_id": self.datasource_id, "description": f"Schema: {schema_name}" } if node.database: schemas[schema_name]["db_name"] = node.database return list(schemas.values())