WIP: Coroutines

dev
gravel 2 years ago
parent 172e8605ac
commit 2669e975f1
Signed by: gravel
GPG Key ID: C0538F3C906B308F

@ -32,9 +32,16 @@
// set timeout for file_get_contents()
ini_set('default_socket_timeout', 6); // in seconds, default is 60
// curl timeout is in milliseconds
$curl_connecttimeout_ms = 3000; // time for initiation of the connection
$curl_timeout_ms = 6000; // max time for whole connection (incl. transfer)
// curl timeout in milliseconds
// max time for initiation of the connection
$CURL_CONNECT_TIMEOUT_MS = 2000;
// max time for each connection (incl. transfer)
$CURL_TIMEOUT_MS = $FAST_FETCH_MODE ? 1500 : 3000;
// delay between retries in milliseconds
$CURL_RETRY_SLEEP = 2000;
// do not report warnings (timeouts, SSL/TLS errors)
error_reporting(E_ALL & ~E_WARNING);

@ -0,0 +1,302 @@
<?php
require_once 'utils.php';
/**
 * Wraps a generator that yields cURL handles to be fetched and receives the
 * processed handle (or `false` on failure) as the result of each `yield`.
 *
 * Transforming calls (`stop_on_failure`, `retryable`, `downgradeable`) mark the
 * instance as consumed and return a new coroutine that drives the old one.
 *
 * @template TReturn
 */
class FetchingCoroutine {
    /**
     * @var \Generator<int,CurlHandle,CurlHandle|false,TReturn> $generator
     */
    private Generator $generator;

    /**
     * Set once a transforming call has wrapped this instance.
     */
    private bool $consumed = false;

    /**
     * @var \Closure(CurlHandle):bool $is_valid_response
     */
    private Closure $is_valid_response;

