通过maven引入Lynx
<dependency>
<groupId>org.grapheco</groupId>
<artifactId>lynx</artifactId>
<version>0.5</version>
</dependency>
首先需要实现Lynx中的图数据基本元素: LynxNode和LynxRelationship
LynxId是用Lynx中节点和关系的唯一标识符. 这里我们使用一个Long来表示id.
case class org.grapheco.MyId(value: Long) extends LynxId
case class org.grapheco.MyNode(id: org.grapheco.MyId, labels: Seq[LynxNodeLabel], props: Map[LynxPropertyKey, LynxValue]) extends LynxNode{
override def property(propertyKey: LynxPropertyKey): Option[LynxValue] = props.get(propertyKey)
}
case class org.grapheco.MyRelationship(id: org.grapheco.MyId,
startNodeId: org.grapheco.MyId,
endNodeId: org.grapheco.MyId,
relationType: Option[LynxRelationshipType],
props: Map[LynxPropertyKey, LynxValue]) extends LynxRelationship {
override def property(propertyKey: LynxPropertyKey): Option[LynxValue] = props.get(propertyKey)
}
实现了基本类型之后, 开始实现GraphModel. 定义一个org.grapheco.MemGraph
继承Lynx中的GraphModel
.
实现GraphModel需要实现以下基本方法:
以下是具体实现:
首先, 定义两个可变Map用来存储节点和关系, 其中Key为元素的id, 值为该元素.
private val _nodes: mutable.Map[org.grapheco.MyId, org.grapheco.MyNode] = mutable.Map()
private val _relationships: mutable.Map[org.grapheco.MyId, org.grapheco.MyRelationship] = mutable.Map()
在创建元素时, 需要生成不重复的id. 这里简单地用一个递增的Long变量来记录id, 并提供获取方法.
private var _nodeId: Long = 0
private var _relationshipId: Long = 0
private def nodeId: org.grapheco.MyId = {_nodeId += 1; org.grapheco.MyId(_nodeId)}
private def relationshipId: org.grapheco.MyId = {_relationshipId += 1; org.grapheco.MyId(_relationshipId)}
为了方便后续操作, 定义两个根据id获取元素的方法:
private def nodeAt(id: LynxId): Option[org.grapheco.MyNode] = _nodes.get(id)
private def relationshipAt(id: LynxId): Option[org.grapheco.MyRelationship] = _relationships.get(id)
首先实现两个获取全量元素的方法, 如下. 需要注意的是, relationships()
返回的
是三元组的迭代, 因此要做一次转换.
override def nodes(): Iterator[LynxNode] = _nodes.valuesIterator
override def relationships(): Iterator[PathTriple] =
_relationships.valuesIterator.map(rel => PathTriple(nodeAt(rel.startNodeId).get, rel, nodeAt(rel.endNodeId).get))
接下来实现WriteTask中的方法, 首先定义一组Map用于存放更改, 以及一组Array用于记录需要删除的元素的id.
val _nodesBuffer: mutable.Map[org.grapheco.MyId, org.grapheco.MyNode] = mutable.Map()
val _nodesToDelete: mutable.ArrayBuffer[org.grapheco.MyId] = mutable.ArrayBuffer()
val _relationshipsBuffer: mutable.Map[org.grapheco.MyId, org.grapheco.MyRelationship] = mutable.Map()
val _relationshipsToDelete: mutable.ArrayBuffer[org.grapheco.MyId] = mutable.ArrayBuffer()
实现创建元素方法: createElements有三个变量,前两个为输入元素,即要create的节点和关系。 onCreated为回调函数,传入创建好的元素。 需要注意的是,关系输入中的起始节点id是一个NodeInputRef类型,包括两种情况,一是该节点的绝对id, 或者该节点是nodesInput中的一个,此时由于还没有id,用name进行定位。这里针对这些情况定义了localNodeRef方法用于区分处理。
override def createElements[T](nodesInput: Seq[(String, NodeInput)],
relationshipsInput: Seq[(String, RelationshipInput)],
onCreated: (Seq[(String, LynxNode)], Seq[(String, LynxRelationship)]) => T): T = {
val nodesMap: Map[String, org.grapheco.MyNode] = nodesInput.toMap
.map{case (valueName,input) => valueName -> org.grapheco.MyNode(nodeId, input.labels, input.props.toMap)}
def localNodeRef(ref: NodeInputRef): org.grapheco.MyId = ref match {
case StoredNodeInputRef(id) => id
case ContextualNodeInputRef(valueName) => nodesMap(valueName).id
}
val relationshipsMap: Map[String, org.grapheco.MyRelationship] = relationshipsInput.toMap.map{
case (valueName,input) =>
valueName -> org.grapheco.MyRelationship(relationshipId, localNodeRef(input.startNodeRef),
localNodeRef(input.endNodeRef), input.types.headOption,input.props.toMap)
}
_nodesBuffer ++= nodesMap.map{ case (_, node) => (node.id, node)}
_relationshipsBuffer ++= relationshipsMap.map{ case (_, relationship) => (relationship.id, relationship)}
onCreated(nodesMap.toSeq, relationshipsMap.toSeq)
}
实现删除方法: 很简单, 就是把该id相关的修改删掉, 并在删除数组里加入这个id
override def deleteRelations(ids: Iterator[LynxId]): Unit = ids.foreach{ id =>
_relationshipsBuffer.remove(id)
_relationshipsToDelete += id
}
override def deleteNodes(ids: Seq[LynxId]): Unit = ids.foreach{ id =>
_nodesBuffer.remove(id)
_nodesToDelete += id
}
接下来实现一系列各类元素的标签,类型,属性的增删方法. 这一类方法都是对原始数据的更新, 因此, 这里先定义了一组update方法. 方法接受一组元素id,已经更新函数. 方法会先在Buffer中获取旧数据, 如果没有再去全局数据中找(如果都没有则表示没有这个元素, 无视之).
private def updateNodes(ids: Iterator[LynxId], update: org.grapheco.MyNode => org.grapheco.MyNode): Iterator[Option[LynxNode]] = {
ids.map{ id =>
val updated = _nodesBuffer.get(id).orElse(nodeAt(id)).map(update)
updated.foreach(newNode => _nodesBuffer.update(newNode.id, newNode))
updated
}
}
private def updateRelationships(ids: Iterator[LynxId], update: org.grapheco.MyRelationship => org.grapheco.MyRelationship): Iterator[Option[org.grapheco.MyRelationship]] = {
ids.map{ id =>
val updated = _relationshipsBuffer.get(id).orElse(relationshipAt(id)).map(update)
updated.foreach(newRel => _relationshipsBuffer.update(newRel.id, newRel))
updated
}
}
有了这组方法, 后续的一系列实现都较容易:
override def setNodesProperties(nodeIds: Iterator[LynxId], data: Array[(LynxPropertyKey, Any)], cleanExistProperties: Boolean): Iterator[Option[LynxNode]] =
updateNodes(nodeIds, old => org.grapheco.MyNode(old.id, old.labels, if (cleanExistProperties) Map.empty else old.props ++ data.toMap.mapValues(LynxValue.apply)))
override def setNodesLabels(nodeIds: Iterator[LynxId], labels: Array[LynxNodeLabel]): Iterator[Option[LynxNode]] =
updateNodes(nodeIds, old => org.grapheco.MyNode(old.id, (old.labels ++ labels.toSeq).distinct, old.props))
override def setRelationshipsProperties(relationshipIds: Iterator[LynxId], data: Array[(LynxPropertyKey, Any)]): Iterator[Option[LynxRelationship]] =
updateRelationships(relationshipIds, old => org.grapheco.MyRelationship(old.id, old.startNodeId, old.endNodeId, old.relationType, data.toMap.mapValues(LynxValue.apply)))
override def setRelationshipsType(relationshipIds: Iterator[LynxId], typeName: LynxRelationshipType): Iterator[Option[LynxRelationship]] =
updateRelationships(relationshipIds, old => org.grapheco.MyRelationship(old.id, old.startNodeId, old.endNodeId, Some(typeName), old.props))
override def removeNodesProperties(nodeIds: Iterator[LynxId], data: Array[LynxPropertyKey]): Iterator[Option[LynxNode]] =
updateNodes(nodeIds, old => org.grapheco.MyNode(old.id, old.labels, old.props.filterNot(data.contains)))
override def removeNodesLabels(nodeIds: Iterator[LynxId], labels: Array[LynxNodeLabel]): Iterator[Option[LynxNode]] =
updateNodes(nodeIds, old => org.grapheco.MyNode(old.id, old.labels.filterNot(labels.contains), old.props))
override def removeRelationshipsProperties(relationshipIds: Iterator[LynxId], data: Array[LynxPropertyKey]): Iterator[Option[LynxRelationship]] =
updateRelationships(relationshipIds, old => org.grapheco.MyRelationship(old.id, old.startNodeId, old.endNodeId, old.relationType, old.props.filterNot(data.contains)))
override def removeRelationshipsType(relationshipIds: Iterator[LynxId], typeName: LynxRelationshipType): Iterator[Option[LynxRelationship]] =
updateRelationships(relationshipIds, old => org.grapheco.MyRelationship(old.id, old.startNodeId, old.endNodeId, None, old.props))
最后是commit方法, 主要是把Buffer和Delete中的操作并入存储数据.
override def commit: Boolean = {
_nodes ++= _nodesBuffer
_nodes --= _nodesToDelete
_relationships ++= _relationshipsBuffer
_relationships --= _relationshipsToDelete
_nodesBuffer.clear()
_nodesToDelete.clear()
_relationshipsBuffer.clear()
_relationshipsToDelete.clear()
true
}
实现了GraphModel之后, 只需要创建一个CypherRunner, 并传入实现的GraphModel即可支持通过Cypher进行图查询.
private val runner = new CypherRunner(this)
def run(query: String, param: Map[String, Any] = Map.empty[String, Any]): LynxResult = {
// runner.compile(query)
runner.run(query, param, None)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。