My motivation behind this particular script was to start trying out some new hardcore pipelines on AWS infrastructure. I know, however, that my data first needs to be structured in a way that can be used for automated machine learning.
Neo4j is an attractive choice for me because it has a Python wrapper that is easy to use and it’s interoperable. I’ve also heard that Neo4j Bloom is becoming more business friendly as a viable software route. Global-Chem is a knowledge graph organized by its folder directory structure. I have an output of the full graph and each node path in a TSV file.
So how do I take this data structure that I have created and port it into Neo4j?
Let’s start with a base object that acts as an object-relational mapper (ORM) of sorts to the Neo4j database. Pip install the neo4j and pandas packages, then import what we need:
import time
import pandas as pd
from neo4j import GraphDatabase, basic_auth
The base functionality we need is to query the database and to close the connection. I’m sure adding and updating data will come soon, but we don’t know what that looks like just yet. Here’s the full code of the object:
class Neo4jConnection:

    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)

    def close(self):
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally:
            if session is not None:
                session.close()
        return response
Let’s break down what you are seeing and what we expect the user to pass in:
User:
conn = Neo4jConnection(
    uri="bolt://44.210.87.223:7687",
    user="neo4j",
    pwd="convulsion-endings-bulbs"
)
Object:
class Neo4jConnection:

    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
You can create a Neo4j Sandbox and copy/paste its URI into the uri parameter. Do the same for the username and password.
The real test is whether the GraphDatabase driver was set up, since it is our connection to Neo4j. If something is misconfigured, such as a malformed URI, the failure is caught and printed here. Note that, depending on the driver version, an authentication failure may only surface when the first query runs.
Next, we add a method to query the database. I want to be able to query the whole database and its node network as a test that the nodes were inserted correctly. I create a CONSTRAINT on the 3 main labels of the data, Molecule, Name, and Category, to ensure their uniqueness. The reason is that we are starting to see different SMILES mapped to the same name. Our data creation has been pretty robust as a manual effort, but uniqueness needs to be actually enforced. Creating these boundaries now will help the graph scale.
User:
conn.query('CREATE CONSTRAINT molecules IF NOT EXISTS FOR (p:Molecule) REQUIRE p.id IS UNIQUE')
conn.query('CREATE CONSTRAINT names IF NOT EXISTS FOR (a:Name) REQUIRE a.name IS UNIQUE')
conn.query('CREATE CONSTRAINT categories IF NOT EXISTS FOR (c:Category) REQUIRE c.category IS UNIQUE')
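Before leaning on the constraints, it can help to see which names already collide. Here is a minimal sketch in plain Python, assuming the data is available as (name, SMILES) pairs. The helper find_conflicting_names and the sample rows are made up for illustration and are not part of Global-Chem.

```python
from collections import defaultdict


def find_conflicting_names(rows):
    """Return {name: sorted SMILES list} for names mapped to more than one SMILES."""
    smiles_by_name = defaultdict(set)
    for name, smiles in rows:
        smiles_by_name[name].add(smiles)
    return {
        name: sorted(smiles_set)
        for name, smiles_set in smiles_by_name.items()
        if len(smiles_set) > 1
    }


# Hypothetical sample rows, not taken from the real data set.
sample_rows = [
    ("ethanol", "CCO"),
    ("ethanol", "OCC"),  # same name, different SMILES string
    ("methane", "C"),
]

conflicts = find_conflicting_names(sample_rows)
print(conflicts)
```

Any name that shows up in the result would violate a uniqueness rule on names, so it is worth cleaning those up before loading.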
The code connects through the driver and then opens a session with the database. This session is used to run queries and collect their results. The session.close() call in the finally block ensures the session is closed whether or not the query succeeds, which avoids leaking resources if a session hangs.
Code:
def query(self, query, parameters=None, db=None):
    assert self.__driver is not None, "Driver not initialized!"
    session = None
    response = None
    try:
        session = self.__driver.session(database=db) if db is not None else self.__driver.session()
        response = list(session.run(query, parameters))
    except Exception as e:
        print("Query failed:", e)
    finally:
        if session is not None:
            session.close()
    return response
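To see why the finally block matters, here is a small sketch that runs the same try/except/finally shape against a stand-in session. FakeSession and run_query are hypothetical names used only for this illustration; they are not part of the neo4j package and no database is involved.

```python
class FakeSession:
    """Hypothetical stand-in for a driver session; not part of the neo4j package."""

    def __init__(self, fail=False):
        self.fail = fail
        self.closed = False

    def run(self, query, parameters=None):
        if self.fail:
            raise RuntimeError("simulated query failure")
        return [{"total": 1}]

    def close(self):
        self.closed = True


def run_query(session, query, parameters=None):
    """Mirror the try/except/finally shape of Neo4jConnection.query."""
    response = None
    try:
        response = list(session.run(query, parameters))
    except Exception as e:
        print("Query failed:", e)
    finally:
        # Runs on success and on failure, so the session is always released.
        session.close()
    return response


ok_session = FakeSession()
bad_session = FakeSession(fail=True)

ok_result = run_query(ok_session, "RETURN 1")
bad_result = run_query(bad_session, "RETURN 1")

print(ok_result, ok_session.closed)
print(bad_result, bad_session.closed)
```

One consequence of this design is that a failed query returns None instead of raising, so callers should check the response before using it.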
Let’s now add two things to our database: the Categories and the Names of the chemical compounds. Fortunately, we have that data already organized for us.
df = pd.read_csv(
    'global_chem.tsv',
    sep='\t',
    names=['names_list', 'smiles', 'node', 'category', 'path']
)
That reads in the TSV file. An example row of the data looks like this:
perfluorohexanoic acid C(=O)(C(C(C(C(C(F)(F)F)(F)F)(F)F)(F)F)(F)F)O emerging_perfluoroalkyls environmental_chemistry global_chem.environment.emerging_perfluoroalkyls
The name is perfluorohexanoic acid, followed by the SMILES, the node, the category, and the path to the node in the network.
We want three new columns to prepare for our network (the parent node, the categories, and an id), and we can build them with the pandas package.
paths = df['path'].to_list()
categories = []
parent_nodes = []
ids = []
global_chem_counters = []
counter = 1
for path in paths:
    parent_node = path.split('.')[-1]
    categories.append(path.split('.')[:-1])
    parent_nodes.append(parent_node)
    global_chem_counters.append(counter)
    counter += 1
df['nodes'] = parent_nodes
df['categories'] = categories
df['ids'] = global_chem_counters
What we are doing is splitting a path such as global_chem.environment.emerging_perfluoroalkyls on the dots: the node is the last item, and the categories are the parents above it. For now let’s focus on getting the categories up and ready.
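As a quick check of the splitting logic, here is the same operation applied to the example path from the row above. The helper name split_path is introduced only for this sketch.

```python
def split_path(path):
    """Split a dotted node path into (node, list of ancestors)."""
    parts = path.split('.')
    return parts[-1], parts[:-1]


example_path = "global_chem.environment.emerging_perfluoroalkyls"
node, ancestors = split_path(example_path)

print(node)
print(ancestors)
```

Note that the slice keeps every ancestor, including the root, so each row carries a list of categories and not a single value.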
Next we want one row for every unique category. We explode the categories column so each list element gets its own row, and then drop the duplicates.
categories = pd.DataFrame(df[['categories']])
categories.rename(columns={'categories': 'category'}, inplace=True)
categories = categories.explode('category').drop_duplicates(subset=['category'])
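To make the explode-then-deduplicate step concrete, here is a plain-Python sketch of the same idea without pandas. The helper unique_categories and the sample lists are assumptions for illustration; the result has the same list-of-dicts shape that is later handed to Neo4j.

```python
def unique_categories(category_lists):
    """Flatten per-row category lists and keep the first occurrence of each."""
    seen = set()
    records = []
    for row_categories in category_lists:
        for category in row_categories:
            if category not in seen:
                seen.add(category)
                records.append({"category": category})
    return records


# Hypothetical per-row category lists, as produced by splitting the paths.
sample = [
    ["global_chem", "environment"],
    ["global_chem", "environment"],
    ["global_chem", "medicinal_chemistry"],
]

records = unique_categories(sample)
print(records)
```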
Then we need a function that adds these rows to Neo4j.
User:
add_categories(categories)
Code:
def add_categories(categories):
    # Adds category nodes to the Neo4j graph.
    query = '''
            UNWIND $rows AS row
            MERGE (c:Category {category: row.category})
            RETURN count(*) as total
            '''
    return conn.query(query, parameters={'rows': categories.to_dict('records')})
UNWIND expands the $rows list into individual rows, each accessible as a map. MERGE then creates a Category node for row.category if one does not already exist, and the total number of rows processed is returned.
The connection is called with categories.to_dict('records'), which passes in the data as a list of dictionaries:
[{'category': 'environment'}, {'category': 'instellar_space'}]
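Because MERGE either matches an existing node or creates it, running the load twice should not produce duplicates. The sketch below mimics that behaviour with an in-memory set. merge_categories is a hypothetical stand-in written for this illustration and does not talk to Neo4j.

```python
def merge_categories(graph, rows):
    """Mimic UNWIND $rows AS row MERGE (c:Category {category: row.category}).

    `graph` is a set of category names standing in for Category nodes.
    Returns the number of rows processed, like RETURN count(*).
    """
    total = 0
    for row in rows:
        graph.add(row["category"])  # no-op if the category already exists
        total += 1
    return total


rows = [{"category": "environment"}, {"category": "instellar_space"}]
graph = set()

first_total = merge_categories(graph, rows)
second_total = merge_categories(graph, rows)

print(first_total, second_total, sorted(graph))
```

Both runs report two rows processed, but the set still holds only two categories after the second run.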
See if you can add the categories to your Neo4j database, and then we can move on to Part 2.