831 lines
24 KiB
TypeScript
831 lines
24 KiB
TypeScript
/*
|
|
* Copyright 2019 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import { ChannelCredentials } from './channel-credentials';
|
|
import { ChannelOptions } from './channel-options';
|
|
import { ResolvingLoadBalancer } from './resolving-load-balancer';
|
|
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
|
|
import { ChannelControlHelper } from './load-balancer';
|
|
import { UnavailablePicker, Picker, QueuePicker } from './picker';
|
|
import { Metadata } from './metadata';
|
|
import { Status, LogVerbosity, Propagate } from './constants';
|
|
import { FilterStackFactory } from './filter-stack';
|
|
import { CompressionFilterFactory } from './compression-filter';
|
|
import {
|
|
CallConfig,
|
|
ConfigSelector,
|
|
getDefaultAuthority,
|
|
mapUriDefaultScheme,
|
|
} from './resolver';
|
|
import { trace } from './logging';
|
|
import { SubchannelAddress } from './subchannel-address';
|
|
import { mapProxyName } from './http_proxy';
|
|
import { GrpcUri, parseUri, uriToString } from './uri-parser';
|
|
import { ServerSurfaceCall } from './server-call';
|
|
|
|
import { ConnectivityState } from './connectivity-state';
|
|
import {
|
|
ChannelInfo,
|
|
ChannelRef,
|
|
ChannelzCallTracker,
|
|
ChannelzChildrenTracker,
|
|
ChannelzTrace,
|
|
registerChannelzChannel,
|
|
SubchannelRef,
|
|
unregisterChannelzRef,
|
|
} from './channelz';
|
|
import { LoadBalancingCall } from './load-balancing-call';
|
|
import { CallCredentials } from './call-credentials';
|
|
import { Call, CallStreamOptions, StatusObject } from './call-interface';
|
|
import { Deadline, deadlineToString } from './deadline';
|
|
import { ResolvingCall } from './resolving-call';
|
|
import { getNextCallNumber } from './call-number';
|
|
import { restrictControlPlaneStatusCode } from './control-plane-status';
|
|
import {
|
|
MessageBufferTracker,
|
|
RetryingCall,
|
|
RetryThrottler,
|
|
} from './retrying-call';
|
|
import {
|
|
BaseSubchannelWrapper,
|
|
ConnectivityStateListener,
|
|
SubchannelInterface,
|
|
} from './subchannel-interface';
|
|
|
|
/**
|
|
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
|
|
*/
|
|
const MAX_TIMEOUT_TIME = 2147483647;
|
|
|
|
const MIN_IDLE_TIMEOUT_MS = 1000;
|
|
|
|
// 30 minutes
|
|
const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000;
|
|
|
|
interface ConnectivityStateWatcher {
|
|
currentState: ConnectivityState;
|
|
timer: NodeJS.Timeout | null;
|
|
callback: (error?: Error) => void;
|
|
}
|
|
|
|
interface NoneConfigResult {
|
|
type: 'NONE';
|
|
}
|
|
|
|
interface SuccessConfigResult {
|
|
type: 'SUCCESS';
|
|
config: CallConfig;
|
|
}
|
|
|
|
interface ErrorConfigResult {
|
|
type: 'ERROR';
|
|
error: StatusObject;
|
|
}
|
|
|
|
type GetConfigResult =
|
|
| NoneConfigResult
|
|
| SuccessConfigResult
|
|
| ErrorConfigResult;
|
|
|
|
const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
|
|
|
|
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1 << 24; // 16 MB
|
|
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1 << 20; // 1 MB
|
|
|
|
class ChannelSubchannelWrapper
|
|
extends BaseSubchannelWrapper
|
|
implements SubchannelInterface
|
|
{
|
|
private refCount = 0;
|
|
private subchannelStateListener: ConnectivityStateListener;
|
|
constructor(
|
|
childSubchannel: SubchannelInterface,
|
|
private channel: InternalChannel
|
|
) {
|
|
super(childSubchannel);
|
|
this.subchannelStateListener = (
|
|
subchannel,
|
|
previousState,
|
|
newState,
|
|
keepaliveTime
|
|
) => {
|
|
channel.throttleKeepalive(keepaliveTime);
|
|
};
|
|
childSubchannel.addConnectivityStateListener(this.subchannelStateListener);
|
|
}
|
|
|
|
ref(): void {
|
|
this.child.ref();
|
|
this.refCount += 1;
|
|
}
|
|
|
|
unref(): void {
|
|
this.child.unref();
|
|
this.refCount -= 1;
|
|
if (this.refCount <= 0) {
|
|
this.child.removeConnectivityStateListener(this.subchannelStateListener);
|
|
this.channel.removeWrappedSubchannel(this);
|
|
}
|
|
}
|
|
}
|
|
|
|
export class InternalChannel {
|
|
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
|
|
private readonly subchannelPool: SubchannelPool;
|
|
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
|
|
private currentPicker: Picker = new UnavailablePicker();
|
|
/**
|
|
* Calls queued up to get a call config. Should only be populated before the
|
|
* first time the resolver returns a result, which includes the ConfigSelector.
|
|
*/
|
|
private configSelectionQueue: ResolvingCall[] = [];
|
|
private pickQueue: LoadBalancingCall[] = [];
|
|
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
|
|
private readonly defaultAuthority: string;
|
|
private readonly filterStackFactory: FilterStackFactory;
|
|
private readonly target: GrpcUri;
|
|
/**
|
|
* This timer does not do anything on its own. Its purpose is to hold the
|
|
* event loop open while there are any pending calls for the channel that
|
|
* have not yet been assigned to specific subchannels. In other words,
|
|
* the invariant is that callRefTimer is reffed if and only if pickQueue
|
|
* is non-empty.
|
|
*/
|
|
private readonly callRefTimer: NodeJS.Timeout;
|
|
private configSelector: ConfigSelector | null = null;
|
|
/**
|
|
* This is the error from the name resolver if it failed most recently. It
|
|
* is only used to end calls that start while there is no config selector
|
|
* and the name resolver is in backoff, so it should be nulled if
|
|
* configSelector becomes set or the channel state becomes anything other
|
|
* than TRANSIENT_FAILURE.
|
|
*/
|
|
private currentResolutionError: StatusObject | null = null;
|
|
private readonly retryBufferTracker: MessageBufferTracker;
|
|
private keepaliveTime: number;
|
|
private readonly wrappedSubchannels: Set<ChannelSubchannelWrapper> =
|
|
new Set();
|
|
|
|
private callCount = 0;
|
|
private idleTimer: NodeJS.Timeout | null = null;
|
|
private readonly idleTimeoutMs: number;
|
|
private lastActivityTimestamp: Date;
|
|
|
|
// Channelz info
|
|
private readonly channelzEnabled: boolean = true;
|
|
private readonly originalTarget: string;
|
|
private readonly channelzRef: ChannelRef;
|
|
private readonly channelzTrace: ChannelzTrace;
|
|
private readonly callTracker = new ChannelzCallTracker();
|
|
private readonly childrenTracker = new ChannelzChildrenTracker();
|
|
|
|
constructor(
|
|
target: string,
|
|
private readonly credentials: ChannelCredentials,
|
|
private readonly options: ChannelOptions
|
|
) {
|
|
if (typeof target !== 'string') {
|
|
throw new TypeError('Channel target must be a string');
|
|
}
|
|
if (!(credentials instanceof ChannelCredentials)) {
|
|
throw new TypeError(
|
|
'Channel credentials must be a ChannelCredentials object'
|
|
);
|
|
}
|
|
if (options) {
|
|
if (typeof options !== 'object') {
|
|
throw new TypeError('Channel options must be an object');
|
|
}
|
|
}
|
|
this.originalTarget = target;
|
|
const originalTargetUri = parseUri(target);
|
|
if (originalTargetUri === null) {
|
|
throw new Error(`Could not parse target name "${target}"`);
|
|
}
|
|
/* This ensures that the target has a scheme that is registered with the
|
|
* resolver */
|
|
const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
|
|
if (defaultSchemeMapResult === null) {
|
|
throw new Error(
|
|
`Could not find a default scheme for target name "${target}"`
|
|
);
|
|
}
|
|
|
|
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
|
|
this.callRefTimer.unref?.();
|
|
|
|
if (this.options['grpc.enable_channelz'] === 0) {
|
|
this.channelzEnabled = false;
|
|
}
|
|
|
|
this.channelzTrace = new ChannelzTrace();
|
|
this.channelzRef = registerChannelzChannel(
|
|
target,
|
|
() => this.getChannelzInfo(),
|
|
this.channelzEnabled
|
|
);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
|
|
}
|
|
|
|
if (this.options['grpc.default_authority']) {
|
|
this.defaultAuthority = this.options['grpc.default_authority'] as string;
|
|
} else {
|
|
this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
|
|
}
|
|
const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
|
|
this.target = proxyMapResult.target;
|
|
this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
|
|
|
|
/* The global boolean parameter to getSubchannelPool has the inverse meaning to what
|
|
* the grpc.use_local_subchannel_pool channel option means. */
|
|
this.subchannelPool = getSubchannelPool(
|
|
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
|
|
);
|
|
this.retryBufferTracker = new MessageBufferTracker(
|
|
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
|
|
options['grpc.per_rpc_retry_buffer_size'] ??
|
|
DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
|
|
);
|
|
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
|
|
this.idleTimeoutMs = Math.max(
|
|
options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS,
|
|
MIN_IDLE_TIMEOUT_MS
|
|
);
|
|
const channelControlHelper: ChannelControlHelper = {
|
|
createSubchannel: (
|
|
subchannelAddress: SubchannelAddress,
|
|
subchannelArgs: ChannelOptions
|
|
) => {
|
|
const subchannel = this.subchannelPool.getOrCreateSubchannel(
|
|
this.target,
|
|
subchannelAddress,
|
|
Object.assign({}, this.options, subchannelArgs),
|
|
this.credentials
|
|
);
|
|
subchannel.throttleKeepalive(this.keepaliveTime);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace(
|
|
'CT_INFO',
|
|
'Created subchannel or used existing subchannel',
|
|
subchannel.getChannelzRef()
|
|
);
|
|
}
|
|
const wrappedSubchannel = new ChannelSubchannelWrapper(
|
|
subchannel,
|
|
this
|
|
);
|
|
this.wrappedSubchannels.add(wrappedSubchannel);
|
|
return wrappedSubchannel;
|
|
},
|
|
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
|
this.currentPicker = picker;
|
|
const queueCopy = this.pickQueue.slice();
|
|
this.pickQueue = [];
|
|
if (queueCopy.length > 0) {
|
|
this.callRefTimerUnref();
|
|
}
|
|
for (const call of queueCopy) {
|
|
call.doPick();
|
|
}
|
|
this.updateState(connectivityState);
|
|
},
|
|
requestReresolution: () => {
|
|
// This should never be called.
|
|
throw new Error(
|
|
'Resolving load balancer should never call requestReresolution'
|
|
);
|
|
},
|
|
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
|
if (this.channelzEnabled) {
|
|
this.childrenTracker.refChild(child);
|
|
}
|
|
},
|
|
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
|
|
if (this.channelzEnabled) {
|
|
this.childrenTracker.unrefChild(child);
|
|
}
|
|
},
|
|
};
|
|
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
|
|
this.target,
|
|
channelControlHelper,
|
|
options,
|
|
(serviceConfig, configSelector) => {
|
|
if (serviceConfig.retryThrottling) {
|
|
RETRY_THROTTLER_MAP.set(
|
|
this.getTarget(),
|
|
new RetryThrottler(
|
|
serviceConfig.retryThrottling.maxTokens,
|
|
serviceConfig.retryThrottling.tokenRatio,
|
|
RETRY_THROTTLER_MAP.get(this.getTarget())
|
|
)
|
|
);
|
|
} else {
|
|
RETRY_THROTTLER_MAP.delete(this.getTarget());
|
|
}
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace(
|
|
'CT_INFO',
|
|
'Address resolution succeeded'
|
|
);
|
|
}
|
|
this.configSelector = configSelector;
|
|
this.currentResolutionError = null;
|
|
/* We process the queue asynchronously to ensure that the corresponding
|
|
* load balancer update has completed. */
|
|
process.nextTick(() => {
|
|
const localQueue = this.configSelectionQueue;
|
|
this.configSelectionQueue = [];
|
|
if (localQueue.length > 0) {
|
|
this.callRefTimerUnref();
|
|
}
|
|
for (const call of localQueue) {
|
|
call.getConfig();
|
|
}
|
|
});
|
|
},
|
|
status => {
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace(
|
|
'CT_WARNING',
|
|
'Address resolution failed with code ' +
|
|
status.code +
|
|
' and details "' +
|
|
status.details +
|
|
'"'
|
|
);
|
|
}
|
|
if (this.configSelectionQueue.length > 0) {
|
|
this.trace(
|
|
'Name resolution failed with calls queued for config selection'
|
|
);
|
|
}
|
|
if (this.configSelector === null) {
|
|
this.currentResolutionError = {
|
|
...restrictControlPlaneStatusCode(status.code, status.details),
|
|
metadata: status.metadata,
|
|
};
|
|
}
|
|
const localQueue = this.configSelectionQueue;
|
|
this.configSelectionQueue = [];
|
|
if (localQueue.length > 0) {
|
|
this.callRefTimerUnref();
|
|
}
|
|
for (const call of localQueue) {
|
|
call.reportResolverError(status);
|
|
}
|
|
}
|
|
);
|
|
this.filterStackFactory = new FilterStackFactory([
|
|
new CompressionFilterFactory(this, this.options),
|
|
]);
|
|
this.trace(
|
|
'Channel constructed with options ' +
|
|
JSON.stringify(options, undefined, 2)
|
|
);
|
|
const error = new Error();
|
|
trace(
|
|
LogVerbosity.DEBUG,
|
|
'channel_stacktrace',
|
|
'(' +
|
|
this.channelzRef.id +
|
|
') ' +
|
|
'Channel constructed \n' +
|
|
error.stack?.substring(error.stack.indexOf('\n') + 1)
|
|
);
|
|
this.lastActivityTimestamp = new Date();
|
|
}
|
|
|
|
private getChannelzInfo(): ChannelInfo {
|
|
return {
|
|
target: this.originalTarget,
|
|
state: this.connectivityState,
|
|
trace: this.channelzTrace,
|
|
callTracker: this.callTracker,
|
|
children: this.childrenTracker.getChildLists(),
|
|
};
|
|
}
|
|
|
|
private trace(text: string, verbosityOverride?: LogVerbosity) {
|
|
trace(
|
|
verbosityOverride ?? LogVerbosity.DEBUG,
|
|
'channel',
|
|
'(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text
|
|
);
|
|
}
|
|
|
|
private callRefTimerRef() {
|
|
// If the hasRef function does not exist, always run the code
|
|
if (!this.callRefTimer.hasRef?.()) {
|
|
this.trace(
|
|
'callRefTimer.ref | configSelectionQueue.length=' +
|
|
this.configSelectionQueue.length +
|
|
' pickQueue.length=' +
|
|
this.pickQueue.length
|
|
);
|
|
this.callRefTimer.ref?.();
|
|
}
|
|
}
|
|
|
|
private callRefTimerUnref() {
|
|
// If the hasRef function does not exist, always run the code
|
|
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
|
|
this.trace(
|
|
'callRefTimer.unref | configSelectionQueue.length=' +
|
|
this.configSelectionQueue.length +
|
|
' pickQueue.length=' +
|
|
this.pickQueue.length
|
|
);
|
|
this.callRefTimer.unref?.();
|
|
}
|
|
}
|
|
|
|
private removeConnectivityStateWatcher(
|
|
watcherObject: ConnectivityStateWatcher
|
|
) {
|
|
const watcherIndex = this.connectivityStateWatchers.findIndex(
|
|
value => value === watcherObject
|
|
);
|
|
if (watcherIndex >= 0) {
|
|
this.connectivityStateWatchers.splice(watcherIndex, 1);
|
|
}
|
|
}
|
|
|
|
private updateState(newState: ConnectivityState): void {
|
|
trace(
|
|
LogVerbosity.DEBUG,
|
|
'connectivity_state',
|
|
'(' +
|
|
this.channelzRef.id +
|
|
') ' +
|
|
uriToString(this.target) +
|
|
' ' +
|
|
ConnectivityState[this.connectivityState] +
|
|
' -> ' +
|
|
ConnectivityState[newState]
|
|
);
|
|
if (this.channelzEnabled) {
|
|
this.channelzTrace.addTrace(
|
|
'CT_INFO',
|
|
'Connectivity state change to ' + ConnectivityState[newState]
|
|
);
|
|
}
|
|
this.connectivityState = newState;
|
|
const watchersCopy = this.connectivityStateWatchers.slice();
|
|
for (const watcherObject of watchersCopy) {
|
|
if (newState !== watcherObject.currentState) {
|
|
if (watcherObject.timer) {
|
|
clearTimeout(watcherObject.timer);
|
|
}
|
|
this.removeConnectivityStateWatcher(watcherObject);
|
|
watcherObject.callback();
|
|
}
|
|
}
|
|
if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
|
|
this.currentResolutionError = null;
|
|
}
|
|
}
|
|
|
|
throttleKeepalive(newKeepaliveTime: number) {
|
|
if (newKeepaliveTime > this.keepaliveTime) {
|
|
this.keepaliveTime = newKeepaliveTime;
|
|
for (const wrappedSubchannel of this.wrappedSubchannels) {
|
|
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
|
|
}
|
|
}
|
|
}
|
|
|
|
removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
|
|
this.wrappedSubchannels.delete(wrappedSubchannel);
|
|
}
|
|
|
|
doPick(metadata: Metadata, extraPickInfo: { [key: string]: string }) {
|
|
return this.currentPicker.pick({
|
|
metadata: metadata,
|
|
extraPickInfo: extraPickInfo,
|
|
});
|
|
}
|
|
|
|
queueCallForPick(call: LoadBalancingCall) {
|
|
this.pickQueue.push(call);
|
|
this.callRefTimerRef();
|
|
}
|
|
|
|
getConfig(method: string, metadata: Metadata): GetConfigResult {
|
|
this.resolvingLoadBalancer.exitIdle();
|
|
if (this.configSelector) {
|
|
return {
|
|
type: 'SUCCESS',
|
|
config: this.configSelector(method, metadata),
|
|
};
|
|
} else {
|
|
if (this.currentResolutionError) {
|
|
return {
|
|
type: 'ERROR',
|
|
error: this.currentResolutionError,
|
|
};
|
|
} else {
|
|
return {
|
|
type: 'NONE',
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
queueCallForConfig(call: ResolvingCall) {
|
|
this.configSelectionQueue.push(call);
|
|
this.callRefTimerRef();
|
|
}
|
|
|
|
private enterIdle() {
|
|
this.resolvingLoadBalancer.destroy();
|
|
this.updateState(ConnectivityState.IDLE);
|
|
this.currentPicker = new QueuePicker(this.resolvingLoadBalancer);
|
|
if (this.idleTimer) {
|
|
clearTimeout(this.idleTimer);
|
|
this.idleTimer = null;
|
|
}
|
|
}
|
|
|
|
private startIdleTimeout(timeoutMs: number) {
|
|
this.idleTimer = setTimeout(() => {
|
|
if (this.callCount > 0) {
|
|
/* If there is currently a call, the channel will not go idle for a
|
|
* period of at least idleTimeoutMs, so check again after that time.
|
|
*/
|
|
this.startIdleTimeout(this.idleTimeoutMs);
|
|
return;
|
|
}
|
|
const now = new Date();
|
|
const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf();
|
|
if (timeSinceLastActivity >= this.idleTimeoutMs) {
|
|
this.trace(
|
|
'Idle timer triggered after ' +
|
|
this.idleTimeoutMs +
|
|
'ms of inactivity'
|
|
);
|
|
this.enterIdle();
|
|
} else {
|
|
/* Whenever the timer fires with the latest activity being too recent,
|
|
* set the timer again for the time when the time since the last
|
|
* activity is equal to the timeout. This should result in the timer
|
|
* firing no more than once every idleTimeoutMs/2 on average. */
|
|
this.startIdleTimeout(this.idleTimeoutMs - timeSinceLastActivity);
|
|
}
|
|
}, timeoutMs);
|
|
this.idleTimer.unref?.();
|
|
}
|
|
|
|
private maybeStartIdleTimer() {
|
|
if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) {
|
|
this.startIdleTimeout(this.idleTimeoutMs);
|
|
}
|
|
}
|
|
|
|
private onCallStart() {
|
|
if (this.channelzEnabled) {
|
|
this.callTracker.addCallStarted();
|
|
}
|
|
this.callCount += 1;
|
|
}
|
|
|
|
private onCallEnd(status: StatusObject) {
|
|
if (this.channelzEnabled) {
|
|
if (status.code === Status.OK) {
|
|
this.callTracker.addCallSucceeded();
|
|
} else {
|
|
this.callTracker.addCallFailed();
|
|
}
|
|
}
|
|
this.callCount -= 1;
|
|
this.lastActivityTimestamp = new Date();
|
|
this.maybeStartIdleTimer();
|
|
}
|
|
|
|
createLoadBalancingCall(
|
|
callConfig: CallConfig,
|
|
method: string,
|
|
host: string,
|
|
credentials: CallCredentials,
|
|
deadline: Deadline
|
|
): LoadBalancingCall {
|
|
const callNumber = getNextCallNumber();
|
|
this.trace(
|
|
'createLoadBalancingCall [' + callNumber + '] method="' + method + '"'
|
|
);
|
|
return new LoadBalancingCall(
|
|
this,
|
|
callConfig,
|
|
method,
|
|
host,
|
|
credentials,
|
|
deadline,
|
|
callNumber
|
|
);
|
|
}
|
|
|
|
createRetryingCall(
|
|
callConfig: CallConfig,
|
|
method: string,
|
|
host: string,
|
|
credentials: CallCredentials,
|
|
deadline: Deadline
|
|
): RetryingCall {
|
|
const callNumber = getNextCallNumber();
|
|
this.trace(
|
|
'createRetryingCall [' + callNumber + '] method="' + method + '"'
|
|
);
|
|
return new RetryingCall(
|
|
this,
|
|
callConfig,
|
|
method,
|
|
host,
|
|
credentials,
|
|
deadline,
|
|
callNumber,
|
|
this.retryBufferTracker,
|
|
RETRY_THROTTLER_MAP.get(this.getTarget())
|
|
);
|
|
}
|
|
|
|
createInnerCall(
|
|
callConfig: CallConfig,
|
|
method: string,
|
|
host: string,
|
|
credentials: CallCredentials,
|
|
deadline: Deadline
|
|
): Call {
|
|
// Create a RetryingCall if retries are enabled
|
|
if (this.options['grpc.enable_retries'] === 0) {
|
|
return this.createLoadBalancingCall(
|
|
callConfig,
|
|
method,
|
|
host,
|
|
credentials,
|
|
deadline
|
|
);
|
|
} else {
|
|
return this.createRetryingCall(
|
|
callConfig,
|
|
method,
|
|
host,
|
|
credentials,
|
|
deadline
|
|
);
|
|
}
|
|
}
|
|
|
|
createResolvingCall(
|
|
method: string,
|
|
deadline: Deadline,
|
|
host: string | null | undefined,
|
|
parentCall: ServerSurfaceCall | null,
|
|
propagateFlags: number | null | undefined
|
|
): ResolvingCall {
|
|
const callNumber = getNextCallNumber();
|
|
this.trace(
|
|
'createResolvingCall [' +
|
|
callNumber +
|
|
'] method="' +
|
|
method +
|
|
'", deadline=' +
|
|
deadlineToString(deadline)
|
|
);
|
|
const finalOptions: CallStreamOptions = {
|
|
deadline: deadline,
|
|
flags: propagateFlags ?? Propagate.DEFAULTS,
|
|
host: host ?? this.defaultAuthority,
|
|
parentCall: parentCall,
|
|
};
|
|
|
|
const call = new ResolvingCall(
|
|
this,
|
|
method,
|
|
finalOptions,
|
|
this.filterStackFactory.clone(),
|
|
this.credentials._getCallCredentials(),
|
|
callNumber
|
|
);
|
|
|
|
this.onCallStart();
|
|
call.addStatusWatcher(status => {
|
|
this.onCallEnd(status);
|
|
});
|
|
return call;
|
|
}
|
|
|
|
close() {
|
|
this.resolvingLoadBalancer.destroy();
|
|
this.updateState(ConnectivityState.SHUTDOWN);
|
|
clearInterval(this.callRefTimer);
|
|
if (this.idleTimer) {
|
|
clearTimeout(this.idleTimer);
|
|
}
|
|
if (this.channelzEnabled) {
|
|
unregisterChannelzRef(this.channelzRef);
|
|
}
|
|
|
|
this.subchannelPool.unrefUnusedSubchannels();
|
|
}
|
|
|
|
getTarget() {
|
|
return uriToString(this.target);
|
|
}
|
|
|
|
getConnectivityState(tryToConnect: boolean) {
|
|
const connectivityState = this.connectivityState;
|
|
if (tryToConnect) {
|
|
this.resolvingLoadBalancer.exitIdle();
|
|
this.lastActivityTimestamp = new Date();
|
|
this.maybeStartIdleTimer();
|
|
}
|
|
return connectivityState;
|
|
}
|
|
|
|
watchConnectivityState(
|
|
currentState: ConnectivityState,
|
|
deadline: Date | number,
|
|
callback: (error?: Error) => void
|
|
): void {
|
|
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
|
throw new Error('Channel has been shut down');
|
|
}
|
|
let timer = null;
|
|
if (deadline !== Infinity) {
|
|
const deadlineDate: Date =
|
|
deadline instanceof Date ? deadline : new Date(deadline);
|
|
const now = new Date();
|
|
if (deadline === -Infinity || deadlineDate <= now) {
|
|
process.nextTick(
|
|
callback,
|
|
new Error('Deadline passed without connectivity state change')
|
|
);
|
|
return;
|
|
}
|
|
timer = setTimeout(() => {
|
|
this.removeConnectivityStateWatcher(watcherObject);
|
|
callback(
|
|
new Error('Deadline passed without connectivity state change')
|
|
);
|
|
}, deadlineDate.getTime() - now.getTime());
|
|
}
|
|
const watcherObject = {
|
|
currentState,
|
|
callback,
|
|
timer,
|
|
};
|
|
this.connectivityStateWatchers.push(watcherObject);
|
|
}
|
|
|
|
/**
|
|
* Get the channelz reference object for this channel. The returned value is
|
|
* garbage if channelz is disabled for this channel.
|
|
* @returns
|
|
*/
|
|
getChannelzRef() {
|
|
return this.channelzRef;
|
|
}
|
|
|
|
createCall(
|
|
method: string,
|
|
deadline: Deadline,
|
|
host: string | null | undefined,
|
|
parentCall: ServerSurfaceCall | null,
|
|
propagateFlags: number | null | undefined
|
|
): Call {
|
|
if (typeof method !== 'string') {
|
|
throw new TypeError('Channel#createCall: method must be a string');
|
|
}
|
|
if (!(typeof deadline === 'number' || deadline instanceof Date)) {
|
|
throw new TypeError(
|
|
'Channel#createCall: deadline must be a number or Date'
|
|
);
|
|
}
|
|
if (this.connectivityState === ConnectivityState.SHUTDOWN) {
|
|
throw new Error('Channel has been shut down');
|
|
}
|
|
return this.createResolvingCall(
|
|
method,
|
|
deadline,
|
|
host,
|
|
parentCall,
|
|
propagateFlags
|
|
);
|
|
}
|
|
}
|