    /**
     * Creates a new Fetching Coroutine instance.
     * @param \Generator<int,CurlHandle,CurlHandle|false,TReturn> $generator
     * An instantiated generator yielding `int => CurlHandle` pairs.
     */
    public function __construct(\Generator $generator) {
        $this->generator = $generator;
        // Property defaults must be constant expressions, so the default
        // response filter is assigned here instead of at the declaration.
        $this->is_valid_response = static function (CurlHandle $handle): bool {
            $code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE);
            // A code of 0 means no response was received at all.
            return $code > 0 && $code < 300;
        };
    }

    /**
     * Create a new FetchingCoroutine to fetch the contents of a URL.
     * The coroutine returns the processed handle, or false if the fetch failed.
     * @param string $url URL to fetch.
     */
    public static function from_url(string $url): \FetchingCoroutine {
        /**
         * @var Generator<int,CurlHandle,CurlHandle|false,CurlHandle|false> $oneshot
         */
        $oneshot = (function () use ($url) {
            return yield make_curl_handle($url);
        })();
        return new FetchingCoroutine($oneshot);
    }

    /**
     * Set callback deciding valid responses.
     * @param callable $is_valid_response Predicate on a processed CurlHandle.
     */
    public function set_response_filter(callable $is_valid_response): void {
        $this->is_valid_response = Closure::fromCallable($is_valid_response);
    }

    /**
     * @throws Error If a transforming call has already consumed this instance.
     */
    private function assert_not_consumed() {
        if ($this->consumed) {
            throw new Error("This FetchingCoroutine has been used up by a transforming call");
        }
    }

    /**
     * Marks this instance as used up by a transforming call.
     * The generator is kept: the wrapping coroutine still drives it
     * through this instance's private methods.
     */
    private function consume() {
        $this->consumed = true;
    }

    /**
     * Modifies the current coroutine to halt on failed fetches. Consumes current coroutine.
     * Resulting coroutine will not produce further fetches after a failure
     * and returns null in that case.
     * @return \FetchingCoroutine New FetchingCoroutine instance.
     */
    public function stop_on_failure(): \FetchingCoroutine {
        $this->assert_not_consumed();
        $coroutine = $this;
        $haltable = function () use ($coroutine) {
            while ($coroutine->valid()) {
                $id = $coroutine->current_key();
                $handle = $coroutine->current_request();
                // Forward the response to the wrapped coroutine; stop at the first failure.
                if (!$coroutine->send(yield $id => $handle)) {
                    return;
                }
            }
            return $coroutine->return_value();
        };
        $this->consume();
        return $this->project_response_parameters(new FetchingCoroutine($haltable()));
    }

    /**
     * Modifies the current coroutine to retry fetches. Consumes current coroutine.
     * The wrapped coroutine only sees the final outcome of each request.
     * @param int $retries Number of additional retries made for curl handles returned.
     * @param bool $tallied_retries If true, the retry count applies to the whole coroutine.
     * If false, each request is afforded the given retries.
     * @return \FetchingCoroutine New FetchingCoroutine instance.
     */
    public function retryable(int $retries, bool $tallied_retries = true): \FetchingCoroutine {
        $this->assert_not_consumed();
        $coroutine = $this;
        $retryable = function () use ($retries, $coroutine, $tallied_retries) {
            while ($coroutine->valid()) {
                $retries_current = $retries;
                $id = $coroutine->current_key();
                $handle = $coroutine->current_request();
                $response = false;
                do {
                    // Every attempt runs on a fresh copy of the requested handle.
                    $attempt_handle = curl_copy_handle($handle);
                    if (!$attempt_handle) {
                        log_error("Failed to clone cURL handle");
                        break;
                    }
                    $response = yield $id => $attempt_handle;
                    if ($coroutine->accepts($response)) {
                        break;
                    }
                    $response = false;
                } while ($retries_current-- > 0);
                // Report the final outcome exactly once.
                $coroutine->send($response);
                // Deduct the retries spent on this request from the shared budget.
                if ($tallied_retries) {
                    $retries = max(0, $retries_current);
                }
            }
            return $coroutine->return_value();
        };
        $this->consume();
        return $this->project_response_parameters(new FetchingCoroutine($retryable()));
    }

    /**
     * Modifies the current coroutine to attempt HTTPS->HTTP downgrade after failure.
     * Consumes current coroutine.
     * @param bool $did_downgrade Set to true once a request succeeded over HTTP
     * after failing over HTTPS.
     * @return \FetchingCoroutine New FetchingCoroutine instance.
     */
    public function downgradeable(mixed &$did_downgrade = null): \FetchingCoroutine {
        $this->assert_not_consumed();
        $coroutine = $this;
        $has_downgrade_ref = func_num_args() >= 1;
        if ($has_downgrade_ref) $did_downgrade = false;
        $downgradeable = function () use ($coroutine, &$did_downgrade, $has_downgrade_ref) {
            while ($coroutine->valid()) {
                $id = $coroutine->current_key();
                $handle = $coroutine->current_request();
                $handle_downgraded = curl_handle_downgrade($handle);
                if ($handle_downgraded === null) {
                    // Not an HTTPS request: nothing to downgrade, pass it through.
                    $coroutine->send(yield $id => $handle);
                    continue;
                }
                // Try HTTPS first
                $response = yield $id => $handle;
                if ($coroutine->accepts($response)) {
                    $coroutine->send($response);
                    continue;
                }
                // Use HTTP; the wrapped coroutine sees only this final outcome.
                if ($coroutine->send(yield $id => $handle_downgraded) && $has_downgrade_ref) {
                    $did_downgrade = true;
                }
            }
            return $coroutine->return_value();
        };
        $this->consume();
        return $this->project_response_parameters(new FetchingCoroutine($downgradeable()));
    }

    /**
     * Assign non-generator parameters to given FetchingCoroutine.
     */
    private function project_response_parameters(\FetchingCoroutine $coroutine): \FetchingCoroutine {
        $coroutine->set_response_filter($this->is_valid_response);
        return $coroutine;
    }

    /**
     * Applies the configured response filter to a processed handle.
     */
    private function is_valid_response(CurlHandle $handle) {
        $is_valid_response = $this->is_valid_response;
        return $is_valid_response($handle);
    }

    /**
     * Whether the given result counts as a successful fetch.
     * Does not advance the generator.
     */
    private function accepts(CurlHandle|false $handle): bool {
        return $handle !== false && $this->is_valid_response($handle);
    }

    /**
     * Get the key of the handle yielded at this point in the coroutine, if applicable.
     */
    public function current_key() {
        return $this->generator->key();
    }

    /**
     * Get the cURL handle yielded at this point in the coroutine.
     * @return CurlHandle|null Pending request, or null once the coroutine has finished.
     */
    public function current_request(): ?CurlHandle {
        return $this->generator->current();
    }

    private function valid(): bool {
        return $this->generator->valid();
    }

    /**
     * Get the return value of the wrapped generator object once finished.
     * @return TReturn
     */
    public function return_value(): mixed {
        return $this->generator->getReturn();
    }

    /**
     * Step coroutine until next yield point or end.
     * Coroutine must not be consumed by any transformations.
     * @param CurlHandle|false $response_handle
     * Processed handle corresponding to yielded handle or false in case of failure.
     * @return bool Whether the response was accepted as valid.
     */
    public function advance(CurlHandle|false $response_handle): bool {
        $this->assert_not_consumed();
        return $this->send($response_handle);
    }

    /**
     * Resumes the generator with the handle if it is a valid response,
     * or with false otherwise.
     * @return bool Whether the response was accepted as valid.
     */
    private function send(CurlHandle|false $handle): bool {
        $accepted = $this->accepts($handle);
        $this->generator->send($accepted ? $handle : false);
        return $accepted;
    }
}
/**
 * Drives a set of FetchingCoroutine instances concurrently over a single
 * cURL multi handle: each coroutine has at most one enrolled transfer at a
 * time, and is advanced with the result whenever its transfer completes.
 */
