Fix livestream state issues. Create unified long polling mechanism. #246

This commit is contained in:
infinite-persistence 2021-11-10 08:43:26 +08:00
commit c681d95ad7
No known key found for this signature in database
GPG key ID: B9C3252EDC3D0AA0
9 changed files with 166 additions and 123 deletions

View file

@ -135,7 +135,7 @@ const ClaimPreview = forwardRef<any, {}>((props: Props, ref: any) => {
renderActions, renderActions,
hideMenu = false, hideMenu = false,
// repostUrl, // repostUrl,
isLivestream, // need both? CHECK isLivestream,
isLivestreamActive, isLivestreamActive,
collectionId, collectionId,
collectionIndex, collectionIndex,
@ -146,6 +146,7 @@ const ClaimPreview = forwardRef<any, {}>((props: Props, ref: any) => {
indexInContainer, indexInContainer,
channelSubCount, channelSubCount,
} = props; } = props;
const isCollection = claim && claim.value_type === 'collection'; const isCollection = claim && claim.value_type === 'collection';
const collectionClaimId = isCollection && claim && claim.claim_id; const collectionClaimId = isCollection && claim && claim.claim_id;
const listId = collectionId || collectionClaimId; const listId = collectionId || collectionClaimId;
@ -482,11 +483,8 @@ const ClaimPreview = forwardRef<any, {}>((props: Props, ref: any) => {
</div> </div>
</div> </div>
{claimIsMine && isLivestream && ( {/* Todo: check isLivestreamActive once we have that data consistently everywhere. */}
<div className={'claim-preview__hints'}> {claim && isLivestream && <ClaimPreviewReset uri={uri} />}
<ClaimPreviewReset />
</div>
)}
{!hideMenu && <ClaimMenuList uri={uri} collectionId={listId} />} {!hideMenu && <ClaimMenuList uri={uri} collectionId={listId} />}
</> </>

View file

@ -1,13 +1,15 @@
import { connect } from 'react-redux'; import { connect } from 'react-redux';
import { selectActiveChannelClaim } from 'redux/selectors/app'; import { selectActiveChannelClaim } from 'redux/selectors/app';
import { makeSelectClaimIsMine } from 'redux/selectors/claims';
import { doToast } from 'redux/actions/notifications'; import { doToast } from 'redux/actions/notifications';
import ClaimPreviewReset from './view'; import ClaimPreviewReset from './view';
const select = (state) => { const select = (state, props) => {
const { claim_id: channelId, name: channelName } = selectActiveChannelClaim(state) || {}; const { claim_id: channelId, name: channelName } = selectActiveChannelClaim(state) || {};
return { return {
channelName, channelName,
channelId, channelId,
claimIsMine: props.uri && makeSelectClaimIsMine(props.uri)(state),
}; };
}; };

View file

@ -1,65 +1,33 @@
// @flow // @flow
import React from 'react'; import React from 'react';
import Lbry from 'lbry';
import { LIVESTREAM_KILL } from 'constants/livestream';
import { SITE_HELP_EMAIL } from 'config'; import { SITE_HELP_EMAIL } from 'config';
import { toHex } from 'util/hex';
import Button from 'component/button'; import Button from 'component/button';
import { killStream } from '$web/src/livestreaming';
import watchLivestreamStatus from '$web/src/livestreaming/long-polling';
import 'scss/component/claim-preview-reset.scss'; import 'scss/component/claim-preview-reset.scss';
// @Todo: move out of component.
const getStreamData = async (channelId: string, channelName: string) => {
if (!channelId || !channelName) throw new Error('Invalid channel data provided.');
const channelNameHex = toHex(channelName);
let channelSignature;
try {
channelSignature = await Lbry.channel_sign({ channel_id: channelId, hexdata: channelNameHex });
if (!channelSignature || !channelSignature.signature || !channelSignature.signing_ts) {
throw new Error('Error getting channel signature.');
}
} catch (e) {
throw e;
}
return {
d: channelNameHex,
s: channelSignature.signature,
t: channelSignature.signing_ts,
};
};
// @Todo: move out of component.
const killStream = async (channelId: string, payload: any) => {
fetch(`${LIVESTREAM_KILL}/${channelId}`, {
method: 'POST',
mode: 'no-cors',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams(payload),
})
.then((res) => {
if (!res.status === 200) throw new Error('Kill stream API failed.');
})
.catch((e) => {
throw e;
});
};
type Props = { type Props = {
channelId: string, channelId: string,
channelName: string, channelName: string,
claimIsMine: boolean,
doToast: ({ message: string, isError?: boolean }) => void, doToast: ({ message: string, isError?: boolean }) => void,
}; };
const ClaimPreviewReset = (props: Props) => { const ClaimPreviewReset = (props: Props) => {
const { channelId, channelName, doToast } = props; const { channelId, channelName, claimIsMine, doToast } = props;
const [isLivestreaming, setIsLivestreaming] = React.useState(false);
React.useEffect(() => {
return watchLivestreamStatus(channelId, (state) => setIsLivestreaming(state));
}, [channelId, setIsLivestreaming]);
if (!claimIsMine || !isLivestreaming) return null;
const handleClick = async () => { const handleClick = async () => {
try { try {
const streamData = await getStreamData(channelId, channelName); await killStream(channelId, channelName);
await killStream(channelId, streamData);
doToast({ message: __('Live stream successfully reset.'), isError: false }); doToast({ message: __('Live stream successfully reset.'), isError: false });
} catch { } catch {
doToast({ message: __('There was an error resetting the live stream.'), isError: true }); doToast({ message: __('There was an error resetting the live stream.'), isError: true });

View file

@ -3,6 +3,7 @@ import React from 'react';
import DateTime from 'component/dateTime'; import DateTime from 'component/dateTime';
import FileViewCount from 'component/fileViewCount'; import FileViewCount from 'component/fileViewCount';
import FileActions from 'component/fileActions'; import FileActions from 'component/fileActions';
import ClaimPreviewReset from 'component/claimPreviewReset';
type Props = { type Props = {
uri: string, uri: string,
@ -15,15 +16,18 @@ function FileSubtitle(props: Props) {
const { uri, livestream = false, activeViewers, isLive = false } = props; const { uri, livestream = false, activeViewers, isLive = false } = props;
return ( return (
<div className="media__subtitle--between"> <>
<div className="file__viewdate"> <div className="media__subtitle--between">
{livestream ? <span>{__('Right now')}</span> : <DateTime uri={uri} show={DateTime.SHOW_DATE} />} <div className="file__viewdate">
{livestream ? <span>{__('Right now')}</span> : <DateTime uri={uri} show={DateTime.SHOW_DATE} />}
<FileViewCount uri={uri} livestream={livestream} activeViewers={activeViewers} isLive={isLive} /> <FileViewCount uri={uri} livestream={livestream} activeViewers={activeViewers} isLive={isLive} />
</div>
<FileActions uri={uri} hideRepost={livestream} livestream={livestream} />
</div> </div>
{livestream && isLive && <ClaimPreviewReset uri={uri} />}
<FileActions uri={uri} hideRepost={livestream} livestream={livestream} /> </>
</div>
); );
} }

View file

@ -1,5 +1,4 @@
// @flow // @flow
import { LIVESTREAM_LIVE_API } from 'constants/livestream';
import * as CS from 'constants/claim_search'; import * as CS from 'constants/claim_search';
import React from 'react'; import React from 'react';
import Card from 'component/common/card'; import Card from 'component/common/card';
@ -7,6 +6,7 @@ import ClaimPreview from 'component/claimPreview';
import Lbry from 'lbry'; import Lbry from 'lbry';
import { useHistory } from 'react-router'; import { useHistory } from 'react-router';
import { formatLbryUrlForWeb } from 'util/url'; import { formatLbryUrlForWeb } from 'util/url';
import watchLivestreamStatus from '$web/src/livestreaming/long-polling';
type Props = { type Props = {
channelClaim: ChannelClaim, channelClaim: ChannelClaim,
@ -47,38 +47,9 @@ export default function LivestreamLink(props: Props) {
}, [livestreamChannelId, isChannelEmpty]); }, [livestreamChannelId, isChannelEmpty]);
React.useEffect(() => { React.useEffect(() => {
function fetchIsStreaming() { if (!livestreamClaim) return;
// $FlowFixMe livestream API can handle garbage return watchLivestreamStatus(livestreamChannelId, (state) => setIsLivestreaming(state));
fetch(`${LIVESTREAM_LIVE_API}/${livestreamChannelId}`) }, [livestreamChannelId, setIsLivestreaming, livestreamClaim]);
.then((res) => res.json())
.then((res) => {
if (res && res.success && res.data && res.data.live) {
setIsLivestreaming(true);
} else {
setIsLivestreaming(false);
}
})
.catch((e) => {});
}
let interval;
// Only call livestream api if channel has livestream claims
if (livestreamChannelId && livestreamClaim) {
if (!interval) fetchIsStreaming();
interval = setInterval(fetchIsStreaming, 10 * 1000);
}
// Prevent any more api calls on update
if (!livestreamChannelId || !livestreamClaim) {
if (interval) {
clearInterval(interval);
}
}
return () => {
if (interval) {
clearInterval(interval);
}
};
}, [livestreamChannelId, livestreamClaim]);
if (!livestreamClaim || !isLivestreaming) { if (!livestreamClaim || !isLivestreaming) {
return null; return null;
@ -87,7 +58,7 @@ export default function LivestreamLink(props: Props) {
// gonna pass the wrapper in so I don't have to rewrite the dmca/blocking logic in claimPreview. // gonna pass the wrapper in so I don't have to rewrite the dmca/blocking logic in claimPreview.
const element = (props: { children: any }) => ( const element = (props: { children: any }) => (
<Card <Card
className="livestream__channel-link" className="livestream__channel-link claim-preview__live"
title={__('Live stream in progress')} title={__('Live stream in progress')}
onClick={() => { onClick={() => {
push(formatLbryUrlForWeb(livestreamClaim.canonical_url)); push(formatLbryUrlForWeb(livestreamClaim.canonical_url));

View file

@ -1,11 +1,11 @@
// @flow // @flow
import { LIVESTREAM_LIVE_API } from 'constants/livestream';
import React from 'react'; import React from 'react';
import Page from 'component/page'; import Page from 'component/page';
import LivestreamLayout from 'component/livestreamLayout'; import LivestreamLayout from 'component/livestreamLayout';
import LivestreamComments from 'component/livestreamComments'; import LivestreamComments from 'component/livestreamComments';
import analytics from 'analytics'; import analytics from 'analytics';
import Lbry from 'lbry'; import Lbry from 'lbry';
import watchLivestreamStatus from '$web/src/livestreaming/long-polling';
type Props = { type Props = {
uri: string, uri: string,
@ -23,7 +23,6 @@ export default function LivestreamPage(props: Props) {
const livestreamChannelId = channelClaim && channelClaim.signing_channel && channelClaim.signing_channel.claim_id; const livestreamChannelId = channelClaim && channelClaim.signing_channel && channelClaim.signing_channel.claim_id;
const [hasLivestreamClaim, setHasLivestreamClaim] = React.useState(false); const [hasLivestreamClaim, setHasLivestreamClaim] = React.useState(false);
const STREAMING_POLL_INTERVAL_IN_MS = 10000;
const LIVESTREAM_CLAIM_POLL_IN_MS = 60000; const LIVESTREAM_CLAIM_POLL_IN_MS = 60000;
React.useEffect(() => { React.useEffect(() => {
@ -54,34 +53,9 @@ export default function LivestreamPage(props: Props) {
}, [livestreamChannelId, isLive]); }, [livestreamChannelId, isLive]);
React.useEffect(() => { React.useEffect(() => {
let interval; if (!hasLivestreamClaim || !livestreamChannelId) return;
function checkIsLive() { return watchLivestreamStatus(livestreamChannelId, (state) => setIsLive(state));
// TODO: duplicate code below }, [livestreamChannelId, setIsLive, hasLivestreamClaim]);
// $FlowFixMe livestream API can handle garbage
fetch(`${LIVESTREAM_LIVE_API}/${livestreamChannelId}`)
.then((res) => res.json())
.then((res) => {
if (!res || !res.data) {
setIsLive(false);
return;
}
if (res.data.hasOwnProperty('live')) {
setIsLive(res.data.live);
}
});
}
if (livestreamChannelId && hasLivestreamClaim) {
if (!interval) checkIsLive();
interval = setInterval(checkIsLive, STREAMING_POLL_INTERVAL_IN_MS);
return () => {
if (interval) {
clearInterval(interval);
}
};
}
}, [livestreamChannelId, hasLivestreamClaim]);
const stringifiedClaim = JSON.stringify(claim); const stringifiedClaim = JSON.stringify(claim);
React.useEffect(() => { React.useEffect(() => {

View file

@ -3,6 +3,7 @@
.claimPreviewReset { .claimPreviewReset {
display: flex; display: flex;
align-items: center; align-items: center;
justify-content: space-between;
padding-top: var(--spacing-xs); padding-top: var(--spacing-xs);
color: var(--color-text-subtitle); color: var(--color-text-subtitle);
font-size: var(--font-small); font-size: var(--font-small);

View file

@ -0,0 +1,55 @@
// @flow
import Lbry from 'lbry';
import { LIVESTREAM_KILL, LIVESTREAM_LIVE_API } from 'constants/livestream';
import { toHex } from 'util/hex';
type StreamData = {
d: string,
s: string,
t: string,
};
export const getStreamData = async (channelId: string, channelName: string): Promise<StreamData> => {
if (!channelId || !channelName) throw new Error('Invalid channel data provided.');
const channelNameHex = toHex(channelName);
let channelSignature;
try {
channelSignature = await Lbry.channel_sign({ channel_id: channelId, hexdata: channelNameHex });
if (!channelSignature || !channelSignature.signature || !channelSignature.signing_ts) {
throw new Error('Error getting channel signature.');
}
} catch (e) {
throw e;
}
return { d: channelNameHex, s: channelSignature.signature, t: channelSignature.signing_ts };
};
export const killStream = async (channelId: string, channelName: string) => {
try {
const streamData = await getStreamData(channelId, channelName);
fetch(`${LIVESTREAM_KILL}/${channelId}`, {
method: 'POST',
mode: 'no-cors',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams(streamData),
}).then((res) => {
if (res.status !== 200) throw new Error('Kill stream API failed.');
});
} catch (e) {
throw e;
}
};
export const isLiveStreaming = async (channelId: string): Promise<boolean> => {
try {
const response = await fetch(`${LIVESTREAM_LIVE_API}/${channelId}`);
const stream = await response.json();
return stream.data?.live;
} catch {
return false;
}
};

View file

@ -0,0 +1,70 @@
// @flow
/*
* This module is responsible for long polling the server to determine if a channel is actively streaming.
*
* One or many entities can subscribe to the live status while instantiating just one long poll interval per channel.
* Once all interested parties have disconnected the poll will shut down. For this reason, be sure to always call the
* disconnect function returned upon connecting.
*/
import { isLiveStreaming } from '$web/src/livestreaming';
const POLL_INTERVAL = 10000;
const pollers = {};
const pollingMechanism = {
streaming: false,
startPolling() {
if (this.interval !== 0) return;
const poll = async () => {
this.streaming = await isLiveStreaming(this.channelId);
this.subscribers.forEach((cb) => {
if (cb) cb(this.streaming);
});
};
poll();
this.interval = setInterval(poll, POLL_INTERVAL);
},
stopPolling() {
clearInterval(this.interval);
this.interval = 0;
},
connect(cb): number {
cb(this.streaming);
this.startPolling();
return this.subscribers.push(cb) - 1;
},
disconnect(subscriberIndex: number) {
this.subscribers[subscriberIndex] = null;
if (this.subscribers.every((item) => item === null)) {
this.stopPolling();
delete pollers[this.channelId];
}
},
};
const generateLongPoll = (channelId: string) => {
if (pollers[channelId]) return pollers[channelId];
pollers[channelId] = Object.create({
channelId,
interval: 0,
subscribers: [],
...pollingMechanism,
});
return pollers[channelId];
};
const watchLivestreamStatus = (channelId: ?string, cb: (boolean) => void) => {
if (!channelId || typeof cb !== 'function') return undefined;
const poll = generateLongPoll(channelId);
const subscriberIndex = poll.connect(cb);
return () => poll.disconnect(subscriberIndex);
};
export default watchLivestreamStatus;