DIY: Apache Spark and ADLS Gen 2 support

Warning: this walkthrough is not to be considered as official guidance or recommendation from Microsoft. It is presented for educational purposes only, and comes “as-is” and confers no rights, no warranties or guarantees.

There are several options for customers who want to deploy Apache Spark based solutions on Microsoft Azure: Azure Databricks and Azure HDInsight being the most popular ones. In addition there is also the open-source Azure Distributed Data Engineering Toolkit (AZTK) option as well if you want a more IaaS experience. Of course, with Spark providing analytical compute capabilities, what you also need is a first-class cloud storage which offers HDFS-like capabilities: distributed data storage, redundancy and security. Azure Data Lake Storage Gen 2 (ADLS Gen 2) offers exactly that with world-wide availability and competitive pricing.

In order to connect to ADLS Gen 2 from Apache Hadoop or Apache Spark, you need to leverage the ABFS driver, which was shipped publicly with Apache Hadoop 3.2.0. The associated work item HADOOP-15407 has some more information about this implementation, and best of all, the ABFS driver is part of the Hadoop source.

Given that most distributions of Spark tend to come with Hadoop 2.x versions, the ABFS driver is absent in those cases, leading to a blocker for customers who want to “roll their own” Spark infrastructure but also want to use ADLS Gen 2. I was curious to find out if there is a way to get (let’s say) Spark 2.3.3 to work with Hadoop 3.2.0 (which does include the ABFS driver) and thereby offer at least a path forward (albeit subject to the disclaimers around supportability and stability).

The good news is that Spark comes with a “Hadoop-free” binary distribution which does allow users to associate it with any release of Hadoop, thereby allowing them to “mix and match” Spark and Hadoop versions. Here’s a set of commands that I used to do exactly this on a dev setup, just to see if it works.

The first few steps are just to get the binary tarballs for Spark 2.3.3 (without Hadoop) and separately, for Hadoop 3.2.0. Then extract those as well:

cd ~
wget https://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-without-hadoop.tgz
wget https://www-eu.apache.org/dist/hadoop/common/hadoop-3.2.0/hadoop-3.2.0.tar.gz
tar -zxvf spark-2.3.3-bin-without-hadoop.tgz
tar -zxvf hadoop-3.2.0.tar.gz

Then we proceed to set up environment variables. The commands below assume that you have OpenJDK 8 installed. The crucial step is to specify SPARK_DIST_CLASSPATH which, as described in the Spark documentation, tells Spark to look within the appropriate Hadoop lib folders for the JARs that Spark needs. Further, you will notice that we also add hadoop/tools/lib/* to the classpath. That is where the ABFS driver lives. Unfortunately, the Spark documentation does not include this vital step.

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 
export HADOOP_HOME=~/hadoop-3.2.0
export PATH=${HADOOP_HOME}/bin:${PATH}
export SPARK_DIST_CLASSPATH=$(hadoop classpath):~/hadoop-3.2.0/share/hadoop/tools/lib/*
export SPARK_HOME=~/spark-2.3.3-bin-without-hadoop
export PATH=${SPARK_HOME}/bin:${PATH}
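
Before launching spark-shell, it can be worth confirming that the JAR carrying the ABFS driver really is in the tools/lib folder that was just added to the classpath. The sketch below is only an illustration: the helper name find_abfs_jars is made up for this post, and it assumes the driver ships in a JAR named hadoop-azure-<version>.jar (with hadoop-azure-datalake being the separate ADLS Gen 1 connector).

```python
import glob
import os


def find_abfs_jars(hadoop_home):
    """Return hadoop-azure JARs found under <hadoop_home>/share/hadoop/tools/lib."""
    lib_dir = os.path.join(hadoop_home, "share", "hadoop", "tools", "lib")
    candidates = glob.glob(os.path.join(lib_dir, "hadoop-azure-*.jar"))
    # Assumption: hadoop-azure-datalake-*.jar is the ADLS Gen 1 connector, so skip it.
    return sorted(
        jar for jar in candidates
        if not os.path.basename(jar).startswith("hadoop-azure-datalake")
    )


if __name__ == "__main__":
    # Falls back to the folder used in this walkthrough if HADOOP_HOME is not set.
    home = os.path.expanduser(os.environ.get("HADOOP_HOME", "~/hadoop-3.2.0"))
    jars = find_abfs_jars(home)
    print(jars if jars else "No hadoop-azure JAR found under " + home)
```

An empty result would suggest that the tools/lib path in SPARK_DIST_CLASSPATH does not point at a Hadoop 3.2.0 (or later) distribution.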

Then running spark-shell and trying to read from ADLS Gen 2 works fine, out of the box! I used the below sample code to test with the SharedKey authentication option. I have not tested OAuth 2.0 authentication using this custom deployment, though.

spark.conf.set("fs.azure.account.key.<<storageaccount>>.dfs.core.windows.net",  "<<key>>")
spark.read.csv("abfss://<<container>>@<<storageaccount>>.dfs.core.windows.net/<<topfolder>>/<<subfolder>>/file").count
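
Both lines above hinge on getting two strings exactly right: the account-key setting name and the abfss:// URI. As a small illustration (the helper names and the sample account / container names are invented for this sketch), they can be assembled like this:

```python
def account_key_setting(storage_account):
    """Name of the Spark/Hadoop setting that holds the SharedKey for one account."""
    return f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"


def abfss_uri(container, storage_account, path=""):
    """Build an abfss:// URI for a path inside a container (file system)."""
    host = f"{storage_account}.dfs.core.windows.net"
    return f"abfss://{container}@{host}/{path.lstrip('/')}"


if __name__ == "__main__":
    # "mystorage" and "mycontainer" are placeholder names, not real resources.
    print(account_key_setting("mystorage"))
    print(abfss_uri("mycontainer", "mystorage", "/topfolder/subfolder/file"))
```

In spark-shell or a notebook, the first string would be passed to spark.conf.set together with the key, and the second to spark.read.csv.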

In closing, I want to re-emphasize that the above should strictly be considered as an experiment and is by no means production-ready. For production workloads, I strongly recommend using services like Azure Databricks or Azure HDInsight, which are tested much more and are fully supported by Microsoft CSS.

This Sample Code is provided for the purpose of illustration only and is not intended to be used in a production environment.  THIS SAMPLE CODE AND ANY RELATED INFORMATION ARE PROVIDED “AS IS” WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.  We grant You a nonexclusive, royalty-free right to use and modify the Sample Code and to reproduce and distribute the object code form of the Sample Code, provided that You agree: (i) to not use Our name, logo, or trademarks to market Your software product in which the Sample Code is embedded; (ii) to include a valid copyright notice on Your software product in which the Sample Code is embedded; and (iii) to indemnify, hold harmless, and defend Us and Our suppliers from and against any claims or lawsuits, including attorneys’ fees, that arise or result from the use or distribution of the Sample Code. This posting is provided “AS IS” with no warranties, and confers no rights.

Avoiding error 403 ("request not authorized") when accessing ADLS Gen 2 from Azure Databricks while using a Service Principal

Azure Data Lake Storage Generation 2 (ADLS Gen 2) has been generally available since 7 Feb 2019. Azure Databricks is a first-party offering for Apache Spark. Many customers want to set ACLs on ADLS Gen 2 and then access those files from Azure Databricks, while ensuring that only the precise / minimal permissions are granted. In the process, we have seen some interesting patterns and errors (such as the infamous 403 / “request not authorized” error). While there is documentation on the topic, I wanted to walk through the steps end-to-end, and hopefully with this you can get everything working just perfectly! Let’s get started…

Setting up a Service Principal in Azure AD

To do this, we first need to create an application registration in Azure Active Directory (AAD). This is well documented in many places, but for reference, here is what I did.

I created a new app registration from the Portal and called it ”adlsgen2app”:

image

From the next screenshot below, note the Application’s ID: 0eb2e28a-0e97-41cc-b765-4c1ec255a0bf. This is also sometimes referred to as the “Client ID”. We are going to ignore the Object ID of the application, because as you will see we will later need the Object ID of the Service Principal for that application within our AAD tenant. More on that soon.

image

We then proceed to create a secret key (“keyfordatabricks”) for this application (redacted in the screenshot below for privacy reasons):

image

Note down that key in a safe place so that we can later store it in AKV and then eventually reference that AKV-backed secret from Databricks.

Setting up an AKV-backed secret store for Azure Databricks

In order to reference the above secret stored in Azure Key Vault (AKV), from within Azure Databricks, we must first add the secret manually to AKV and then associate the AKV itself with the Databricks workspace. The instructions to do this are well documented at the official page. For reference, here is what I did.

As mentioned, I first copied the Service Principal secret into AKV (I had created an AKV instance called “mydbxakv”):

image

Then, I followed the steps in the Azure Databricks documentation to create an AKV-backed secret scope within Databricks, and reference the AKV from there:

image

Granting the Service Principal permissions in ADLS Gen 2

This is probably the most important piece, and we have had some confusion here on how exactly to set the permissions / Access Control Lists in ADLS Gen 2. To start with, we use Azure Storage Explorer to set / view these ACLs. Here’s a screenshot of the ADLS Gen 2 account that I am using. Under that account, there is a “container” (technically a “file system”) called “acltest”:

image

Before we can grant permissions at ADLS Gen 2 to the Service Principal, we need to identify its Object ID (OID). To do this, I used Azure CLI to run the sample command below. The GUID passed to --id is the Application ID which we noted a few steps ago.

az ad sp show --id 0eb2e28a-0e97-41cc-b765-4c1ec255a0bf --query objectId

The value that is returned by the above command is the Object ID (OID) for the Service Principal:

79a448a0-11f6-415d-a451-c89f15f438f2

The OID for the Service Principal has to be used to define permissions in the ADLS Gen 2 ACLs. I repeat: do not use the Object ID from the Application, you must use the Object ID from the Service Principal in order to set ACLs / permissions at ADLS Gen 2 level.
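
If you script this step, one option is to capture the full JSON from az ad sp show (without --query) and pull the Object ID out in code. The sketch below is illustrative only: the helper name is invented, the sample JSON is trimmed to two fields, and the fallback to an "id" field is an assumption based on my understanding that newer Azure CLI releases (which use Microsoft Graph) name the field that way.

```python
import json


def service_principal_object_id(sp_json):
    """Extract the Service Principal's Object ID from 'az ad sp show' JSON output."""
    sp = json.loads(sp_json)
    # "objectId" is the field queried in this post; "id" is an assumed
    # fallback for CLI versions that report the same value under that name.
    for field in ("objectId", "id"):
        if sp.get(field):
            return sp[field]
    raise KeyError("no objectId or id field in service principal JSON")


if __name__ == "__main__":
    # Trimmed sample using the GUIDs from this walkthrough.
    sample = json.dumps({
        "appId": "0eb2e28a-0e97-41cc-b765-4c1ec255a0bf",
        "objectId": "79a448a0-11f6-415d-a451-c89f15f438f2",
    })
    print(service_principal_object_id(sample))
```

Note that the value returned is deliberately not the appId: the Application ID is what you pass in, and the Service Principal's Object ID is what you get back for use in ACLs.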

Set / Manage the ACLs at ADLS Gen 2 level

Let’s see this in action; in the above “acltest” container, I am going to add permission for this service principal on a specific file that I have to later access from Azure Databricks:

image

Sidebar: if we view the default permissions on this file, you will see that only $superuser has access. $superuser represents access to the ADLS Gen 2 file system via the storage key, and is only seen when these containers / file systems were created using Storage Key authentication.

To view / manage ACLs, right-click the container / folder / file in Azure Storage Explorer and use the “Manage Access” menu. In the Manage Access dialog, as shown in the screenshot below, I have copied (but not yet added) the OID for the service principal that we obtained previously. Again – I cannot emphasize this enough – please make sure you use the Object ID for the Service Principal and not the Object ID for the app registration.

image

Next I clicked the Add button and also set the level of access to web_site_1.dat (Read and Execute in this case, as I intend to only read this data into Databricks):

image

Then I clicked Save. You can also use “Default permissions” if you set ACLs on top-level folders, but do remember that those default permissions only apply to newly created children.

Permissions on existing files

Currently, for existing files / folders, you have to grant the desired permissions explicitly. Another very important point is that the Service Principal OID must also have been granted Read and Execute at the root (the container level), as well as on any intermediate folder(s). In my case, the file web_site_1.dat is located under /mydata. So note that I also have to add the permissions at root level:

image

Then at /mydata level:

image

In other words, the whole chain: all the folders in the path leading up to and including the (existing) file being accessed, must have permissions granted for the Service Principal.
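
To make “the whole chain” concrete, here is a tiny sketch that lists every level that needs an ACL entry for a given file. The function name is invented for illustration; it only does string handling and does not call any Azure API.

```python
def paths_needing_acl(file_path):
    """List the root, each intermediate folder, and the file itself, in order."""
    parts = [p for p in file_path.split("/") if p]
    chain = ["/"]  # the root of the container / file system
    current = ""
    for part in parts:
        current = current + "/" + part
        chain.append(current)
    return chain


if __name__ == "__main__":
    for level in paths_needing_acl("/mydata/web_site_1.dat"):
        print(level)
```

For the file used in this post, that gives three levels to check in Azure Storage Explorer: /, /mydata and /mydata/web_site_1.dat.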

Using the Service Principal from Azure Databricks

Firstly, review the requirements from the official docs. We do recommend using Databricks runtime 5.2 or above.

Providing the ADLS Gen 2 credentials to Spark

In the walkthrough below, we choose to do this at session level. In the sample code, note the usage of the Databricks dbutils.secrets calls to obtain the secret for the app from AKV, and also note the usage of the Application ID itself as the “client ID”:

spark.conf.set("fs.azure.account.auth.type", "OAuth") 
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "0eb2e28a-0e97-41cc-b765-4c1ec255a0bf")  # This GUID is just a sample for this walkthrough; it needs to be replaced with the actual Application ID in your case
spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "mysecretscope", key = "adlsgen2secret"))
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<<AAD tenant id>>/oauth2/token")

Note that the <<AAD tenant id>> placeholder above has also got to be substituted with the actual GUID for the AAD tenant. You can get that from the Azure Portal blade for Azure Active Directory.

Alternate way of configuring the ADLS Gen 2 credentials

In the previous code snippet, the service principal credentials are set up in such a way that they become the default for any ADLS Gen 2 account being accessed from that Spark session. However, there is another (potentially more precise) way of specifying these credentials, and that is to suffix the Spark configuration item keys with the ADLS Gen 2 account name. For example, imagine that “myadlsgen2” is the name of the ADLS Gen 2 account that we are using. The suffix to be applied in this case would be myadlsgen2.dfs.core.windows.net. Then the Spark conf setting would look like the below:

spark.conf.set("fs.azure.account.auth.type.myadlsgen2.dfs.core.windows.net", "OAuth") 
spark.conf.set("fs.azure.account.oauth.provider.type.myadlsgen2.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.myadlsgen2.dfs.core.windows.net", "<<application ID GUID>>") 
spark.conf.set("fs.azure.account.oauth2.client.secret.myadlsgen2.dfs.core.windows.net", dbutils.secrets.get(scope = "mysecretscope", key = "adlsgen2secret"))
spark.conf.set("fs.azure.account.oauth2.client.endpoint.myadlsgen2.dfs.core.windows.net", "https://login.microsoftonline.com/<<AAD tenant id>>/oauth2/token")

This method of suffixing the account name was enabled by this Hadoop fix and also referenced here.
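
Since the session-wide and per-account forms differ only in the suffix, one way to avoid typos is to generate the five settings from a single helper. This is just a sketch: oauth_settings is a name I made up, and the values passed in the example are placeholders rather than real credentials.

```python
def oauth_settings(client_id, client_secret, tenant_id, storage_account=None):
    """Return the five ABFS OAuth settings, optionally scoped to one storage account."""
    suffix = f".{storage_account}.dfs.core.windows.net" if storage_account else ""
    endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    return {
        f"fs.azure.account.auth.type{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint{suffix}": endpoint,
    }


if __name__ == "__main__":
    # Placeholder values; in a notebook the secret would come from dbutils.secrets.get(...)
    settings = oauth_settings("<app-id>", "<secret>", "<tenant-id>", "myadlsgen2")
    for key, value in settings.items():
        print(key, "=", value)
        # In a Databricks notebook: spark.conf.set(key, value)
```

Leaving storage_account as None reproduces the session-wide keys from the first snippet.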

“Happy Path” testing

If all the steps were done correctly, then when you run code to read the file:

spark.read.csv("abfss://acltest@<<storage account name>>.dfs.core.windows.net/mydata/web_site_1.dat").show()

…it works correctly!

image

But, what if you missed a step?

If you have missed any step in granting permissions at the various levels in the folder hierarchy, it will fail with a 403 error like the below:

StatusCode=403
StatusDescription=This request is not authorized to perform this operation using this permission.

The same error, from a Scala cell:

image

If you happen to run into the above errors, double-check all your steps. Most likely you missed a folder or root-level permission (assuming you gave the permission to the file correctly). The other reason that I have seen is because the permissions using Azure Storage Explorer were set using the Object ID of the application, and not the Object ID of the service principal. This is clearly documented here.

Another reason for the 403 “not authorized” error

Until recently, this error would also occur even if the ACLs were granted perfectly, due to an issue with the ABFS driver. Due to that issue, customers had to add the Service Principal to the storage account contributor IAM permission on the ADLS Gen 2 account. Thankfully, this issue was fixed in HADOOP-15969 and the fix is now included in the Databricks runtime 5.x. You no longer need to grant the Service Principal any IAM permissions on the ADLS Gen 2 account – if you get the ACLs right!


Spark job lineage in Azure Databricks with Spline and Azure Cosmos DB API for MongoDB

Tracking lineage of data as it is manipulated within Apache Spark is a common ask from customers. As of date, there are two options, the first of which is the Hortonworks Spark Atlas Connector, which persists lineage information to Apache Atlas. However, some customers who use Azure Databricks do not necessarily need or use the “full” functionality of Atlas, and instead want a more purpose-built solution. This is where the second option, Spline, comes in. Spline can persist lineage information to Apache Atlas or to a MongoDB database. Now, given that Azure Cosmos DB exposes a MongoDB API, it presents an attractive PaaS option to serve as the persistence layer for Spline.

This blog post is the result of my attempts to use Spline from within Azure Databricks, persisting the lineage information to Azure Cosmos DB using the MongoDB API. Some open “to-do” items are at the end of this blog post.

Installing Spline within Azure Databricks

First and foremost, you need to install a number of JAR libraries to allow Spark to start talking to Spline and thereon to Azure Cosmos DB MongoDB API. There is an open item wherein the Spline team is actively considering providing an “uber JAR” which will include all these dependencies. Until then, you will need to use the Maven coordinates as shown below and install these into Azure Databricks as Maven libraries. The list (assuming you are using Spark 2.4) is below. If you are using other versions of Spark within Azure Databricks, you will need to change the Maven coordinates for org.apache.spark:spark-sql-kafka and za.co.absa.spline:spline-core-spark-adapter to match the Spark version.

[Updated on 26 March 2019] The original version of this post had every single dependency (including “child” / transitive dependencies) listed. Based on expert advice from my colleague Alexandre Gattiker, there is a cleaner way of just installing the 3 libraries:

za.co.absa.spline:spline-core:0.3.6
za.co.absa.spline:spline-core-spark-adapter-2.4:0.3.6
za.co.absa.spline:spline-persistence-mongo:0.3.6

To add just these libraries, you need to specify “exclusions” when adding these libraries in the Databricks UI, such as what is shown in the screenshot below:

image

The exclusions that we have to add are:

org.apache.spark:spark-sql-kafka-0-10_2.11:${spark.version},org.json4s:json4s-native_2.11:${json4s.version}

If you still do need the full list with transitive dependencies included, it is now included as an Appendix at the very end of this post.

Preparing Azure Cosmos DB for Spline

This was pretty easy: all I needed to do was create a new Azure Cosmos DB account with the MongoDB API enabled. I did need to enable the “pipeline aggregation” preview feature, without which the Spline UI does not work. For good measure I also enabled the 3.4 wire protocol, but in hindsight it is not required, as Spline only uses the legacy MongoDB driver, which speaks a much older version of the wire protocol.

image

Spark code changes

Set up the Spark session configuration items in order to connect to Azure Cosmos DB’s MongoDB endpoint:

System.setProperty("spline.mode", "REQUIRED")
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory")
System.setProperty("spline.mongodb.url", "<<the primary connection string from the Azure Cosmos DB account>>")
System.setProperty("spline.mongodb.name", "<<Cosmos DB database name>>")

Enable lineage tracking for that Spark session:

import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()

Then we run a sample aggregation query (we do this as a Python cell just to show that the lineage tracking and persistence works even in PySpark):

%python
rawData = spark.read.option("inferSchema", "true").json("/databricks-datasets/structured-streaming/events/")
rawData.createOrReplaceTempView("rawData")
sql("select r1.action, count(*) as actionCount from rawData as r1 join rawData as r2 on r1.action = r2.action group by r1.action").write.mode('overwrite').csv("/tmp/pyaggaction.csv")

Running the Spline UI

To run the UI, I simply followed the instructions from the Spline documentation page. Since this process just needs Java to run, I can envision a PaaS-only option wherein this UI runs inside a container or an Azure App Service (doing that is left as an exercise to the reader – or maybe for a later blog post)! For now, here’s the syntax I used:

java -D"spline.mongodb.url=<<connection string from Cosmos DB page in Azure Portal>>" -D"spline.mongodb.name=<<Azure Cosmos DB name>>" -jar <<full path to spline-web-0.3.6-exec-war.jar>>

Then by browsing to the port 8080 on the machine running the UI, you can see the Spline UI. Firstly, a lineage view on the above join + aggregate query that we executed:

image

and then, a more detailed view:

image

This is of course a simple example; you can try more real-world examples on your own. Spline is under active development and is open-source; the authors are very active and responsive to queries and suggestions. I encourage you to try this out and judge for yourselves.

“To-do” list

As of this moment, I must call out some open items that I am aware of:

  • [Update 1 Apr 2019] The issue with Search has been understood and there is a reasonable way out. Again, my colleague Alexandre has been instrumental in finding the mitigation.

Unfortunately, the Search textbox does not seem to work correctly for me. I have opened an issue with the Spline team and hopefully we can track down why this is breaking.

  • I have not tested this with any major Spark jobs. If you plan to use this for any kind of serious usage, you should thoroughly test it. Please remember that this is a third-party open-source project, provided on an “as-is” basis.
  • [Update 26 March 2019]: I have verified that the above setup works correctly with VNET Service Endpoints to Azure Cosmos DB, and with corresponding firewall rules set on the Cosmos DB side to only allow traffic from the said VNET where the Databricks workspace is deployed.

My Azure Databricks workspace is deployed into an existing VNET. I still need to test service endpoints and firewall rules on the Cosmos DB side to ensure that traffic to the Azure Cosmos DB is restricted to only that from the VNET.

  • Last but not least, as I already mentioned, if the Spline team releases an uber JAR, that would reduce the overhead of managing and installing all the dependencies and make life a bit easier on the Azure Databricks front.

I hope this was useful; do try it out and leave me any questions / feedback / suggestions you have!

Appendix

Here is the full list of dependencies including all children / transitive dependencies. It may be useful if your Databricks workspace is deployed in a “locked-down” VNET with very restrictive NSG rules in place for outbound traffic:

org.json4s:json4s-native_2.11:3.5.3
org.json4s:json4s-ast_2.11:3.5.3
org.json4s:json4s-scalap_2.11:3.5.3
org.json4s:json4s-core_2.11:3.5.3
org.json4s:json4s-ext_2.11:3.5.3
org.scala-lang:scalap:2.11.12
org.scalaz:scalaz-core_2.11:7.2.27
org.slf4s:slf4s-api_2.11:1.7.25
com.github.salat:salat-core_2.11:1.11.2
com.github.salat:salat-util_2.11:1.11.2
org.mongodb:bson:3.10.1
org.apache.atlas:atlas-common:1.0.0
org.apache.atlas:atlas-intg:1.0.0
org.apache.atlas:atlas-notification:1.0.0
org.mongodb:casbah-core_2.11:3.1.1
org.mongodb:casbah-commons_2.11:3.1.1
org.mongodb:casbah-query_2.11:3.1.1
org.apache.kafka:kafka-clients:2.1.1
org.mongodb:mongo-java-driver:3.10.1
org.mongodb:mongodb-driver-legacy:3.10.1
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
za.co.absa.spline:spline-core:0.3.6
za.co.absa.spline:spline-commons:0.3.6
za.co.absa.spline:spline-core-spark-adapter-2.4:0.3.6
za.co.absa.spline:spline-core-spark-adapter-api:0.3.6
za.co.absa.spline:spline-model:0.3.6
za.co.absa.spline:spline-persistence-api:0.3.6
za.co.absa.spline:spline-persistence-atlas:0.3.6
za.co.absa.spline:spline-persistence-hdfs:0.3.6
za.co.absa.spline:spline-persistence-mongo:0.3.6


How we did it: PASS 2017 Summit Session Similarity using SQL Graph and Python

I had previously shared a sneak preview of our upcoming session on Graph data processing in SQL Server. The talk is at the PASS Summit 2017. In that post, I had promised to share more details closer to the session. And here it is!

Inferring Graph Edges using SQL ML Services

In many cases, the edges in a graph are deterministic and ‘known’ to the application. In other cases, edges have to be ‘inferred’ or ‘discovered’ by some code:

  • In some cases, node attributes can be used to detect similar nodes and create an edge
  • In other cases, an ETL process could use fuzzy lookups etc.
  • But, for more complex situations, ML Services in SQL Server 2017 and Azure SQL DB can be used as well! sp_execute_external_script can be used to invoke an R / Python script and get back a list of keys to build edges

In this walkthrough we will use ML Services in SQL Server 2017 to invoke a Python script to infer similar edges in a graph.

Approach

The nodes in this graph will be the sessions at PASS 2017 (with the data imported as per this previous post) and then we will use Python to invoke some language processing code to compute the measures of similarity between pairs of sessions, based on their Title and Abstract fields. In summary here is what we will do:

  • Our database has a node table with all the sessions from PASS Summit 2017
  • Sessions are saved as a Node table in SQL Graph
  • Session node has attributes like Session Id, Title, Abstract, Speaker Names and Track
  • Hypothesis: similar themed sessions have similar keywords in their Title / Abstract
  • Using NLP libraries in Python we can break down these sessions into underlying keywords and their frequency counts
  • Construct a “similarity matrix” and then return for each session, those sessions which have at least 20% similarity
  • Construct edges in SQL Graph for these related session pairs
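
To give a feel for the idea before the real code, here is a deliberately small, plain-Python stand-in for the approach above. It is not the NLTK / Gensim pipeline used later in this post: there is no stemming, the stop word list is a tiny made-up set, and the three session titles are invented for illustration. It only shows how TF-IDF weights and cosine similarity lead to pairs above a 20% threshold.

```python
import math
import re
from collections import Counter

# Tiny illustrative stop word list (an assumption, not the stop_words package).
STOP_WORDS = {"a", "an", "and", "for", "in", "of", "on", "the", "to", "with"}


def tokenize(text):
    """Lower-case the text, split it into words and drop stop words."""
    return [w for w in re.findall(r"\w+", text.lower()) if w not in STOP_WORDS]


def tfidf_vectors(documents):
    """Return one {term: tf * idf} dictionary per document."""
    token_lists = [tokenize(doc) for doc in documents]
    n_docs = len(token_lists)
    doc_freq = Counter(term for tokens in token_lists for term in set(tokens))
    vectors = []
    for tokens in token_lists:
        counts = Counter(tokens)
        vectors.append({t: c * math.log(n_docs / doc_freq[t]) for t, c in counts.items()})
    return vectors


def cosine(u, v):
    """Cosine similarity between two sparse vectors stored as dictionaries."""
    dot = sum(w * v.get(t, 0.0) for t, w in u.items())
    norm_u = math.sqrt(sum(w * w for w in u.values()))
    norm_v = math.sqrt(sum(w * w for w in v.values()))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0


def similar_pairs(documents, threshold=0.20):
    """Return (i, j, score) for each pair of documents at or above the threshold."""
    vectors = tfidf_vectors(documents)
    pairs = []
    for i in range(len(vectors)):
        for j in range(i + 1, len(vectors)):
            score = cosine(vectors[i], vectors[j])
            if score >= threshold:
                pairs.append((i, j, round(score, 2)))
    return pairs


if __name__ == "__main__":
    titles = [  # invented titles, not actual PASS Summit sessions
        "Graph processing in SQL Server",
        "Graph queries with SQL Server 2017",
        "Tuning Python machine learning models",
    ]
    print(similar_pairs(titles))
```

With these toy titles, only the first two share keywords, so they are the only pair that clears the threshold; each such pair would become an edge in the graph.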

Pre-requisites

We will be leveraging two powerful Python libraries: NLTK and Gensim, to help us analyze the text and derive a measure of similarity for pairs of sessions. While NLTK comes pre-installed with SQL Server 2017 ML Services, you have to install Gensim using PIP:

pip install stop_words
pip install gensim

We will then need to install a “corpus” of stop words for NLTK. This will help eliminate some common “noise” words from text to help improve the accuracy of the analysis. To do this we first create a folder for NLTK data:

 md "C:\Program Files\Microsoft SQL Server\MSSQL14.SQL20171000\PYTHON_SERVICES\Lib\nltk_data" 

Then we use nltk.download() to download and install the stopwords corpus as shown below. The important thing to note is to correctly escape the backslash characters in the path when providing it to the NLTK download GUI. In my case I used:

C:\\Program Files\\Microsoft SQL Server\\MSSQL14.MSSQLSERVER\\PYTHON_SERVICES\\lib\\nltk_data

Here’s a screenshot of the above step in case you are wondering:

NLTK_Stopwords2

Once the stopwords corpus is downloaded, we proceed to create the necessary SQL node table, and then convert the previously prepared “regular” table into a Node table using INSERT…SELECT:

CREATE TABLE [dbo].[Session](
	[Index] integer IDENTITY(1,1) NOT NULL,
	[SessionID] integer NULL,
	[Abstract] [nvarchar](max) NULL,
	[SessionLevel] [int] NULL,
	[Speaker1] [nvarchar](100) NULL,
	[Speaker2] [nvarchar](100) NULL,
	[Speaker3] [nvarchar](100) NULL,
	[Title] [nvarchar](4000) NULL,
	[Track] [nvarchar](50) NULL
) AS NODE
GO
INSERT INTO Session (SessionID, Abstract, SessionLevel, Speaker1, Speaker2, Speaker3, Title, Track)
SELECT SessionID, Abstract, SessionLevel, Speaker1, Speaker2, Speaker3, Title, Track FROM dbo.PASS2017Sessions;
GO

We then proceed to create an empty edge table:

CREATE TABLE SimilarSessions
(
SimilarityMetric float
)
AS EDGE

This table is implicitly going to hold the “from” and “to” nodes in the graph and additionally it holds a similarity measure value for that relationship.

Using Python (NLTK and Gensim) to compute session similarity

Now that we have the tables in place, let’s dig in and do the heavy lifting of text processing and analytics. Given below is the entire code which does the processing, but since it is complex, let me give you a high-level flow before actually presenting the code. Here is what is happening in the code below:

  • The session data (titles, session ID, abstract, track and an incremental index number) are provided to the Python script from a T-SQL query (that query is at the very end of this code block)
  • Then NLTK is used to break down the title and abstract into words (a process called tokenization)
  • We then stem and remove stop words from the tokenized words
  • We then proceed to build a corpus of these words, taking only those words which have occurred at least 3 times
  • Then we proceed to use TF-IDF to prepare a document matrix of these words and their frequencies in various documents
  • Then, Gensim is used to compute “Matrix Similarity” which is basically a matrix of documents and how similar they are to each other.
  • Once the similarity matrix is built up, we then proceed to build the output result set which maps back the SessionId values and their similarity measures
  • In the above step, one interesting thing to note is that in SQL, graphs are directed. So we have to exclude situations where Session1 ‘is similar to’ Session2 AND Session2 ‘is similar to’ Session1.
  • Once this list of unique edges is built up, it is written back into SQL as edges in the SimilarSessions graph (edge) table by using a function called rxDataStep.

A small but important nuance here with rxDataStep and specifically SQL Graph edge tables, is that you need to exactly match the $from_id and $to_id column names with the actual values (including the GUID portions) that are in the edge table. Alternatively, you can avoid using rxDataStep and insert the output of the sp_execute_external_script into a temporary table / table variable and then JOIN back to the node tables to finally insert into the graph edge table. We will look at improving this experience going forward.
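
The “unique edges” step in the list above can be illustrated in isolation. The sketch below is my own simplified take, not the code from the script that follows: the function name and the sample session IDs are invented, and it simply keeps the first direction seen for each pair of sessions.

```python
def unique_edges(directed_pairs):
    """Keep one (source, target, score) per unordered pair, skipping self-matches."""
    seen = set()
    edges = []
    for source, target, score in directed_pairs:
        key = frozenset((source, target))  # same key for (A, B) and (B, A)
        if source == target or key in seen:
            continue
        seen.add(key)
        edges.append((source, target, score))
    return edges


if __name__ == "__main__":
    # Invented session IDs and similarity percentages.
    matches = [(1, 2, 35.0), (2, 1, 35.0), (1, 3, 22.5), (3, 3, 100.0)]
    print(unique_edges(matches))
```

Because each relationship is stored only once, queries that look for similar sessions need to consider both directions of the edge.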

Take your time to understand the code! Here we go:

exec sp_execute_external_script @language = N'Python',
@script = N'
####
from nltk.tokenize import RegexpTokenizer
from stop_words import get_stop_words
from nltk.stem.snowball import SnowballStemmer
from gensim import corpora, models, similarities
import gensim
import pandas as pd
from revoscalepy import RxSqlServerData, rx_data_step
# read data back in
pdDocuments = InputDataSet
tokenizer = RegexpTokenizer(r"\w+")
en_stop = get_stop_words("en")
stemmer = SnowballStemmer("english", ignore_stopwords=True)
def iter_documents(pdSeries):
    """Iterate over all documents, yielding a document (=list of utf8 tokens) at a time."""
    for (idx, docrow) in pdSeries.iterrows():
        concatsessionattributes = list()
        concatsessionattributes.append(docrow.Title.lower())
        concatsessionattributes.append(docrow.Abstract.lower())
        concatsessionattributesstr = " ".join(concatsessionattributes)
        tokens = tokenizer.tokenize(concatsessionattributesstr)
        # Remove stop words from tokens
        stopped_tokens = [i for i in tokens if not i in en_stop]
        final = [stemmer.stem(word) for word in stopped_tokens]
        yield final
class MyCorpus(object):
    def __init__(self, pdSeriesInput):
        self.series = pdSeriesInput
        self.dictionary = gensim.corpora.Dictionary(iter_documents(self.series))
        self.dictionary.filter_extremes(no_below=3)
        self.dictionary.compactify()
    def __iter__(self):
        for tokens in iter_documents(self.series):
            yield self.dictionary.doc2bow(tokens)
corp1 = MyCorpus(pdDocuments)
tfidf = models.TfidfModel(corp1,id2word=corp1.dictionary, normalize=True)
train_corpus_tfidf = tfidf[corp1]
corpora.MmCorpus.serialize("train_ssd_corpus-tfidf.mm",train_corpus_tfidf)
train_corpus_tfidf = corpora.MmCorpus("train_ssd_corpus-tfidf.mm")
index = similarities.MatrixSimilarity(train_corpus_tfidf)
tfidf_sims  = index[train_corpus_tfidf]
# print (tfidf_sims)
similaritylist = []
def similarsessions(inputindex):
    print("Selected session: " + pdDocuments.loc[inputindex].Title)
    print()
    print("Most similar sessions are listed below:")
    print()
    topNmatches = tfidf_sims[inputindex].argsort()[-10:][::-1]
    for matchedsessindex in topNmatches:
        if (inputindex != matchedsessindex and round(tfidf_sims[inputindex][matchedsessindex] * 100, 2) > 20.0):
            rowdict = {}
            rowdict["OriginalSession"] = pdDocuments.loc[inputindex].SessionId
            rowdict["SimilarSession"] = pdDocuments.loc[matchedsessindex].SessionId
            rowdict["SimilarityMetric"] = round(tfidf_sims[inputindex][matchedsessindex] * 100, 2)
            # this graph effectively being an "Undirected Graph" we need to
            # only add a new row if there is no prior edge connecting these 2 sessions
            prioredgeexists = False
            for priorrow in similaritylist:
                # a prior edge in the reverse direction means this pair is already covered
                if (priorrow["SimilarSession"] == rowdict["OriginalSession"] and priorrow["OriginalSession"] == rowdict["SimilarSession"]):
                    prioredgeexists = True
            if (not prioredgeexists):
                similaritylist.append(rowdict)
            print(str(matchedsessindex) + ": " + pdDocuments.loc[matchedsessindex]["Title"] + " ("  + str(round(tfidf_sims[inputindex][matchedsessindex] * 100, 2)) + "% similar)")
for sessid in range(len(pdDocuments)):
    similarsessions(sessid)
print(similaritylist.__len__())
finalresultDF = pd.DataFrame(similaritylist)
# rename the DF columns to suit graph column names
finalresultDF.rename(columns = {"OriginalSession":"$from_id_C19A274BF63B41359AD62328FD4E987D", "SimilarSession":"$to_id_464CF6F8A8A1406B914D18B5010D7CB1"}, inplace = True)
sqlDS=RxSqlServerData(connection_string = "Driver=ODBC Driver 13 for SQL Server;Server=.\SQL2017;Database=PASS-Demo;trusted_connection=YES"
, table="dbo.SimilarSessions")
rx_data_step(finalresultDF, output_file = sqlDS, append = ["rows"])
', @input_data_1 = N'SELECT CAST((ROW_NUMBER() OVER (ORDER BY (SELECT NULL))) - 1 AS INT) as RowIndex, Abstract, SessionLevel, Speaker1, Speaker2, Speaker3, Title, Track, $node_id AS SessionId FROM Session'

Once the above code is executed, the SimilarSessions table is populated with edges! We can then query that table using regular T-SQL and the new MATCH predicate in SQL Graph. For example, the query below looks for sessions similar to my colleague Denzil’s session:

SELECT TS.SessionId, TS.Title, SimilarityMetric
FROM SimilarSessions SS, [Session] OS, [Session] TS
where MATCH (OS-(SS)->TS)
AND (OS.SessionId = 69503)
UNION ALL
SELECT OS.SessionId, OS.Title, SimilarityMetric
FROM SimilarSessions SS, [Session] OS, [Session] TS
where MATCH (OS-(SS)->TS)
AND (TS.SessionId = 69503)

Here is the output of that query:

Sessions similar to Denzil's session

I’m sure you will agree, looking at the above, these are greatly correlated sessions and would be a great recommendation for anyone already viewing Denzil’s session!
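
Because each pair of similar sessions is stored as a single edge, the query above has to look at both ends of every edge, which is what the two halves of the UNION ALL do. The same idea in plain Python, as a rough sketch with made-up session ids and a hypothetical helper named `similar_to`:

```python
def similar_to(session_id, edges):
    """Return (other_session, metric) for every edge touching session_id.

    edges: list of (from_id, to_id, metric) tuples, one per similar pair.
    """
    matches = []
    for from_id, to_id, metric in edges:
        if from_id == session_id:
            # session is the edge origin: report the target (first SELECT)
            matches.append((to_id, metric))
        elif to_id == session_id:
            # session is the edge target: report the origin (second SELECT)
            matches.append((from_id, metric))
    return matches


# hypothetical sample edges
edges = [(1, 2, 45.0), (3, 1, 30.0), (2, 3, 25.0)]
print(similar_to(1, edges))
```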

Visualization – GraphML

Now, the last part is how to visualize the above graph in some capable tool. SQL does not ship with native visualization for graphs, and the main reason for this is that preferences on the visualization are hugely varied and we do not want to enforce anything specific from our side. Instead, we recommend using standard tools like d3.js, Gephi etc. In my case, I chose to use a very powerful tool called Cytoscape. Now, many of these tools understand a standard format for representing graphs, called GraphML. This format is XML and hence it is easy to use T-SQL to generate GraphML corresponding to our graph! Here’s the code to do this:

CREATE OR ALTER PROCEDURE CreateGraphML
AS
BEGIN
    DECLARE @prolog AS NVARCHAR (MAX) = N'       ';
    DECLARE @epilog AS NVARCHAR (MAX) = N'   ';
    DECLARE @nodeXML AS NVARCHAR (MAX) = (SELECT   *
                                          FROM     (SELECT 1 AS Tag,
                                                           0 AS Parent,
                                                           S.SessionId AS [node!1!id],
                                                           NULL AS [data!2!!element],
                                                           NULL AS [data!2!key]
                                                    FROM   dbo.[Session] AS S
                                                    UNION ALL
                                                    SELECT 2 AS Tag,
                                                           1 AS Parent,
                                                           S.SessionId,
                                                           CONCAT(S.Title, CHAR(13), CHAR(10), CONCAT('(', S.Speaker1, IIF (S.Speaker2 IS NULL, '', CONCAT(',', Speaker2)), IIF (S.Speaker3 IS NULL, '', CONCAT(',', Speaker3)), ')'), ' [', ((SELECT COUNT(*)
                                                                                                                                                                                                                                               FROM   SimilarSessions AS SS
                                                                                                                                                                                                                                               WHERE  SS.$FROM_ID = S.$NODE_ID) + (SELECT COUNT(*)
                                                                                                                                                                                                                                                                                   FROM   SimilarSessions AS SS
                                                                                                                                                                                                                                                                                   WHERE  SS.$TO_ID = S.$NODE_ID)), ' connections]'),
                                                           'd0'
                                                    FROM   dbo.[Session] AS S
                                                    UNION ALL
                                                    SELECT 2 AS Tag,
                                                           1 AS Parent,
                                                           S.SessionId,
                                                           S.Track,
                                                           'd1'
                                                    FROM   dbo.[Session] AS S
                                                    UNION ALL
                                                    SELECT 2 AS Tag,
                                                           1 AS Parent,
                                                           S.SessionId,
                                                           CAST (S.SessionId AS NVARCHAR (200)),
                                                           'd2'
                                                    FROM   dbo.[Session] AS S) AS InnerTable
                                          ORDER BY [node!1!id], [data!2!!element]
                                          FOR      XML EXPLICIT);
    DECLARE @edgeXML AS NVARCHAR (MAX);
    WITH   Edges
    AS     (SELECT OS.SessionId AS source,
                   TS.SessionId AS target,
                   CAST (SS.SimilarityMetric AS INT) AS data
            FROM   SimilarSessions AS SS, [Session] AS OS, [Session] AS TS
            WHERE  MATCH(OS-(SS)->TS))
    SELECT @edgeXML = (SELECT   *
                       FROM     (SELECT 1 AS Tag,
                                        0 AS Parent,
                                        source AS [edge!1!source],
                                        target AS [edge!1!target],
                                        NULL AS [data!2!!element],
                                        NULL AS [data!2!key]
                                 FROM   Edges
                                 UNION ALL
                                 SELECT 2 AS Tag,
                                        1 AS Parent,
                                        source,
                                        target,
                                        data,
                                        'd3'
                                 FROM   Edges) AS InnerTable
                       ORDER BY [edge!1!source], [edge!1!target], [data!2!!element]
                       FOR      XML EXPLICIT);
    SELECT CONCAT(@prolog, @nodeXML, @edgeXML, @epilog);
END
GO

EXECUTE CreateGraphML ;
GO

/* Run from CMD prompt: 
bcp "EXEC CreateGraphML" queryout PASS2017.xml -T -S.\SQL2017 -dPASS-Demo -C65001 -c 
*/

And that’s it! When you run the BCP command line from CMD prompt, it will create a PASS2017.xml file, which is internally in the GraphML format. That’s easily imported into Cytoscape or other such graph visualization tools. And that is how we created the fun visualization that you saw in the “sneak preview” blog post!
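
For orientation, a GraphML document wraps the node and edge elements in a `graphml` root that declares each data key, followed by a `graph` element. The sketch below builds a tiny document of that shape with Python's standard library and parses it back. The key ids `d0` to `d3` mirror the ones used in the procedure above, but the `attr.name` values, the sample nodes, the graph id and the `edgedefault` setting are assumptions made for illustration, and `build_graphml` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

GRAPHML_NS = "http://graphml.graphdrawing.org/xmlns"


def build_graphml(nodes, edges):
    """nodes: {id: (label, track)}; edges: [(source, target, weight)]."""
    root = ET.Element("graphml", xmlns=GRAPHML_NS)
    # key declarations; the attr.name values are assumed, not taken from the post
    for key_id, target, name, typ in [
        ("d0", "node", "label", "string"),
        ("d1", "node", "track", "string"),
        ("d2", "node", "sessionid", "string"),
        ("d3", "edge", "weight", "int"),
    ]:
        ET.SubElement(root, "key", {"id": key_id, "for": target,
                                    "attr.name": name, "attr.type": typ})
    graph = ET.SubElement(root, "graph", id="G", edgedefault="undirected")
    for node_id, (label, track) in nodes.items():
        node = ET.SubElement(graph, "node", id=str(node_id))
        for key, value in (("d0", label), ("d1", track), ("d2", str(node_id))):
            ET.SubElement(node, "data", key=key).text = value
    for source, target, weight in edges:
        edge = ET.SubElement(graph, "edge", source=str(source), target=str(target))
        ET.SubElement(edge, "data", key="d3").text = str(weight)
    return ET.tostring(root, encoding="unicode")


# hypothetical sample graph with two sessions and one edge
doc = build_graphml({1: ("Session A", "Track X"), 2: ("Session B", "Track Y")},
                    [(1, 2, 45)])
parsed = ET.fromstring(doc)
print(len(parsed.findall("g:graph/g:node", {"g": GRAPHML_NS})))
```

Parsing the generated text back is a cheap way to confirm that the document is well-formed before importing it into a visualization tool.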

Disclaimer

This Sample Code is provided for the purpose of illustration only and is not intended to be used in a production environment.  THIS SAMPLE CODE AND ANY RELATED INFORMATION ARE PROVIDED “AS IS” WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.  We grant You a nonexclusive, royalty-free right to use and modify the Sample Code and to reproduce and distribute the object code form of the Sample Code, provided that You agree: (i) to not use Our name, logo, or trademarks to market Your software product in which the Sample Code is embedded; (ii) to include a valid copyright notice on Your software product in which the Sample Code is embedded; and (iii) to indemnify, hold harmless, and defend Us and Our suppliers from and against any claims or lawsuits, including attorneys’ fees, that arise or result from the use or distribution of the Sample Code. This posting is provided “AS IS” with no warranties, and confers no rights.

Strange errors with SQLBindR

I recently encountered a strange problem while trying to bind a SQL Server 2016 instance to Microsoft R Server 9.1. On running the below command:

sqlbindR /list

I got the error:

An unexpected error has occurred: Value cannot be null.
Parameter name: input

I had a hunch that this was related to an ‘orphaned’ pre-release SQL Server 2017 instance which I had previously installed and manually deleted. I found some registry keys related to that (non-existent) instance and deleted those. But SQLBindR was still erroring out:

sqlbindR /list
An unexpected error has occurred: Version string portion was too short or too long

I finally figured out that there was still an entry for the “non-existent” SQL instance at the registry key HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Microsoft SQL Server\Instance Names\SQL. There was a value (on the right hand pane when viewed in RegEdit) for that old instance. When I deleted that value for the non-existent SQL instance, I was able to run SQLBindR  successfully!

Please note this troubleshooting tip is intended for non-production systems and for advanced users. If you have questions or concerns, or this error is happening on a production system, I strongly recommend you contact Microsoft Support.

Disclaimers

Please note that this troubleshooting tip is provided as-is and neither I nor Microsoft offer any guarantees or warranties regarding the same. Using tools like RegEdit incorrectly can cause serious problems that may require you to reinstall your operating system. Use RegEdit at your own risk. This Information is provided for the purpose of illustration only and is not intended to be used in a production environment.  THIS INFORMATION IS PROVIDED “AS IS” WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.

Sneak Preview: SQL Graph session at PASS Summit 2017

At PASS Summit 2017, Shreya Verma and I will be co-presenting a session on using the newly added Graph data (nodes / edges) processing capabilities in your applications. One of the patterns we will discuss in that session is how to leverage in-database Python scripts to detect “similar” nodes and thereby ‘infer’ edges in the graph.

Update November 4th, 2017: You can now download this graph as a PDF file: PASS 2017 Sessions. Please view with Adobe Acrobat Reader at zoom level 1600%; web browsers like Edge and Chrome do not allow you such high zoom levels.

We have a very interesting graph processing demo which uses PASS 2017 schedule data. In this demo, we will start with each session as a node in our graph, and then use in-database Python to detect similar sessions (based on the session title, abstract and the speaker names.) We then return that data to SQL to build weighted edges in the graph, and then use an external visualization tool to draw the graph. As a sneak preview, here is a section of the graph built with this method; edges connect similar sessions, and the thicker the edge, the more similar those sessions are:

PASS Summit 2017 sessions as a graph

To know more on this theme of how Python and Graph integrate, and how we generate the above visual, come see us in our session on Nov 3 @ 3:30PM. Yes, we know that’s the last session slot of PASS Summit 2017 – you always save the best for the last 🙂

Create your own database with all PASS Summit 2017 Sessions using SQL Server 2017 ML Services (Python)

Next week is the biggest event for the SQL Server community: PASS Summit 2017! I will be presenting the Real-world SQL Server R Services session with two of our customers – APT and Financial Fabric. I will also be part of the SQL Clinic, so I’m really excited and pumped about next week!

Being a Data Nut, it suddenly occurred to me today: what if we can import all the PASS Summit 2017 session titles, abstracts and other details into a SQL Server table? And once this thought was in my mind, there was no stopping me! The wonderful thing is that with Python in SQL Server 2017 you can leverage rich libraries such as Beautiful Soup 4, URLLib3 to parse HTML and then present it as a structured table (using Pandas) which SQL can then consume.

The code below will do exactly that for you. It leverages the above mentioned Python libraries, so prior to executing the script you must install two libraries (bs4 & urllib3) using PIP. (Note that pip.exe is present under the C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\PYTHON_SERVICES\scripts folder; change drive letters as appropriate for your installation.) Also note that PIP must be executed from within an administrative CMD prompt in Windows.

The other requirement for the script below to run correctly is that outgoing Internet access must be allowed for the SQL instance’s R / Python scripts. By default, we block such access. To edit the rule temporarily (we strongly recommend against disabling this rule permanently), use wf.msc to open the Firewall with Advanced Security console, and then locate the rule “Block network access for R local user accounts in SQL Server instance ” in the Outbound Rules section. Right-click and disable the rule for now, and DO NOT FORGET to enable it again later!

Alright, here is the code!

CREATE DATABASE PASS2017
GO

USE PASS2017
GO

DROP TABLE IF EXISTS PASS2017Sessions

CREATE TABLE PASS2017Sessions
(Abstract nvarchar(max)
		, SessionLevel int
		, Speaker1 nvarchar(100)
		, Speaker2 nvarchar(100)
		, Speaker3 nvarchar(100)
		, Title nvarchar(4000)
		, Track nvarchar(50)
		, SessionId int
		);
GO

INSERT PASS2017Sessions
exec sp_execute_external_script @language = N'Python',
@script = N'
from bs4 import BeautifulSoup
import urllib3
import re
import pandas as pd

http = urllib3.PoolManager()
schedpage = http.request("GET", "http://www.pass.org/summit/2017/Sessions/Schedule.aspx")
schedpage.status
soup_schedpage = BeautifulSoup(schedpage.data, "lxml")
schedpage.close()

documents = []
processedsessions = []

sessioncells = soup_schedpage.find_all("div", class_ = "session-schedule-cell", recursive=True)
for currsess in sessioncells:
    hrefs = currsess.find_all("a")
    if (len(hrefs) >= 1):
        rowdict = {}

        # session title

        rowdict["Title"] = hrefs[0].text

        # session level
        sesslevel = currsess.find("p", id = re.compile("plcLevel"))
        if (sesslevel != None):
            rowdict["Level"] = sesslevel.text.replace("Level: ", "")
        else:
            rowdict["Level"] = None

        # session track
        allps = currsess.find_all("p")
        rowdict["Track"] = allps[len(allps) -2].text

        # get into session page itself
        if ("href" in hrefs[0].attrs):
            sessurl = hrefs[0].attrs["href"]

            # session ID
            mtch = re.search(r"sid=(\d+)", sessurl)
            if (mtch is None):
                continue

            # check if this session ID was already processed
            sessionid = mtch.group(1)

            if (sessionid in processedsessions):
                continue

            processedsessions.append(sessionid)

            rowdict["sessionid"] = sessionid

            sesspage = http.request("GET", sessurl)
            soup_sesspage = BeautifulSoup(sesspage.data, "lxml")
            sesspage.close()

            # session abstract
            sessabstract = soup_sesspage.find("pre", class_ ="abstract")
            rowdict["Abstract"] = sessabstract.text
            if (len(rowdict["Abstract"]) == 0):
                continue

            # speakers
            allspeakers = soup_sesspage.find_all("a", id=re.compile("Detail.+lnkSpeaker"))

            rowdict["Speaker1"] = None
            rowdict["Speaker2"] = None
            rowdict["Speaker3"] = None

            if (len(allspeakers) >= 1):
                rowdict["Speaker1"] = allspeakers[0].text

            if (len(allspeakers) >= 2):
                rowdict["Speaker2"] = allspeakers[1].text

            if (len(allspeakers) >= 3):
                rowdict["Speaker3"] = allspeakers[2].text
        else:
            continue

        documents.append(rowdict)

OutputDataSet = pd.DataFrame(documents)'
GO

select *
from PASS2017Sessions
GO

If all goes well you should see the results:

PASS2017Sessions query results

Isn’t that so cool! Play around with it and let me know what you think. And if you are at the Summit and interested in SQL Server ML Services, a friendly reminder to come to the Real-world SQL Server R Services session. See you later!
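
The session ID extraction and de-duplication steps from the script can also be tried in isolation, without any network access. In this small sketch the URLs are made-up examples and `extract_session_ids` is a hypothetical helper; only the idea of pulling the digits after `sid=` out of each link comes from the script above.

```python
import re

SID_PATTERN = re.compile(r"sid=(\d+)")


def extract_session_ids(urls):
    """Return each distinct session id once, in first-seen order."""
    seen = set()
    ordered = []
    for url in urls:
        match = SID_PATTERN.search(url)
        if match is None:
            continue  # link does not carry a session id
        sid = match.group(1)
        if sid not in seen:
            seen.add(sid)
            ordered.append(sid)
    return ordered


# made-up example links; the second one repeats the first session
urls = [
    "https://example.invalid/Details.aspx?sid=100",
    "https://example.invalid/Details.aspx?sid=100",
    "https://example.invalid/Schedule.aspx",
    "https://example.invalid/Details.aspx?sid=200",
]
print(extract_session_ids(urls))
```

Using a set for the "already processed" check keeps each lookup constant-time, which matters little for a few hundred sessions but scales better than searching a list.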

Disclaimer

This Sample Code is provided for the purpose of illustration only and is not intended to be used in a production environment.  THIS SAMPLE CODE AND ANY RELATED INFORMATION ARE PROVIDED “AS IS” WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.  We grant You a nonexclusive, royalty-free right to use and modify the Sample Code and to reproduce and distribute the object code form of the Sample Code, provided that You agree: (i) to not use Our name, logo, or trademarks to market Your software product in which the Sample Code is embedded; (ii) to include a valid copyright notice on Your software product in which the Sample Code is embedded; and (iii) to indemnify, hold harmless, and defend Us and Our suppliers from and against any claims or lawsuits, including attorneys’ fees, that arise or result from the use or distribution of the Sample Code. This posting is provided “AS IS” with no warranties, and confers no rights.