Skip to content

Commit

Permalink
Merge pull request #13 from uniondrug/2.x
Browse files Browse the repository at this point in the history
2.x
  • Loading branch information
xueron authored May 9, 2018
2 parents 7cf58f8 + b9834be commit 350a1f0
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 71 deletions.
2 changes: 1 addition & 1 deletion server
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ try {
new InputOption('help', 'h', InputOption::VALUE_OPTIONAL, 'Show help', null),
]));

$output->writeln(sprintf("%s <info>%s</info>", $logo, "2.0.0"));
$output->writeln(sprintf("%s <info>%s</info>", $logo, Application::VERSION));
$output->writeln('');
if ($input->hasParameterOption(['--help', '-h'])) {
$output->writeln("<comment>Usage:</comment>");
Expand Down
2 changes: 1 addition & 1 deletion src/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public function wrapRequest(ServerRequestInterface $request)

// 设置Headers
foreach ($request->getHeaders() as $key => $value) {
$serverKey = 'HTTP_' . strtoupper($key);
$serverKey = 'HTTP_' . strtoupper(strtr($key, '-', '_'));
if (!isset($_SERVER[$serverKey])) {
$_SERVER[$serverKey] = $request->getHeaderLine($key); // getHeaderLine return a string.
}
Expand Down
27 changes: 22 additions & 5 deletions src/Servitization/Client/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
*/
class Client extends SwooleClient
{
/**
* reset headers and cookies
*
* @return $this
*/
public function reset()
{
$this->cookies = [];
$this->headers = [];

return $this;
}

/**
* @return string
*/
Expand All @@ -33,10 +46,14 @@ public function getProtocol()
*/
public function ping()
{
if ($this->client->isConnected() && !$this->async) {
$this->client->send('ping');
$res = $this->receive();
return $res === 'pong';
try {
if ($this->client->isConnected() && !$this->async) {
$this->client->send('ping');
$res = $this->receive();

return $res === 'pong';
}
} catch (\Throwable $e) {
}

return false;
Expand Down Expand Up @@ -89,7 +106,7 @@ protected function wrapResponse($response)
try {
$responseData = Json::decode($response, 1);
$response = $responseData['body'];
$headers = $responseData['headers'];
$headers = $responseData['headers'];
} catch (\Exception $e) {
// Do nothing
}
Expand Down
15 changes: 9 additions & 6 deletions src/Servitization/CreateRequestTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,23 @@ public function createRequest($requestData, $fd, $connection = [])
}

// _SERVER
$host = 'localhost';
$host = isset($connection['server_addr']) ? $connection['server_addr'] : gethostbyname('localhost');
if (!$host) {
$host = 'localhost';
}
$serverParams = [
'REQUEST_METHOD' => $request->method,
'REQUEST_URI' => $parts['path'],
'PATH_INFO' => $parts['path'],
'REQUEST_TIME' => time(),
'REQUEST_TIME_FLOAT' => microtime(1),
'GATEWAY_INTERFACE' => 'Swoole/' . SWOOLE_VERSION,
'GATEWAY_INTERFACE' => 'swoole/' . SWOOLE_VERSION,

// Server
'SERVER_PROTOCOL' => 'HTTP/1.1',
'REQUEST_SCHEMA' => 'http',
'SERVER_PROTOCOL' => 'TCP/1.1',
'REQUEST_SCHEME' => 'TCP',
'SERVER_NAME' => $host,
'SERVER_ADDR' => isset($connection['server_addr']) ? $connection['server_addr'] : gethostbyname('localhost'),
'SERVER_ADDR' => $host,
'SERVER_PORT' => isset($connection['server_port']) ? $connection['server_port'] : 0,
'REMOTE_ADDR' => $connection['remote_ip'],
'REMOTE_PORT' => $connection['remote_port'],
Expand Down Expand Up @@ -93,7 +96,7 @@ public function createRequest($requestData, $fd, $connection = [])
$headers = [];
foreach ($serverParams as $k => $v) {
if (0 === strpos($k, 'HTTP_')) {
$headers[str_replace('HTTP_', '', $k)] = $v;
$headers[strtr(str_replace('HTTP_', '', $k), '_', '-')] = $v;
}
}
if (isset($request->headers) && !empty($request->headers)) {
Expand Down
19 changes: 7 additions & 12 deletions src/Servitization/OnTaskTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ trait OnTaskTrait
*/
public function doTask(swoole_server $server, $data, $taskId, $workerId)
{
$TaskWorkerId = $server->worker_id;

app()->getLogger("framework")->debug("[TaskWorker $TaskWorkerId] [FromWorkerId: $workerId, TaskId: $taskId] With data: " . $data);
console()->debug("[Task] doTask: fromWorkerId=%d, taskId=%d, data=%s", $workerId, $taskId, $data);
try {
$task = Json::decode($data, true);
if ($task && isset($task['handler']) && is_a($task['handler'], TaskHandler::class, true)) {
return app()->getShared($task['handler'])->handle($task['data']);
} else {
app()->getLogger("framework")->error("[TaskWorker $TaskWorkerId] [FromWorkerId: $workerId, TaskId: $taskId] Data is not a valid Task object");
console()->error("[Task] doTask: fromWorkerId=%d, taskId=%d, Data is not a valid Task object", $workerId, $taskId);

return false;
}
} catch (\Exception $e) {
app()->getLogger("framework")->error("[TaskWorker $TaskWorkerId] [FromWorkerId: $workerId, TaskId: $taskId] Handle task failed. Error: " . $e->getMessage());
console()->error("[Task] doTask: fromWorkerId=%d, taskId=%d, error=%s",$workerId, $taskId, $e->getMessage());

return false;
}
Expand All @@ -41,9 +39,7 @@ public function doTask(swoole_server $server, $data, $taskId, $workerId)
*/
public function doFinish(swoole_server $server, $data, $taskId)
{
$workerId = $server->worker_id;

app()->getLogger("framework")->debug("[Worker $workerId] task $taskId finished, with data: " . serialize($data));
console()->debug("[Task] doFinish: taskId=%d, result=%s", $taskId, serialize($data));
}

/**
Expand All @@ -54,11 +50,10 @@ public function doFinish(swoole_server $server, $data, $taskId)
public function doPipeMessage(swoole_server $server, int $src_worker_id, $message)
{
$taskId = $server->task($message);
$workerId = swoole()->worker_id;
if (false === $taskId) {
app()->getLogger("framework")->error("[Worker $workerId] Dispatch task failed. message=$message");
if (false === true) {
console()->error("[Task] doPipeMessage: data=%s, dispatch task failed", $message);
} else {
app()->getLogger("framework")->debug("[Worker $workerId] task $taskId send.");
console()->debug("[Task] doPipeMessage: data=%s, dispatched, taskId=%d", $message, $taskId);
}
}
}
4 changes: 2 additions & 2 deletions src/Servitization/Server/TCPServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function doWork(swoole_server $server, $fd, $data, $from_id)
$fd = $response->getFileDescriptor();
}
if (false === $server->connection_info($fd)) {
app()->getLogger('framework')->error("TCPServer Error: Client has gone away.");
console()->error("[TCPServer] Error: Client has gone away.");

return -1;
}
Expand All @@ -86,7 +86,7 @@ public function doWork(swoole_server $server, $fd, $data, $from_id)
if ($request !== null) {
app()->shutdown($request, null);
}
app()->getLogger('framework')->error("TCPServer Error: " . $e->getMessage());
console()->error("[TCPServer] Error: " . $e->getMessage());

// 2. Build error messages
$res = call_user_func(app()->getConfig()->path('exception.response'), $e);
Expand Down
2 changes: 1 addition & 1 deletion src/Servitization/Server/UDPServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function doPacket(swoole_server $server, $data, $clientInfo)

app()->shutdown($request, $response);
} catch (\Exception $e) {
app()->getLogger('framework')->error("TCPServer Error: " . $e->getMessage());
console()->error("TCPServer Error: " . $e->getMessage());

$res = call_user_func(app()->getConfig()->path('exception.response'), $e);
$server->sendto($clientInfo['address'], $clientInfo['port'], Json::encode($res));
Expand Down
2 changes: 1 addition & 1 deletion src/Servitization/Server/WebSocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function doMessage(swoole_server $server, swoole_websocket_frame $frame)

app()->shutdown($request, $response);
} catch (\Exception $e) {
app()->getLogger('framework')->error("TCPServer Error: " . $e->getMessage());
console()->error("TCPServer Error: " . $e->getMessage());

$res = call_user_func(app()->getConfig()->path('exception.response'), $e);
$server->push($frame->fd, Json::encode($res));
Expand Down
18 changes: 9 additions & 9 deletions src/Task/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,28 @@ class Dispatcher
* @param string $handler Handler class name
* @param mixed $data Raw data
*
* @return int
* @throws \Uniondrug\Packet\Exceptions\PacketException
*/
public function dispatch($handler, $data = [])
{
if (!isset(swoole()->worker_pid) || swoole()->taskworker) {
app()->getLogger("framework")->error("Dispatch task failed. Task must be dispatched from worker.");
console()->error("[Task] dispatch: Dispatch task failed. Task must be dispatched from worker.");
throw new \RuntimeException("Dispatch task failed. Task must be dispatched from worker.");
}

if (!is_a($handler, TaskHandler::class, true)) {
app()->getLogger("framework")->error("Dispatch task failed. Handler: $handler is not a TaskHandler.");
console()->error("[Task] dispatch: Dispatch task failed. Handler: $handler is not a TaskHandler.");
throw new \RuntimeException("Dispatch task failed. Handler: $handler is not a TaskHandler.");
}

$task = Json::encode([
'handler' => $handler,
'data' => $data,
]);

$taskId = swoole()->task($task);
$workerId = swoole()->worker_id;
if (false === $taskId) {
app()->getLogger("framework")->error("[Worker $workerId] Dispatch task failed. Handler: $handler.");
} else {
app()->getLogger("framework")->debug("[Worker $workerId] task $taskId send, handle: $handler.");
}
return $taskId;
}

/**
Expand All @@ -61,6 +60,7 @@ public function dispatchByProcess($handler, $data = [])
'data' => $data,
]);

return swoole()->sendMessage($message, 0);
$res = swoole()->sendMessage($message, 0);
return $res;
}
}
6 changes: 3 additions & 3 deletions src/Utils/Connections.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public static function testConnections()
try {
app()->getShared($serviceName)->query("select 1");
} catch (\Exception $e) {
app()->getLogger('database')->alert("[$pid] [$serviceName] connection lost ({$e->getMessage()})");
console()->warning("[Connection] service=%s, error=%s, connection lost", $serviceName, $e->getMessage());
if (preg_match("/(errno=32 Broken pipe)|(MySQL server has gone away)/i", $e->getMessage())) {
$tryTimes++;
app()->getShared($serviceName)->close();
app()->removeSharedInstance($serviceName);
app()->getLogger('database')->alert("[$pid] [$serviceName] try to reconnect[$tryTimes]");
console()->warning("[Connection] service=%s, try=%d, try to reconnect", $serviceName, $tryTimes);
continue;
} else {
app()->getLogger('database')->error("[$pid] [$serviceName] try to reconnect failed");
console()->error("[Connection] service=%s, try to reconnect failed", $serviceName);
process_kill($pid);
}
}
Expand Down
86 changes: 56 additions & 30 deletions src/Utils/Console.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

namespace Uniondrug\Server\Utils;

use Phalcon\Di;
use Symfony\Component\Console\Output\ConsoleOutput;

/**
Expand Down Expand Up @@ -34,65 +35,90 @@ public function getFormat($level)
}

/**
* @param $level
* @param $msg
* @param $msg
* @param string $level
* @param array $ext
*/
public function log($msg, $level = 'INFO')
public function log($msg, $level = 'INFO', ...$ext)
{
$level = strtoupper($level);
$format = $this->getFormat($level);
$time = date("Y-m-d H:i:s");
$wid = 0;
$processFlag = isset(swoole()->master_pid) ? '@' : '#';
$processFlag = '#';
$pid = getmypid();
if (isset(swoole()->worker_id) && swoole()->worker_id >= 0) {
$wid = swoole()->worker_id;
if (swoole()->taskworker) {
$processFlag = '^'; // taskworker
} else {
$processFlag = '*'; // worker
$msg = sprintf($msg, ...$ext);

// only works under swoole mode
if (Di::getDefault()->has('server')) {
$processFlag = isset(swoole()->master_pid) ? '@' : '#';
if (isset(swoole()->worker_id) && swoole()->worker_id >= 0) {
$wid = swoole()->worker_id;
if (swoole()->taskworker) {
$processFlag = '^'; // taskworker
} else {
$processFlag = '*'; // worker
}
}
if (isset(swoole()->manager_pid) && swoole()->manager_pid == $pid) {
$processFlag = '$'; // manager
}
if (isset(swoole()->master_pid) && swoole()->master_pid == $pid) {
$processFlag = '#'; // master
}

// Write to console
$messages = sprintf("[%s %s%d.%d]<%s>\t%s\t</%s>%s", $time, $processFlag, $pid, $wid, $format, $level, $format, $msg);
$this->writeln($messages);
}
if (isset(swoole()->manager_pid) && swoole()->manager_pid == $pid) {
$processFlag = '$'; // manager
}
if (isset(swoole()->master_pid) && swoole()->master_pid == $pid) {
$processFlag = '#'; // master
}

$messages = sprintf("[%s %s%d.%d]<%s>\t%s\t</%s>%s", $time, $processFlag, $pid, $wid, $format, $level, $format, $msg);
$this->writeln($messages);
// Log to file
try {
$logMethod = strtolower($level);

$logger = app()->getLogger("server");
if (method_exists($logger, $logMethod)) {
$logMessage = sprintf("[%s%d.%d] %s", $processFlag, $pid, $wid, $msg);
call_user_func_array([$logger, $logMethod], [$logMessage]);
}
} catch (\Exception $e) {
$this->writeln(sprintf("[%s %s%d.%d]<error>\tERROR\t</error>%s", $time, $processFlag, $pid, $wid, $e->getMessage()));
}
}

/**
* @param $msg
* @param $msg
* @param array $ext
*/
public function error($msg)
public function error($msg, ...$ext)
{
$this->log($msg, 'ERROR');
$this->log($msg, 'ERROR', ...$ext);
}

/**
* @param $msg
* @param $msg
* @param array $ext
*/
public function warning($msg)
public function warning($msg, ...$ext)
{
$this->log($msg, 'WARNING');
$this->log($msg, 'WARNING', ...$ext);
}

/**
* @param $msg
* @param $msg
* @param array $ext
*/
public function info($msg)
public function info($msg, ...$ext)
{
$this->log($msg, 'INFO');
$this->log($msg, 'INFO', ...$ext);
}

/**
* @param $msg
* @param $msg
* @param array $ext
*/
public function debug($msg)
public function debug($msg, ...$ext)
{
$this->log($msg, 'DEBUG');
$this->log($msg, 'DEBUG', ...$ext);
}
}

0 comments on commit 350a1f0

Please sign in to comment.