class FetchingCoroutineRunner {
    /**
     * Collection of enrolled transfers.
     */
    private CurlMultiHandle $transfers;

    /**
     * Coroutines executed by runner.
     * @var \FetchingCoroutine[] $coroutines
     */
    private array $coroutines;

    /**
     * Number of transfers currently enrolled in the multi handle.
     */
    private int $pending_transfers = 0;

    /**
     * Create new FetchingCoroutineRunner instance with the given coroutines.
     * @param \FetchingCoroutine[] $coroutines Coroutines to run in parallel.
     */
    public function __construct(array $coroutines = []) {
        // Property defaults must be constant expressions, so the multi handle
        // is created here instead of at the declaration.
        $this->transfers = curl_multi_init();
        $this->coroutines = $coroutines;
        $this->initialize_coroutines();
    }

    /**
     * Launches all coroutines in parallel and returns once no transfers
     * remain enrolled or cURL reports an error.
     * @return int CURLM_* status.
     */
    public function fetch_all(): int {
        $curlm_status = CURLM_OK;
        // Completed transfers may enrol follow-up requests, so the loop is
        // governed by the number of enrolled transfers rather than by the
        // activity count of the last curl_multi_exec() call.
        while ($this->pending_transfers > 0 && $curlm_status === CURLM_OK) {
            $curlm_status = curl_multi_exec($this->transfers, $curlm_active_transfer);
            if ($curlm_active_transfer) {
                // Block 1 second for pending transfers
                curl_multi_select($this->transfers, timeout: 1.0);
            }
            $this->process_curl_activity();
        }
        return $curlm_status;
    }

    /**
     * Enrol initial transfers from all coroutines.
     */
    private function initialize_coroutines() {
        foreach ($this->coroutines as $id => $coroutine) {
            $this->poll_coroutine_for_transfer($id);
        }
    }

    /**
     * Enrol latest transfer from coroutine with given id.
     * Does nothing once the coroutine has no pending request.
     */
    private function poll_coroutine_for_transfer(int $id) {
        $coroutine = $this->coroutines[$id];
        while ($handle = $coroutine->current_request()) {
            // Tag the handle so the finished transfer can be mapped back to its coroutine.
            curl_setopt($handle, CURLOPT_PRIVATE, (string) $id);
            if (curl_multi_add_handle($this->transfers, $handle) === CURLM_OK) {
                $this->pending_transfers++;
                return;
            }
            // The transfer could not be enrolled: report it as failed so the
            // coroutine can move on to its next request (if any).
            $coroutine->advance(false);
        }
    }

    /**
     * Respond to new activity on enrolled transfers.
     * @throws Error If a finished transfer does not map to a known coroutine.
     */
    private function process_curl_activity() {
        while (false !== ($info = curl_multi_info_read($this->transfers))) {
            if ($info['msg'] != CURLMSG_DONE) continue;

            /**
             * @var \CurlHandle $handle
             */
            $handle = $info['handle'];
            curl_multi_remove_handle($this->transfers, $handle);
            $this->pending_transfers--;

            $coroutine_id = curl_getinfo($handle, CURLINFO_PRIVATE);
            if (!isset($this->coroutines[$coroutine_id])) {
                throw new Error("Invalid coroutine ID: " . $coroutine_id);
            }

            // Transport-level failures (timeouts, TLS errors, ...) are reported as false.
            $response = $info['result'] === CURLE_OK ? $handle : false;
            $this->coroutines[$coroutine_id]->advance($response);
            $this->poll_coroutine_for_transfer((int) $coroutine_id);
        }
    }
}
?>

