-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not merge: Add ip node spec #6
base: master
Are you sure you want to change the base?
Changes from 5 commits
bc127a2
c5bf434
4c11ad2
92f8812
774e9f4
5a4ddfa
8fad29c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import org.apache.solr.common.cloud.{ClusterState, Replica, Slice, ZkStateReader | |
|
||
import scala.collection.JavaConverters._ | ||
import scala.util.Try | ||
import scala.util.matching.Regex | ||
|
||
object SolrReplica { | ||
def hostName(nodeName: String) = { | ||
|
@@ -87,34 +88,85 @@ case class SolrState(state: ClusterState, collectionInfo: CollectionInfo, config | |
lazy val activeReplicas = allReplicas.filter(_.active) | ||
lazy val inactiveReplicas = allReplicas.filterNot(activeReplicas.contains) | ||
|
||
/** | ||
* Returns all replicas for a given collection | ||
* @param collection | ||
* @return | ||
*/ | ||
def replicasFor(collection: String): Seq[SolrReplica] = allReplicas.filter(_.collection == collection) | ||
|
||
/** | ||
* Returns all replicas for a given collection, slice combination | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing |
||
* @param collection | ||
* @param sliceName | ||
* @return | ||
*/ | ||
def replicasFor(collection: String, sliceName: String): Seq[SolrReplica] = | ||
replicasFor(collection).filter(_.slice.getName == sliceName) | ||
def liveReplicasFor(collection: String): Seq[SolrReplica] = replicasFor(collection).filter(_.active) | ||
def nodesWithCollection(collection: String) = replicasFor(collection).map(_.node).distinct | ||
def nodesWithoutCollection(collection: String) = liveNodes -- nodesWithCollection(collection) | ||
def nodesWithCollection(collection: String): Seq[String] = replicasFor(collection).map(_.node).distinct | ||
def nodesWithoutCollection(collection: String): Set[String] = liveNodes -- nodesWithCollection(collection) | ||
|
||
/** | ||
* | ||
* This method take the representation of each node, resolves it and creates a map of | ||
* fully qualified domain name -> node ZK representation | ||
* | ||
* @param nodeList list of ZK represenation of each node in the cluster | ||
* @return Map (dns name -> node zk representation) | ||
*/ | ||
def dnsNameMap(nodeList: Set[String] = liveNodes): Map[String,String] = { | ||
nodeList.map( node => InetAddress.getByName(node.take(node.indexOf(':'))).getCanonicalHostName -> node ).toMap | ||
} | ||
|
||
/** | ||
* This method take the representation of each node, resolves it and creates a map of | ||
* * IP address -> node ZK representation | ||
* | ||
* @param nodeList list of ZK represenation of each node in the cluster | ||
* @return Map (ip address -> node zk representation) | ||
*/ | ||
def ipNameMap(nodeList: Set[String] = liveNodes): Map[String,String] = { | ||
nodeList.map( node => InetAddress.getByName(node.take(node.indexOf(':'))).getHostAddress -> node ).toMap | ||
} | ||
|
||
/** | ||
* Takes the value specified for nodes by the user via the CLI and returns their | ||
* corresponding string representation from the ZK cluster state | ||
* | ||
* @param indicators user passed in argument for nodes via the command line e.g. "all","empty", a comma separated | ||
* list of IPs or a regular expression | ||
* @param allowOfflineReferences | ||
* @param ignoreUnrecognized | ||
* @return | ||
*/ | ||
def mapToNodes(indicators: Option[Seq[String]], allowOfflineReferences: Boolean = false, ignoreUnrecognized: Boolean = false): Option[Seq[String]] = { | ||
indicators.map(mapToNodes(_, allowOfflineReferences, ignoreUnrecognized)) | ||
} | ||
|
||
/** | ||
* | ||
* @param indicators user passed in argument for nodes via the command line e.g. "all","empty", a comma separated | ||
* list of IPs or a regular expression | ||
* @param allowOfflineReferences allow down nodes to be considered | ||
* @param ignoreUnrecognized | ||
* @return | ||
*/ | ||
def mapToNodes(indicators: Seq[String], allowOfflineReferences: Boolean, ignoreUnrecognized: Boolean): Seq[String] = { | ||
val nodeList = indicators.foldLeft(Seq[String]())( (acc, indicator) => { | ||
val nodeList: Seq[String] = indicators.foldLeft(Seq[String]())((acc, indicator) => { | ||
indicator.toLowerCase match { | ||
case "all" => | ||
val nodeList = if (allowOfflineReferences) allNodes else liveNodes | ||
val nodeList: Set[String] = if (allowOfflineReferences) allNodes else liveNodes | ||
acc ++ nodeList | ||
case "empty" => | ||
val nodeList = if (allowOfflineReferences) unusedNodes else unusedNodes -- downNodes | ||
acc ++ nodeList.toSeq | ||
case r if r.startsWith("regex=") => | ||
val pattern = r.stripPrefix("regex=").r | ||
val nodeList = dnsNameMap(if (allowOfflineReferences) allNodes else liveNodes) | ||
nodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq | ||
//If the user specified a regular expression | ||
val pattern: Regex = r.stripPrefix("regex=").r | ||
getNodeListUsingRegEx(pattern, allowOfflineReferences) | ||
case i => | ||
//If a comma separated list of nodes is specified, then for each node | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find this comment misleading, as in this context, we're evaluating a single indicator. There's no list. |
||
val nodeName = Try(Seq(canonicalNodeName(i, allowOfflineReferences))).recover({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But on a related note, maybe the |
||
case e: ManagerException if ignoreUnrecognized => | ||
comment.warn(e.getMessage) | ||
|
@@ -128,48 +180,104 @@ case class SolrState(state: ClusterState, collectionInfo: CollectionInfo, config | |
nodeList | ||
} | ||
|
||
|
||
/** | ||
* Gets a known host name for a given string | ||
* @param hostIndicator | ||
* @param hostIndicator node indicator passed in by the user, this could be a host name or an IP, or | ||
* an exact representation of the node in ZK | ||
* @param allowOfflineReferences | ||
* @throw ManagerException if a host could not be safely determined | ||
* @return A known canonical host | ||
*/ | ||
def canonicalNodeName(hostIndicator: String, allowOfflineReferences: Boolean = false): String = { | ||
val rawNodeList = if (allowOfflineReferences) allNodes else liveNodes | ||
|
||
def unambiguousFragment(fragment: String, dnsMap: Map[String,String]): Option[String] = { | ||
findUnambigousNode(dnsMap, (s: String) => s == fragment) | ||
.orElse(findUnambigousNode(dnsMap, (s: String) => s.contains(fragment))) | ||
//If the value specified by the user exactly matches a node from the cluster state, return this value as is | ||
if (rawNodeList.contains(hostIndicator)) { | ||
hostIndicator | ||
} | ||
def findUnambigousNode(dnsMap: Map[String,String], comparison: (String) => Boolean): Option[String] = { | ||
val matchingMaps = dnsMap.filter{ | ||
case (dnsName,canonName) => comparison(dnsName) || comparison(canonName) | ||
} | ||
matchingMaps.toList match { | ||
case (dnsName, canonName) :: Nil => Some(canonName) | ||
case _ => None | ||
} | ||
else { | ||
unambiguousFragment(hostIndicator, dnsNameMap(rawNodeList)).getOrElse( | ||
unambiguousFragment(hostIndicator, ipNameMap(rawNodeList)).getOrElse( | ||
{ | ||
//Here we attempt to transform the indicator(user passed in value) instead of the values from the cluster state | ||
val chunks = hostIndicator.split(':') | ||
val host = chunks.head | ||
val port = if (chunks.length > 1) ":" + chunks.last else "" | ||
val ipAndPort = InetAddress.getByName(host).getHostAddress + port | ||
val matches = rawNodeList.filter((node) => node.contains(ipAndPort)) | ||
matches.size match { | ||
case 0 => throw new ManagerException(s"Could not determine a live node from '$hostIndicator'") | ||
case 1 => matches.head | ||
case _ => throw new ManagerException(s"Ambiguous node name '$hostIndicator', possible matches: $matches") | ||
} | ||
} | ||
) | ||
) | ||
} | ||
} | ||
|
||
val nodeList = if (allowOfflineReferences) allNodes else liveNodes | ||
if (nodeList.contains(hostIndicator)) { | ||
hostIndicator | ||
|
||
/** | ||
* | ||
* @param fragment user passed in string for node identification | ||
* @param nodeComparisonMap map where key is the resolved name of a node(IP or Host) and value is its | ||
* string representation in the cluster state | ||
* @return | ||
*/ | ||
def unambiguousFragment(fragment: String, nodeComparisonMap: Map[String,String]): Option[String] = { | ||
findUnambigousNode(nodeComparisonMap, (s: String) => s == fragment) | ||
.orElse(findUnambigousNode(nodeComparisonMap, (s: String) => s.contains(fragment))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should get rid of the .contains, it prohibits exact IP/Hostname specification. I believe this happens with the existing version of solr cloud manager also. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree "contains" is a poor choice for partial name matching. I liked the idea of being able to say Perhaps a smarter comparison would do literal matches, but within the context of period-separated chunks. Again, just looking to see if there's a single node that matches while using that comparison function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the ambiguous node exception will not be thrown every time, IMO it should be. For e.g. if you invoke waitactive with an IP that matches multiple nodes, the call just sums up the replica count across those nodes and returns their state e.g. "All 18 shards were active". IMO we should enforce really tight matching in this case. |
||
} | ||
|
||
/** | ||
* | ||
* @param nodeComparisonMap map where key is the resolved name of a node(IP or Host) and value is its | ||
* string representation in the cluster state | ||
* @param comparison function to use for comparison | ||
* @return | ||
*/ | ||
def findUnambigousNode(nodeComparisonMap: Map[String,String], comparison: (String) => Boolean): Option[String] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should be changed to just return the matching node list, and have the test for "single-match" elsewhere. Then we can reuse it in the regex side of things too. |
||
val matchingMaps = nodeComparisonMap.filter{ | ||
case (resolvedNodeString, clusterNodeString) => comparison(resolvedNodeString) || comparison(clusterNodeString) | ||
} | ||
else { | ||
unambiguousFragment(hostIndicator, dnsNameMap(nodeList)).getOrElse { | ||
val chunks = hostIndicator.split(':') | ||
val host = chunks.head | ||
val port = if (chunks.length > 1) ":" + chunks.last else "" | ||
val ipAndPort = InetAddress.getByName(host).getHostAddress + port | ||
val matches = nodeList.filter((node) => node.contains(ipAndPort)) | ||
matches.size match { | ||
case 0 => throw new ManagerException(s"Could not determine a live node from '$hostIndicator'") | ||
case 1 => matches.head | ||
case _ => throw new ManagerException(s"Ambiguous node name '$hostIndicator', possible matches: $matches") | ||
matchingMaps.toList match { | ||
case (resolvedNodeName, clusterNodeString) :: Nil => Some(clusterNodeString) | ||
case _ => None | ||
} | ||
} | ||
|
||
|
||
/** | ||
* This method takes the passed in regular expression and searches for nodes that match this pattern | ||
* In the first pass, the pattern attempts to match against fully qualified domain names | ||
* If no nodes matched in the first pass, it then attempts to match against the IP addresses | ||
* If no nodes matched in the second pass, it then attempts to match against the raw node representation returned by ZK | ||
* Return an empty sequence if none of these work | ||
* | ||
* @param pattern the pattern to use for matching nodes | ||
* @param allowOfflineReferences | ||
* @return | ||
*/ | ||
def getNodeListUsingRegEx(pattern: Regex, allowOfflineReferences: Boolean): Seq[String] = { | ||
val clusterStateNodeList = if (allowOfflineReferences) allNodes else liveNodes | ||
|
||
//First try by matching to the fully qualified domain name | ||
val dnsNodeList: Map[String, String] = dnsNameMap(clusterStateNodeList) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't think of an idiomatic way to implement this without refactoring each individual filter into a method and use case classes |
||
val dnsMatch = dnsNodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq | ||
if(!dnsMatch.isEmpty){ dnsMatch } else{ | ||
//If matching via domain name didn't work, try matching to the IP addresses | ||
val ipNodeList: Map[String, String] = ipNameMap(clusterStateNodeList) | ||
val ipMatch = ipNodeList.filter{ case (k, v) => pattern.findFirstIn(k).nonEmpty}.values.toSeq | ||
if(!ipMatch.isEmpty){ ipMatch } else { | ||
//If either of these approaches do not work, trying matching with the unresolved list of nodes i.e. use the | ||
//node list from the cluster state without transforming it | ||
val clusterStateMatch = clusterStateNodeList.filter{pattern.findFirstIn(_).nonEmpty}.toSeq | ||
if(!clusterStateMatch.isEmpty){ clusterStateMatch } else { | ||
//return an empty sequence if nothing matches | ||
Seq() | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment doesn't tell me anything that isn't in the function name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing