Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not merge: Add ip node spec #6

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 143 additions & 35 deletions src/main/scala/com/whitepages/cloudmanager/state/SolrState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.apache.solr.common.cloud.{ClusterState, Replica, Slice, ZkStateReader

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.matching.Regex

object SolrReplica {
def hostName(nodeName: String) = {
Expand Down Expand Up @@ -87,34 +88,85 @@ case class SolrState(state: ClusterState, collectionInfo: CollectionInfo, config
lazy val activeReplicas = allReplicas.filter(_.active)
lazy val inactiveReplicas = allReplicas.filterNot(activeReplicas.contains)

/**
* Returns all replicas for a given collection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't tell me anything that isn't in the function name.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing

* @param collection
* @return
*/
def replicasFor(collection: String): Seq[SolrReplica] = allReplicas.filter(_.collection == collection)

/**
* Returns all replicas for a given collection, slice combination
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing

* @param collection
* @param sliceName
* @return
*/
def replicasFor(collection: String, sliceName: String): Seq[SolrReplica] =
replicasFor(collection).filter(_.slice.getName == sliceName)
def liveReplicasFor(collection: String): Seq[SolrReplica] = replicasFor(collection).filter(_.active)
def nodesWithCollection(collection: String) = replicasFor(collection).map(_.node).distinct
def nodesWithoutCollection(collection: String) = liveNodes -- nodesWithCollection(collection)
def nodesWithCollection(collection: String): Seq[String] = replicasFor(collection).map(_.node).distinct
def nodesWithoutCollection(collection: String): Set[String] = liveNodes -- nodesWithCollection(collection)

/**
*
* This method take the representation of each node, resolves it and creates a map of
* fully qualified domain name -> node ZK representation
*
* @param nodeList list of ZK represenation of each node in the cluster
* @return Map (dns name -> node zk representation)
*/
def dnsNameMap(nodeList: Set[String] = liveNodes): Map[String,String] = {
nodeList.map( node => InetAddress.getByName(node.take(node.indexOf(':'))).getCanonicalHostName -> node ).toMap
}

/**
* This method take the representation of each node, resolves it and creates a map of
* * IP address -> node ZK representation
*
* @param nodeList list of ZK represenation of each node in the cluster
* @return Map (ip address -> node zk representation)
*/
def ipNameMap(nodeList: Set[String] = liveNodes): Map[String,String] = {
nodeList.map( node => InetAddress.getByName(node.take(node.indexOf(':'))).getHostAddress -> node ).toMap
}

/**
* Takes the value specified for nodes by the user via the CLI and returns their
* corresponding string representation from the ZK cluster state
*
* @param indicators user passed in argument for nodes via the command line e.g. "all","empty", a comma separated
* list of IPs or a regular expression
* @param allowOfflineReferences
* @param ignoreUnrecognized
* @return
*/
def mapToNodes(indicators: Option[Seq[String]], allowOfflineReferences: Boolean = false, ignoreUnrecognized: Boolean = false): Option[Seq[String]] = {
indicators.map(mapToNodes(_, allowOfflineReferences, ignoreUnrecognized))
}

/**
*
* @param indicators user passed in argument for nodes via the command line e.g. "all","empty", a comma separated
* list of IPs or a regular expression
* @param allowOfflineReferences allow down nodes to be considered
* @param ignoreUnrecognized
* @return
*/
def mapToNodes(indicators: Seq[String], allowOfflineReferences: Boolean, ignoreUnrecognized: Boolean): Seq[String] = {
val nodeList = indicators.foldLeft(Seq[String]())( (acc, indicator) => {
val nodeList: Seq[String] = indicators.foldLeft(Seq[String]())((acc, indicator) => {
indicator.toLowerCase match {
case "all" =>
val nodeList = if (allowOfflineReferences) allNodes else liveNodes
val nodeList: Set[String] = if (allowOfflineReferences) allNodes else liveNodes
acc ++ nodeList
case "empty" =>
val nodeList = if (allowOfflineReferences) unusedNodes else unusedNodes -- downNodes
acc ++ nodeList.toSeq
case r if r.startsWith("regex=") =>
val pattern = r.stripPrefix("regex=").r
val nodeList = dnsNameMap(if (allowOfflineReferences) allNodes else liveNodes)
nodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq
//If the user specified a regular expression
val pattern: Regex = r.stripPrefix("regex=").r
getNodeListUsingRegEx(pattern, allowOfflineReferences)
case i =>
//If a comma separated list of nodes is specified, then for each node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this comment misleading, as in this context, we're evaluating a single indicator. There's no list.

val nodeName = Try(Seq(canonicalNodeName(i, allowOfflineReferences))).recover({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But on a related note, maybe the Seq wrapper here is confusing and could be removed.

case e: ManagerException if ignoreUnrecognized =>
comment.warn(e.getMessage)
Expand All @@ -128,48 +180,104 @@ case class SolrState(state: ClusterState, collectionInfo: CollectionInfo, config
nodeList
}


/**
* Gets a known host name for a given string
* @param hostIndicator
* @param hostIndicator node indicator passed in by the user, this could be a host name or an IP, or
* an exact representation of the node in ZK
* @param allowOfflineReferences
* @throw ManagerException if a host could not be safely determined
* @return A known canonical host
*/
def canonicalNodeName(hostIndicator: String, allowOfflineReferences: Boolean = false): String = {
val rawNodeList = if (allowOfflineReferences) allNodes else liveNodes

def unambiguousFragment(fragment: String, dnsMap: Map[String,String]): Option[String] = {
findUnambigousNode(dnsMap, (s: String) => s == fragment)
.orElse(findUnambigousNode(dnsMap, (s: String) => s.contains(fragment)))
//If the value specified by the user exactly matches a node from the cluster state, return this value as is
if (rawNodeList.contains(hostIndicator)) {
hostIndicator
}
def findUnambigousNode(dnsMap: Map[String,String], comparison: (String) => Boolean): Option[String] = {
val matchingMaps = dnsMap.filter{
case (dnsName,canonName) => comparison(dnsName) || comparison(canonName)
}
matchingMaps.toList match {
case (dnsName, canonName) :: Nil => Some(canonName)
case _ => None
}
else {
unambiguousFragment(hostIndicator, dnsNameMap(rawNodeList)).getOrElse(
unambiguousFragment(hostIndicator, ipNameMap(rawNodeList)).getOrElse(
{
//Here we attempt to transform the indicator(user passed in value) instead of the values from the cluster state
val chunks = hostIndicator.split(':')
val host = chunks.head
val port = if (chunks.length > 1) ":" + chunks.last else ""
val ipAndPort = InetAddress.getByName(host).getHostAddress + port
val matches = rawNodeList.filter((node) => node.contains(ipAndPort))
matches.size match {
case 0 => throw new ManagerException(s"Could not determine a live node from '$hostIndicator'")
case 1 => matches.head
case _ => throw new ManagerException(s"Ambiguous node name '$hostIndicator', possible matches: $matches")
}
}
)
)
}
}

val nodeList = if (allowOfflineReferences) allNodes else liveNodes
if (nodeList.contains(hostIndicator)) {
hostIndicator

/**
*
* @param fragment user passed in string for node identification
* @param nodeComparisonMap map where key is the resolved name of a node(IP or Host) and value is its
* string representation in the cluster state
* @return
*/
def unambiguousFragment(fragment: String, nodeComparisonMap: Map[String,String]): Option[String] = {
findUnambigousNode(nodeComparisonMap, (s: String) => s == fragment)
.orElse(findUnambigousNode(nodeComparisonMap, (s: String) => s.contains(fragment)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should get rid of the .contains, it prohibits exact IP/Hostname specification. I believe this happens with the existing version of solr cloud manager also.
For e.g specifying --nodes 1.0.0.8 selects nodes 1.0.0.8,1.0.0.81 and 1.0.0.82

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree "contains" is a poor choice for partial name matching. I liked the idea of being able to say --nodes foo2,foo3 or perhaps --nodes 0.8,0.81,0.82 instead of repeating all the identical fully qualified info every time though, and I wasn't too bothered by 1.0.0.8 matching 1.0.0.8 and 1.0.0.81 because if I do actually have both, it should throw the ambiguous node spec message with the conflicting possibilities.

Perhaps a smarter comparison would do literal matches, but within the context of period-separated chunks. Again, just looking to see if there's a single node that matches while using that comparison function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the ambiguous node exception will not be thrown every time, IMO it should be. For e.g. if you invoke waitactive with an IP that matches multiple nodes, the call just sums up the replica count across those nodes and returns their state e.g. "All 18 shards were active". IMO we should enforce really tight matching in this case.

}

/**
*
* @param nodeComparisonMap map where key is the resolved name of a node(IP or Host) and value is its
* string representation in the cluster state
* @param comparison function to use for comparison
* @return
*/
def findUnambigousNode(nodeComparisonMap: Map[String,String], comparison: (String) => Boolean): Option[String] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be changed to just return the matching node list, and have the test for "single-match" elsewhere. Then we can reuse it in the regex side of things too.

val matchingMaps = nodeComparisonMap.filter{
case (resolvedNodeString, clusterNodeString) => comparison(resolvedNodeString) || comparison(clusterNodeString)
}
else {
unambiguousFragment(hostIndicator, dnsNameMap(nodeList)).getOrElse {
val chunks = hostIndicator.split(':')
val host = chunks.head
val port = if (chunks.length > 1) ":" + chunks.last else ""
val ipAndPort = InetAddress.getByName(host).getHostAddress + port
val matches = nodeList.filter((node) => node.contains(ipAndPort))
matches.size match {
case 0 => throw new ManagerException(s"Could not determine a live node from '$hostIndicator'")
case 1 => matches.head
case _ => throw new ManagerException(s"Ambiguous node name '$hostIndicator', possible matches: $matches")
matchingMaps.toList match {
case (resolvedNodeName, clusterNodeString) :: Nil => Some(clusterNodeString)
case _ => None
}
}


/**
* This method takes the passed in regular expression and searches for nodes that match this pattern
* In the first pass, the pattern attempts to match against fully qualified domain names
* If no nodes matched in the first pass, it then attempts to match against the IP addresses
* If no nodes matched in the second pass, it then attempts to match against the raw node representation returned by ZK
* Return an empty sequence if none of these work
*
* @param pattern the pattern to use for matching nodes
* @param allowOfflineReferences
* @return
*/
def getNodeListUsingRegEx(pattern: Regex, allowOfflineReferences: Boolean): Seq[String] = {
val clusterStateNodeList = if (allowOfflineReferences) allNodes else liveNodes

//First try by matching to the fully qualified domain name
val dnsNodeList: Map[String, String] = dnsNameMap(clusterStateNodeList)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't think of an idiomatic way to implement this without refactoring each individual filter into a method and use case classes

val dnsMatch = dnsNodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq
if(!dnsMatch.isEmpty){ dnsMatch } else{
//If matching via domain name didn't work, try matching to the IP addresses
val ipNodeList: Map[String, String] = ipNameMap(clusterStateNodeList)
val ipMatch = ipNodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq
if(!ipMatch.isEmpty){ ipMatch } else {
//If either of these approaches do not work, trying matching with the unresolved list of nodes i.e. use the
//node list from the cluster state without transforming it
val clusterStateMatch = clusterStateNodeList.filter{pattern.findFirstIn(_).nonEmpty}.toSeq
if(!clusterStateMatch.isEmpty){ clusterStateMatch } else {
//return an empty sequence if nothing matches
Seq()
}
}
}

}

}