From 29676e09448092fb7072bfd82eac57c10b20ee8f Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 15 Mar 2022 23:58:37 -0400
Subject: [PATCH] delete unused code

---
 scribe/elasticsearch/search.py | 227 ---------------------------------
 1 file changed, 227 deletions(-)

diff --git a/scribe/elasticsearch/search.py b/scribe/elasticsearch/search.py
index 70785c7..3a95445 100644
--- a/scribe/elasticsearch/search.py
+++ b/scribe/elasticsearch/search.py
@@ -241,233 +241,6 @@ class SearchIndex:
         if clients:
             await asyncio.gather(*(client.close() for client in clients))
 
-    def delete_index(self):
-        return self.sync_client.indices.delete(self.index, ignore_unavailable=True)
-
-    async def _consume_claim_producer(self, claim_producer):
-        count = 0
-        async for op, doc in claim_producer:
-            if op == 'delete':
-                yield {
-                    '_index': self.index,
-                    '_op_type': 'delete',
-                    '_id': doc
-                }
-            else:
-                yield {
-                    'doc': {key: value for key, value in doc.items() if key in ALL_FIELDS},
-                    '_id': doc['claim_id'],
-                    '_index': self.index,
-                    '_op_type': 'update',
-                    'doc_as_upsert': True
-                }
-                count += 1
-                if count % 100 == 0:
-                    self.logger.info("Indexing in progress, %d claims.", count)
-        if count:
-            self.logger.info("Indexing done for %d claims.", count)
-        else:
-            self.logger.debug("Indexing done for %d claims.", count)
-
-    async def claim_consumer(self, claim_producer):
-        touched = set()
-        async for ok, item in async_streaming_bulk(self.sync_client, self._consume_claim_producer(claim_producer),
-                                                   raise_on_error=False):
-            if not ok:
-                self.logger.warning("indexing failed for an item: %s", item)
-            else:
-                item = item.popitem()[1]
-                touched.add(item['_id'])
-        await self.sync_client.indices.refresh(self.index)
-        self.logger.debug("Indexing done.")
-
-    def update_filter_query(self, censor_type, blockdict, channels=False):
-        blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
-        if channels:
-            update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
-        else:
-            update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
-        key = 'channel_id' if channels else 'claim_id'
-        update['script'] = {
-            "source": f"ctx._source.censor_type={censor_type}; "
-                      f"ctx._source.censoring_channel_id=params[ctx._source.{key}];",
-            "lang": "painless",
-            "params": blockdict
-        }
-        return update
-
-    async def update_trending_score(self, params):
-        update_trending_score_script = """
-        double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); }
-
-        double logsumexp(double x, double y)
-        {
-            double top;
-            if(x > y)
-                top = x;
-            else
-                top = y;
-            double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top));
-            return(result);
-        }
-
-        double logdiffexp(double big, double small)
-        {
-            return big + Math.log(1.0 - Math.exp(small - big));
-        }
-
-        double squash(double x)
-        {
-            if(x < 0.0)
-                return -Math.log(1.0 - x);
-            else
-                return Math.log(x + 1.0);
-        }
-
-        double unsquash(double x)
-        {
-            if(x < 0.0)
-                return 1.0 - Math.exp(-x);
-            else
-                return Math.exp(x) - 1.0;
-        }
-
-        double log_to_squash(double x)
-        {
-            return logsumexp(x, 0.0);
-        }
-
-        double squash_to_log(double x)
-        {
-            //assert x > 0.0;
-            return logdiffexp(x, 0.0);
-        }
-
-        double squashed_add(double x, double y)
-        {
-            // squash(unsquash(x) + unsquash(y)) but avoiding overflow.
-            // Cases where the signs are the same
-            if (x < 0.0 && y < 0.0)
-                return -logsumexp(-x, logdiffexp(-y, 0.0));
-            if (x >= 0.0 && y >= 0.0)
-                return logsumexp(x, logdiffexp(y, 0.0));
-            // Where the signs differ
-            if (x >= 0.0 && y < 0.0)
-                if (Math.abs(x) >= Math.abs(y))
-                    return logsumexp(0.0, logdiffexp(x, -y));
-                else
-                    return -logsumexp(0.0, logdiffexp(-y, x));
-            if (x < 0.0 && y >= 0.0)
-            {
-                // Addition is commutative, hooray for new math
-                return squashed_add(y, x);
-            }
-            return 0.0;
-        }
-
-        double squashed_multiply(double x, double y)
-        {
-            // squash(unsquash(x)*unsquash(y)) but avoiding overflow.
-            int sign;
-            if(x*y >= 0.0)
-                sign = 1;
-            else
-                sign = -1;
-            return sign*logsumexp(squash_to_log(Math.abs(x))
-                            + squash_to_log(Math.abs(y)), 0.0);
-        }
-
-        // Squashed inflated units
-        double inflateUnits(int height) {
-            double timescale = 576.0; // Half life of 400 = e-folding time of a day
-                                      // by coincidence, so may as well go with it
-            return log_to_squash(height / timescale);
-        }
-
-        double spikePower(double newAmount) {
-            if (newAmount < 50.0) {
-                return(0.5);
-            } else if (newAmount < 85.0) {
-                return(newAmount / 100.0);
-            } else {
-                return(0.85);
-            }
-        }
-
-        double spikeMass(double oldAmount, double newAmount) {
-            double softenedChange = softenLBC(Math.abs(newAmount - oldAmount));
-            double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount));
-            double power = spikePower(newAmount);
-            if (oldAmount > newAmount) {
-                -1.0 * Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power)
-            } else {
-                Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power)
-            }
-        }
-        for (i in params.src.changes) {
-            double units = inflateUnits(i.height);
-            if (ctx._source.trending_score == null) {
-                ctx._source.trending_score = 0.0;
-            }
-            double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount)));
-            ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike);
-        }
-        """
-        start = time.perf_counter()
-
-        def producer():
-            for claim_id, claim_updates in params.items():
-                yield {
-                    '_id': claim_id,
-                    '_index': self.index,
-                    '_op_type': 'update',
-                    'script': {
-                        'lang': 'painless',
-                        'source': update_trending_score_script,
-                        'params': {'src': {
-                            'changes': [
-                                {
-                                    'height': p.height,
-                                    'prev_amount': p.prev_amount / 1E8,
-                                    'new_amount': p.new_amount / 1E8,
-                                } for p in claim_updates
-                            ]
-                        }}
-                    },
-                }
-        if not params:
-            return
-        async for ok, item in async_streaming_bulk(self.sync_client, producer(), raise_on_error=False):
-            if not ok:
-                self.logger.warning("updating trending failed for an item: %s", item)
-        await self.sync_client.indices.refresh(self.index)
-        self.logger.info("updated trending scores in %ims", int((time.perf_counter() - start) * 1000))
-
-    async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
-        if filtered_streams:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, filtered_streams), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-        if filtered_channels:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels, True), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-        if blocked_streams:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_streams), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-        if blocked_channels:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-        self.clear_caches()
-
     def clear_caches(self):
         self.search_cache.clear()
         self.claim_cache.clear()