Generating Descendant Table Using Spark GraphX

11/02/2019 · 2 Min Read — In Data Engineering, Coding

Hierarchical data is very common in business domain modeling. An org chart, for example, shows the reporting hierarchies of employees in an organization. This hierarchical model is important and can be used in enterprise software systems for fine-grained data access control.

In this article, I will demonstrate how we can use Apache Spark's GraphX library to model hierarchical data and implement a graph algorithm using Pregel API to generate a descendant table for use in row-level access control.

Implementing Access Control Using Descendant Table

Generic row-level access control on employee data is a common security requirement. For instance, the CEO of a company should be able to see all employee data, while a middle manager can only see data from her direct and down-line reports.

One simple way to achieve this dynamic filtering is to use a descendant table as an access control lookup.

For example, given an org chart:

CEO
├── COO
├── CTO
│   ├── Manager A
│   │   └── Supervisor
│   │       └── Team Lead
│   │           └── Developer
│   └── Manager B
└── CFO

We can model this hierarchy using a Descendant Table: a table that pairs each employee with every member of his or her upper management chain. Below is an example of a descendant table generated from the org chart above.

Employee     ReportingTo
CEO          -
COO          CEO
CTO          CEO
CFO          CEO
Manager A    CEO
Manager A    CTO
Manager B    CEO
Manager B    CTO
Supervisor   CEO
Supervisor   CTO
Supervisor   Manager A
Team Lead    CEO
Team Lead    CTO
Team Lead    Manager A
Team Lead    Supervisor
Developer    CEO
Developer    CTO
Developer    Manager A
Developer    Supervisor
Developer    Team Lead

If you want to filter a Fact Table like the one below...

Employee     Rating
Developer    Good
Team Lead    Average
Supervisor   Excellent

You can join the Descendant Table with the Fact Table and filter the ReportingTo column by the current active user. A simple SQL query should do the trick.

select * from descendant d
inner join fact f on f.Employee = d.Employee
where ReportingTo = [User]
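
For example, if the active user is Supervisor, the join keeps only the rows for the Supervisor's down-line reports:

Employee     Rating
Team Lead    Average
Developer    Good

Note that the Supervisor's own Excellent rating is excluded, because the descendant table does not pair an employee with himself or herself.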

Generating the Descendant Table

Normally, an employee table contains information about each employee's direct line manager only, and the entire reporting structure (i.e., their manager's manager, and so on) has to be recursively queried from the same table.

Performing this recursive query on a relational database can be quite inefficient. In this scenario, a graph-based algorithm is better suited.
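
For comparison, here is a minimal sketch of the relational approach using a recursive CTE; the employee table and its employeeId/supervisorId columns are illustrative. Each recursive step re-joins the employee table, so the cost grows with the depth of the hierarchy.

with recursive ancestors as (
  -- Base case: pair each employee with his or her direct supervisor
  select employeeId, supervisorId as reportingTo
  from employee
  where supervisorId is not null
  union all
  -- Recursive step: walk one level up the management chain
  select a.employeeId, e.supervisorId
  from ancestors a
  inner join employee e on e.employeeId = a.reportingTo
  where e.supervisorId is not null
)
select * from ancestors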

In this article, I am going to use Apache Spark's GraphX library and specifically the Pregel API to generate the descendant table. You can refer to the GraphX guide for more details on how to use this library.

I chose Spark since I intend to use this code on a large dataset and performance is a key factor. I am using a small dataset for demonstration purposes only.

First, let's generate some sample data typical of an employee table. The data includes some information about each employee and a reference to the employee's supervisor (supervisorId). Note that the CEO doesn't have a supervisor.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._

type Role = String
case class Employee(name: String, role: Role)

val employeeRawData = Array(
  (1L, "Steve", "Jobs", "CEO", None),
  (2L, "Leslie", "Lamport", "CTO", Some(1L)),
  (3L, "Jason", "Fried", "Manager", Some(1L)),
  (4L, "Joel", "Spolsky", "Manager", Some(2L)),
  (5L, "Jeff", "Dean", "Lead", Some(4L)),
  (6L, "Martin", "Odersky", "Sr.Dev", Some(5L)),
  (7L, "Linus", "Torvalds", "Dev", Some(6L)),
  (8L, "Steve", "Wozniak", "Dev", Some(6L)),
  (9L, "Matei", "Zaharia", "Dev", Some(6L)),
  (10L, "James", "Faeldon", "Intern", Some(7L))
)
val employeeDf = sc.parallelize(employeeRawData, 4).toDF(
  "employeeId",
  "firstName",
  "lastName",
  "role",
  "supervisorId"
)
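
A quick preview of the DataFrame, using the same display helper (a Databricks notebook function) that we'll use for the final results:

// Preview the sample employee table
display(employeeDf)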

Next, we generate the graph model using the basic building blocks -- vertices and edges.

val verticesRdd: RDD[(VertexId, Employee)] = employeeDf
  .select($"employeeId", concat($"firstName", lit(" "), $"lastName"), $"role")
  .rdd.map(emp => (emp.getLong(0), Employee(emp.getString(1), emp.getString(2))))

val edgesRdd: RDD[Edge[String]] = employeeDf
  // Drop the employee without a supervisor (the CEO); None is stored as null in the DataFrame
  .filter($"supervisorId".isNotNull)
  // First column is supervisorId (not employeeId), since the direction of the edge is top-down
  .select($"supervisorId", $"employeeId", $"role")
  // The edge property is the role
  .rdd.map(emp => Edge(emp.getLong(0), emp.getLong(1), emp.getString(2)))

// Define a default employee in case the graph references a missing employee
val missingEmployee = Employee("John Doe", "Unknown")

// Let's build the graph model
val employeeGraph: Graph[Employee, String] = Graph(verticesRdd, edgesRdd, missingEmployee)
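
As a quick sanity check, we can print every edge of the graph as a supervisor-to-employee pair:

// Print each reporting edge as "supervisor -> employee (role)"
employeeGraph.triplets.collect.foreach { t =>
  println(s"${t.srcAttr.name} -> ${t.dstAttr.name} (${t.attr})")
}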

Let's create case classes to model the vertex values as well as the messages that will be used by the Pregel API. I recommend using case classes as opposed to tuples when working in Spark, as it improves code readability.

// The structure of the message to be passed to vertices
case class EmployeeMessage(
  currentId: Long,    // Tracks the most recent vertex appended to the path; used for flagging isCyclic
  level: Int,         // The number of up-line supervisors (level in the reporting hierarchy)
  head: String,       // The top-most supervisor
  path: List[String], // The reporting path to the top-most supervisor
  isCyclic: Boolean,  // Is the reporting structure of the employee cyclic
  isLeaf: Boolean     // Is the employee rank and file (no down-line reporting employees)
)

// The structure of the vertex values of the graph
case class EmployeeValue(
  name: String,       // The employee name
  currentId: Long,    // Initial value is the employeeId
  level: Int,         // Initial value is zero
  head: String,       // Initial value is this employee's name
  path: List[String], // Initial value contains this employee's name only
  isCyclic: Boolean,  // Initial value is false
  isLeaf: Boolean     // Initial value is true
)

Let's initialize the vertex values.

// Initialize the employee vertices
val employeeValueGraph: Graph[EmployeeValue, String] = employeeGraph.mapVertices { (id, v) =>
  EmployeeValue(
    name = v.name,
    currentId = id,
    level = 0,
    head = v.name,
    path = List(v.name),
    isCyclic = false,
    isLeaf = true // Every vertex starts as a leaf; sendMsg flips this for vertices with down-line reports
  )
}

Now, let's encode the graph algorithm using the Pregel API superstep functions, making sure to handle cyclic structures (e.g., A reports to B, B reports to C, and C reports to A). In case of a cyclic structure, we will use the isCyclic flag to mark all employees in the cycle.

/**
 * Step 1: Mutate the value of the vertices, based on the message received
 */
def vprog(
  vertexId: VertexId,
  value: EmployeeValue,
  message: EmployeeMessage
): EmployeeValue = {
  if (message.level == 0) { // superstep 0 - initialize
    value.copy(level = value.level + 1)
  } else if (message.isCyclic) { // set isCyclic
    value.copy(isCyclic = true)
  } else if (!message.isLeaf) { // set isLeaf
    value.copy(isLeaf = false)
  } else { // set new values
    value.copy(
      currentId = message.currentId,
      level = value.level + 1,
      head = message.head,
      path = value.name :: message.path
    )
  }
}
/**
 * Step 2: For all triplets that received a message -- meaning any of the two vertices
 * received a message from the previous superstep -- compose and send a message.
 */
def sendMsg(
  triplet: EdgeTriplet[EmployeeValue, String]
): Iterator[(VertexId, EmployeeMessage)] = {
  val src = triplet.srcAttr
  val dst = triplet.dstAttr
  // Handle cyclic reporting structures
  if (src.currentId == triplet.dstId || src.currentId == dst.currentId) {
    if (!src.isCyclic) { // Set isCyclic
      Iterator((triplet.dstId, EmployeeMessage(
        currentId = src.currentId,
        level = src.level,
        head = src.head,
        path = src.path,
        isCyclic = true,
        isLeaf = src.isLeaf
      )))
    } else { // Already marked as isCyclic (possibly from a previous superstep), so ignore
      Iterator.empty
    }
  } else { // Regular reporting structure
    if (src.isLeaf) { // Initially every vertex is a leaf; since this vertex is a source, it should NOT be a leaf
      Iterator((triplet.srcId, EmployeeMessage(
        currentId = src.currentId,
        level = src.level,
        head = src.head,
        path = src.path,
        isCyclic = false,
        isLeaf = false // This is the only important value here
      )))
    } else { // Set new values by propagating source values to destination
      Iterator((triplet.dstId, EmployeeMessage(
        currentId = src.currentId,
        level = src.level,
        head = src.head,
        path = src.path,
        isCyclic = false, // Set to false so that cyclic updating is skipped in vprog
        isLeaf = true     // Set to true so that leaf updating is skipped in vprog
      )))
    }
  }
}
/**
* Step 3: Merge all inbound messages to a vertex. No special merging needed for this use case.
*/
def mergeMsg(msg1: EmployeeMessage, msg2: EmployeeMessage): EmployeeMessage = msg2
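
Keeping only the last message is safe here because each employee reports to at most one supervisor, so at most one path-carrying message arrives per vertex per superstep. When a path message and a leaf flag collide at a middle manager, whichever one is dropped is simply re-sent in the next superstep, so the algorithm still converges on a tree. If your data allowed multiple supervisors per employee, you would need a genuine merge strategy.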

Let's run the graph algorithm.

val initialMsg = EmployeeMessage(
currentId = 0L,
level = 0,
head = "",
path = Nil,
isCyclic = false,
isLeaf = true
)
val results = employeeValueGraph.pregel(
initialMsg,
Int.MaxValue,
EdgeDirection.Out
)(
vprog,
sendMsg,
mergeMsg
)
val resultDf = results
.vertices.map { case (id, v) => (id, v.name, v.level, v.head, v.path.reverse.mkString(">"), v.isCyclic, v.isLeaf) }
.toDF("id", "employee", "level", "head", "path", "cyclic", "leaf")
display(resultDf)

The path column of the vertex values contains each employee's entire upper management chain.
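
For example, given the sample data above, the Intern's path should read Steve Jobs>Leslie Lamport>Joel Spolsky>Jeff Dean>Martin Odersky>Linus Torvalds>James Faeldon, while the CEO's path should contain only his own name.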

Finally, we can generate the descendant table by doing a simple flatMap on the path. You can then use the access column to join with your fact table, similar to how ReportingTo works in our initial example.

val acl = results
  .vertices.flatMap { case (id, v) =>
    v.path.map(p => (id, v.name, p, v.level, v.head, v.path.reverse.mkString(">"), v.isCyclic, v.isLeaf))
  }.toDF("id", "employee", "access", "level", "head", "path", "cyclic", "leaf")
display(acl)
display(acl)
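
To close the loop, here is a minimal sketch of the row-level filter in Spark; the factDf DataFrame, its column names, and the currentUser value are made up for illustration:

// Hypothetical fact table keyed by the same full-name format used in the graph
val factDf = Seq(
  ("Linus Torvalds", "Good"),
  ("Martin Odersky", "Average")
).toDF("employee", "rating")

// The active user performing the query
val currentUser = "Jeff Dean"

// Keep only rows for employees whose reporting chain includes the current user
val visibleDf = factDf
  .join(acl.select($"employee", $"access"), Seq("employee"))
  .filter($"access" === currentUser)
display(visibleDf)

Since each employee's path includes his or her own name, users also see their own rows. Here both fact rows survive, because Martin Odersky and Linus Torvalds are both in Jeff Dean's down line.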

You can find the entire code on GitHub. I've tested the code on Scala 2.11 and Databricks Runtime 5.5 (Spark 2.4). The code is a Databricks Scala notebook that you can easily import and run on your own Apache Spark cluster.