@ -3,6 +3,7 @@
include_once "$PROJECT_ROOT/php/servers/known-servers.php";
include_once 'tags.php';
include_once 'fetching-coroutines.php';
$MINUTE_SECONDS = 60;
$HOUR_SECONDS = 60 * $MINUTE_SECONDS;
@ -275,7 +276,7 @@
public function has_nsfw_keywords(): bool {
// Description not included due to false positives.
$blob =
$blob =
strtolower($this->name) . " " .
strtolower(join(" ", $this->tags));
@ -284,7 +285,7 @@
return true;
}
}
return false;
}
@ -332,9 +333,9 @@
}
if ($this->has_nsfw_keywords()) {
$derived_tags[] =
$derived_tags[] =
new CommunityTag(
"nsfw",
"nsfw",
TagType::WARNING_TAG,
"This Community may contain adult material. $WARNING"
);
@ -759,6 +760,14 @@
return $this->base_url;
}
/**
 * Builds the URL of the endpoint that lists this server's rooms.
 * @return string The server's base URL followed by `/rooms?all=1`.
 */
function get_rooms_api_url(): string {
    return "{$this->base_url}/rooms?all=1";
}
/**
* Returns the server's public key.
* @return string SOGS pubkey as used in the Session protocol.
@ -822,7 +831,7 @@
$candidates = array_filter($this->rooms, function(\CommunityRoom $room) use ($token) {
return $room->token == $token;
});
/** Filter doesn't reindex */
foreach ($candidates as $candidate) {
return $candidate;
@ -840,7 +849,7 @@
global $FAST_FETCH_MODE;
$base_url = $this->base_url;
list($rooms, $downgrade) = curl_get_contents_downgrade("$base_url/rooms?all=1", retries: $FAST_FETCH_MODE ? 2 : 4);
list($rooms, $downgrade) = curl_get_contents_downgrade($this->get_rooms_api_url(), retries: $FAST_FETCH_MODE ? 2 : 4);
if (!$rooms) {
log_info("Failed fetching /rooms.");
return false;
@ -855,6 +864,35 @@
return $room_data;
}
/**
 * Coroutine fetching this server's room list from its rooms API endpoint.
 * Yields the cURL handle to be fetched and expects the processed handle
 * (or false on failure) to be sent back.
 * Downgrades the server's scheme if the fetch reports an HTTP downgrade.
 * @return \Generator<int,CurlHandle,CurlHandle|false,array|bool>
 * Decoded room data on success, false if fetching or parsing failed.
 */
private function fetch_room_list_coroutine(): Generator {
    global $FAST_FETCH_MODE;

    $rooms_api_coroutine =
        FetchingCoroutine::from_url($this->get_rooms_api_url())
            ->downgradeable($does_downgrade)
            ->retryable($FAST_FETCH_MODE ? 2 : 4);

    // assemble & propagate request to runner
    // NOTE(review): only the first request of $rooms_api_coroutine is yielded
    // and the response is never fed back into it, so the retry/downgrade
    // wrappers are not exercised here yet - confirm intended wiring.
    /** @var CurlHandle|false $rooms_api_handle */
    $rooms_api_handle = yield $rooms_api_coroutine->current_request();

    // Keep the response body itself; `&&` would reduce it to a boolean.
    $rooms_raw = $rooms_api_handle ? curl_multi_getcontent($rooms_api_handle) : false;
    if (!$rooms_raw) {
        log_info("Failed fetching /rooms.");
        return false;
    }

    if ($does_downgrade) $this->downgrade_scheme();

    $room_data = json_decode($rooms_raw, true);
    if ($room_data == null) {
        log_info("Failed parsing /rooms.");
        return false;
    }

    log_debug("Fetched /rooms successfully");
    return $room_data;
}
/**
* Attempts to fetch the current server's rooms using observed room names.
* Downgrades the server's scheme to HTTP if necessary.
@ -927,6 +965,37 @@
return true;
}
/**
 * Coroutine fetching this server's rooms: tries the room list endpoint
 * first and falls back to fetching by room hints if that fails.
 * Yields the requests of the delegated coroutines unchanged.
 * On success, stores the rooms on this server.
 * @return \Generator Generator returning true if rooms were fetched, false otherwise.
 */
function fetch_rooms_coroutine() {
    $this->log_details();
    $base_url = $this->base_url;

    /*
    // Check reachability before polling too much.
    if (count($this->room_hints) >= 2) {
    log_info("Checking reachability for $base_url first...");
    if (!url_is_reachable($base_url, retries: $FAST_FETCH_MODE ? 1 : 4)) {
    log_warning("Reachability test failed by $base_url.");
    return false;
    }
    }
    */

    log_info("Fetching rooms for $base_url.");

    // `yield from` evaluates to the delegated generator's return value.
    $room_data = yield from $this->fetch_room_list_coroutine();

    // The room list coroutine signals failure with false, so test for a
    // usable array instead of comparing against null.
    if (!is_array($room_data)) {
        $room_data = yield from $this->fetch_room_hints_coroutine();
    }

    if (!is_array($room_data)) {
        log_warning("Could not fetch rooms for $base_url.");
        return false;
    }

    $this->rooms = CommunityRoom::from_details_array($this, $room_data);
    return true;
}
/**
* Attempt to fetch server public key by parsing SOGS HTML preview.
*

@ -122,30 +122,14 @@
* to an unreachable host.
*/
function curl_get_response(string $url, int $retries, $stop_on_codes = [404], $curlopts = []) {
global $FAST_FETCH_MODE;
global $CURL_RETRY_SLEEP;
// use separate timeouts to reliably get data from Chinese server with repeated tries
$connecttimeout = 2; // wait at most X seconds to connect
$timeout = $FAST_FETCH_MODE ? 1.5 : 3; // can't take longer than X seconds for the whole curl process
$sleep = 2; // sleep between tries in seconds
// takes at most ($timeout + $sleep) * $retries seconds
$contents = false;
$retcode = -1;
for ($counter = 1; $counter <= $retries; $counter++) {
$curl = curl_init($url);
// curl_setopt($curl, CURLOPT_VERBOSE, true);
curl_setopt($curl, CURLOPT_AUTOREFERER, true);
curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
curl_setopt($curl, CURLOPT_CONNECTTIMEOUT, $connecttimeout);
curl_setopt($curl, CURLOPT_TIMEOUT, $timeout);
foreach ($curlopts as $opt => $val) curl_setopt($curl, $opt, $val);
$curl = make_curl_handle($url, $curlopts);
$contents = curl_exec($curl);
$retcode = curl_getinfo($curl, CURLINFO_HTTP_CODE);
@ -153,12 +137,47 @@
log_debug("Attempt #" . $counter . " for " . $url . " returned code " . $retcode . ".");
if ($contents != null || $retcode == 200 || in_array($retcode, $stop_on_codes)) break;
sleep($sleep);
sleep($CURL_RETRY_SLEEP / 1E3);
}
return [$retcode, $retcode == 200 ? $contents : false];
}
/**
 * Creates a cURL handle for the given URL with the shared fetching defaults:
 * referer and redirect following enabled, response returned as a string, and
 * the globally configured connect/transfer timeouts.
 * @param string $url URL the handle should request.
 * @param array $curlopts Additional `CURLOPT_* => value` pairs, applied last
 * so they override the defaults.
 * @return CurlHandle|false New handle, as returned by curl_init().
 */
function make_curl_handle(string $url, $curlopts = []) {
    global $CURL_CONNECT_TIMEOUT_MS, $CURL_TIMEOUT_MS;

    $curl = curl_init($url);

    // curl_setopt($curl, CURLOPT_VERBOSE, true);
    curl_setopt($curl, CURLOPT_AUTOREFERER, true);
    curl_setopt($curl, CURLOPT_FOLLOWLOCATION, true);
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);

    // The second-granularity options take integers, which would truncate
    // e.g. 1500 ms to 1 s; use the millisecond variants instead.
    curl_setopt($curl, CURLOPT_CONNECTTIMEOUT_MS, (int) $CURL_CONNECT_TIMEOUT_MS);
    curl_setopt($curl, CURLOPT_TIMEOUT_MS, (int) $CURL_TIMEOUT_MS);

    // Apply caller options one by one so a rejected option does not prevent
    // the remaining ones from being set.
    foreach ($curlopts as $opt => $val) curl_setopt($curl, $opt, $val);

    return $curl;
}
/**
 * Downgrades a HTTPS-facing cURL handle to HTTP.
 * The given handle is left untouched; the downgrade is applied to a copy.
 * @param CurlHandle $handle Handle whose URL should be downgraded.
 * @return CurlHandle|null Handle copy if can downgrade, or null if not applicable
 * (URL is not HTTPS, or the handle could not be copied).
 */
function curl_handle_downgrade(CurlHandle $handle): CurlHandle|null {
    $url = curl_getinfo($handle, CURLINFO_EFFECTIVE_URL);
    $scheme = parse_url($url, PHP_URL_SCHEME);
    // URL schemes are case-insensitive.
    if (!is_string($scheme) || strcasecmp($scheme, 'https') !== 0) return null;

    $handle_copy = curl_copy_handle($handle);
    if (!$handle_copy) return null;

    // Replace the leading "https" with "http", keeping the rest of the URL.
    $url = 'http' . substr($url, strlen('https'));
    curl_setopt($handle_copy, CURLOPT_URL, $url);
    return $handle_copy;
}
/**
* Returns the base path of a URL.
* @param string $url The URL to slice the path from.

Loading…
Cancel
Save