Skip to content

Commit

Permalink
Merge pull request #36 from xp-forge/refactor/commands
Browse files Browse the repository at this point in the history
Introduce Commands class which keeps all messages on the same connection
  • Loading branch information
thekid authored Aug 17, 2023
2 parents ea47be7 + 3bcba0d commit 9b5540c
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 138 deletions.
29 changes: 17 additions & 12 deletions src/main/php/com/mongodb/Collection.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php namespace com\mongodb;

use com\mongodb\io\Protocol;
use com\mongodb\io\{Commands, Protocol};
use com\mongodb\result\{Insert, Update, Delete, Cursor, Run, ChangeStream};
use lang\Value;
use util\Objects;
Expand Down Expand Up @@ -45,16 +45,17 @@ public function command($name, array $params= [], Session $session= null) {
*
* @param string $name
* @param [:var] $params
* @param string $method one of `read` or `write`
* @param string $semantics one of `read` or `write`
* @param ?com.mongodb.Session $session
* @return com.mongodb.result.Run
* @throws com.mongodb.Error
*/
public function run($name, array $params= [], $method= 'write', Session $session= null) {
public function run($name, array $params= [], $semantics= 'write', Session $session= null) {
$commands= Commands::using($this->proto, $semantics);
return new Run(
$this->proto,
$commands,
$session,
$this->proto->{$method}($session, [$name => $this->name] + $params + ['$db' => $this->database])
$commands->send($session, [$name => $this->name] + $params + ['$db' => $this->database])
);
}

Expand Down Expand Up @@ -161,12 +162,13 @@ public function delete($query, Session $session= null): Delete {
* @throws com.mongodb.Error
*/
public function find($query= [], Session $session= null): Cursor {
$result= $this->proto->read($session, [
$commands= Commands::reading($this->proto);
$result= $commands->send($session, [
'find' => $this->name,
'filter' => is_array($query) ? ($query ?: (object)[]) : ['_id' => $query],
'$db' => $this->database,
]);
return new Cursor($this->proto, $session, $result['body']['cursor']);
return new Cursor($commands, $session, $result['body']['cursor']);
}

/**
Expand Down Expand Up @@ -232,12 +234,13 @@ public function aggregate(array $pipeline= [], Session $session= null): Cursor {
// https://docs.mongodb.com/manual/reference/operator/aggregation/merge/
$last= $pipeline ? key($pipeline[sizeof($pipeline) - 1]) : null;
if ('$out' === $last || '$merge' === $last) {
$result= $this->proto->write($session, $sections);
$commands= Commands::writing($this->proto);
} else {
$result= $this->proto->read($session, $sections);
$commands= Commands::reading($this->proto);
}

return new Cursor($this->proto, $session, $result['body']['cursor']);
$result= $commands->send($session, $sections);
return new Cursor($commands, $session, $result['body']['cursor']);
}

/**
Expand All @@ -251,13 +254,15 @@ public function aggregate(array $pipeline= [], Session $session= null): Cursor {
*/
public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream {
array_unshift($pipeline, ['$changeStream' => (object)$options]);
$result= $this->proto->read($session, [

$commands= Commands::reading($this->proto);
$result= $commands->send($session, [
'aggregate' => $this->name,
'pipeline' => $pipeline,
'cursor' => (object)[],
'$db' => $this->database,
]);
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
return new ChangeStream($commands, $session, $result['body']['cursor']);
}

/** @return string */
Expand Down
15 changes: 9 additions & 6 deletions src/main/php/com/mongodb/Database.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php namespace com\mongodb;

use com\mongodb\io\Protocol;
use com\mongodb\io\{Commands, Protocol};
use com\mongodb\result\{Cursor, ChangeStream};
use lang\Value;
use util\Objects;
Expand Down Expand Up @@ -34,12 +34,13 @@ public function collection(string $name): Collection {
* @return [:var][]
* @throws com.mongodb.Error
*/
public function collections($session= null) {
$result= $this->proto->read($session, [
public function collections(Session $session= null) {
$commands= Commands::reading($this->proto);
$result= $commands->send($session, [
'listCollections' => (object)[],
'$db' => $this->name
]);
return new Cursor($this->proto, $session, $result['body']['cursor']);
return new Cursor($commands, $session, $result['body']['cursor']);
}

/**
Expand All @@ -53,13 +54,15 @@ public function collections($session= null) {
*/
public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream {
array_unshift($pipeline, ['$changeStream' => (object)$options]);
$result= $this->proto->read($session, [

$commands= Commands::reading($this->proto);
$result= $commands->send($session, [
'aggregate' => 1,
'pipeline' => $pipeline,
'cursor' => (object)[],
'$db' => $this->name,
]);
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
return new ChangeStream($commands, $session, $result['body']['cursor']);
}

/** @return string */
Expand Down
17 changes: 10 additions & 7 deletions src/main/php/com/mongodb/MongoConnection.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php namespace com\mongodb;

use com\mongodb\io\Protocol;
use com\mongodb\io\{Commands, Protocol};
use com\mongodb\result\{ChangeStream, Run};
use lang\{IllegalArgumentException, Value};
use peer\AuthenticationException;
Expand Down Expand Up @@ -54,18 +54,19 @@ public function connect(): self {
*
* @param string $name
* @param [:var] $arguments
* @param string $method one of `read` or `write`
* @param string $semantics one of `read` or `write`
* @param ?com.mongodb.Session $session
* @return com.mongodb.result.Run
* @throws com.mongodb.Error
*/
public function run($name, array $arguments= [], $method= 'write', Session $session= null) {
public function run($name, array $arguments= [], $semantics= 'write', Session $session= null) {
$this->proto->connect();

$commands= Commands::using($this->proto, $semantics);
return new Run(
$this->proto,
$commands,
$session,
$this->proto->{$method}($session, [$name => 1] + $arguments + ['$db' => 'admin'])
$commands->send($session, [$name => 1] + $params + ['$db' => 'admin'])
);
}

Expand Down Expand Up @@ -158,13 +159,15 @@ public function watch(array $pipeline= [], array $options= [], Session $session=
$this->proto->connect();

array_unshift($pipeline, ['$changeStream' => ['allChangesForCluster' => true] + $options]);
$result= $this->proto->read($session, [

$commands= Commands::reading($this->proto);
$result= $commands->send($session, [
'aggregate' => 1,
'pipeline' => $pipeline,
'cursor' => (object)[],
'$db' => 'admin',
]);
return new ChangeStream($this->proto, $session, $result['body']['cursor']);
return new ChangeStream($commands, $session, $result['body']['cursor']);
}

/** @return string */
Expand Down
66 changes: 66 additions & 0 deletions src/main/php/com/mongodb/io/Commands.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php namespace com\mongodb\io;

/**
* Ensures all message sent using this instance are executed against
* the same socket connection, e.g. for cursors.
*
* @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#cursors
*/
class Commands {
private $proto, $conn;

/**
* Creates an instance using a protocol and connection instance.
*
* @param com.mongodb.io.Protocol $proto
* @param com.mongodb.io.Connection $conn
*/
private function __construct($proto, $conn) {
$this->proto= $proto;
$this->conn= $conn;
}

/** Creates an instance for reading */
public static function reading(Protocol $proto): self {
return new self($proto, $proto->establish(
$proto->candidates($proto->readPreference['mode']),
'reading with '.$proto->readPreference['mode']
));
}

/** Creates an instance for writing */
public static function writing(Protocol $proto): self {
return new self($proto, $proto->establish(
[$proto->nodes['primary']],
'writing'
));
}

/**
* Creates an instance using given semantics
*
* @param com.mongodb.io.Protocol $proto
* @param string $semantics either "read" or "write"
* @return self
*/
public static function using($proto, $semantics) {
if ('read' === $semantics) {
return self::reading($proto);
} else {
return self::writing($proto);
}
}

/**
* Sends a message
*
* @param ?com.mongodb.Session $session
* @param [:var] $sections
* @return var
* @throws com.mongodb.Error
*/
public function send($session, $sections) {
$session && $sections+= $session->send($this->proto);
return $this->conn->message($sections, $this->proto->readPreference);
}
}
90 changes: 52 additions & 38 deletions src/main/php/com/mongodb/io/Protocol.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
class Protocol {
private $options, $auth;
private $conn= [];
protected $conn= [];
public $nodes= null;
public $readPreference;
public $socketCheckInterval= 5;
Expand Down Expand Up @@ -130,21 +130,54 @@ public function dsn(bool $password= false): string {
* @throws com.mongodb.Error
*/
public function connect() {
$this->nodes || $this->send(array_keys($this->conn), null, 'initial connect');
$this->nodes || $this->establish(array_keys($this->conn), 'initial connect');
return $this;
}

/**
* Select a connection, then send message
* Returns candidates for connecting to based on a given read preference.
*
* @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#read-preference
* @see https://docs.mongodb.com/manual/core/read-preference-mechanics/
* @param string $rp
* @return string[]
* @throws lang.IllegalArgumentException
*/
public function candidates($rp) {
if ('primary' === $rp) {
return [$this->nodes['primary']];
} else if ('secondary' === $rp) {
return $this->nodes['secondary'];
} else if ('primaryPreferred' === $rp) {
return array_merge([$this->nodes['primary']], $this->nodes['secondary']);
} else if ('secondaryPreferred' === $rp) {
return array_merge($this->nodes['secondary'], [$this->nodes['primary']]);
} else if ('nearest' === $rp) { // Prefer to stay on already open connections
$connected= null;
foreach ($this->conn as $id => $conn) {
if (null === $conn->server) continue;
$connected= $id;
break;
}
return array_unique(array_merge(
(array)$connected,
[$this->nodes['primary']],
$this->nodes['secondary']
));
}

throw new IllegalArgumentException('Unknown read preference "'.$rp.'"');
}

/**
* Establish a connection for a list of given candidates
*
* @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#checking-an-idle-socket-after-socketcheckintervalms
* @param string[] $candidates
* @param [:var] $sections
* @param string $intent used within potential error messages
* @return var
* @throws com.mongodb.Error
* @param string $intent
* @return com.mongodb.io.Connection
* @throws com.mongodb.NoSuitableCandidate
*/
private function send($candidates, $sections, $intent) {
public function establish($candidates, $intent) {
$time= time();
$cause= null;
foreach ($candidates as $candidate) {
Expand All @@ -169,7 +202,7 @@ private function send($candidates, $sections, $intent) {
}
}

return null === $sections ? null : $conn->message($sections, $this->readPreference);
return $conn;
} catch (SocketException $e) {
$conn->close();
$cause ? $cause->setCause($e) : $cause= $e;
Expand All @@ -180,10 +213,9 @@ private function send($candidates, $sections, $intent) {
}

/**
* Perform a read operation
* Perform a read operation, which selecting a suitable node based on the
* `readPreference` serting.
*
* @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#read-preference
* @see https://docs.mongodb.com/manual/core/read-preference-mechanics/
* @param ?com.mongodb.Session $session
* @param [:var] $sections
* @return var
Expand All @@ -193,33 +225,13 @@ public function read($session, $sections) {
$session && $sections+= $session->send($this);
$rp= $this->readPreference['mode'];

if ('primary' === $rp) {
$candidates= [$this->nodes['primary']];
} else if ('secondary' === $rp) {
$candidates= $this->nodes['secondary'];
} else if ('primaryPreferred' === $rp) {
$candidates= array_merge([$this->nodes['primary']], $this->nodes['secondary']);
} else if ('secondaryPreferred' === $rp) {
$candidates= array_merge($this->nodes['secondary'], [$this->nodes['primary']]);
} else if ('nearest' === $rp) { // Prefer to stay on already open connections
$connected= null;
foreach ($this->conn as $id => $conn) {
if (null === $conn->server) continue;
$connected= $id;
break;
}
$candidates= array_unique(array_merge(
(array)$connected,
[$this->nodes['primary']],
$this->nodes['secondary']
));
}

return $this->send($candidates, $sections, 'reading with '.$rp);
return $this->establish($this->candidates($rp), 'reading with '.$rp)
->message($sections, $this->readPreference)
;
}

/**
* Perform a write operation
* Perform a write operation, which always uses the primary node.
*
* @param ?com.mongodb.Session $session
* @param [:var] $sections
Expand All @@ -229,7 +241,9 @@ public function read($session, $sections) {
public function write($session, $sections) {
$session && $sections+= $session->send($this);

return $this->send([$this->nodes['primary']], $sections, 'writing');
return $this->establish([$this->nodes['primary']], 'writing')
->message($sections, $this->readPreference)
;
}

/** @return void */
Expand Down
Loading

0 comments on commit 9b5540c

Please sign in to comment.