Move all RPC I/O to separate thread.

This commit is contained in:
Jeff Garzik 2011-03-14 23:17:34 -04:00 committed by Jeff Garzik
parent cdb4cd9c8b
commit 4f7a51e9ed
5 changed files with 640 additions and 51 deletions

View file

@ -13,7 +13,7 @@ INCLUDES = $(PTHREAD_FLAGS) -fno-strict-aliasing $(JANSSON_INCLUDES)
bin_PROGRAMS = minerd
minerd_SOURCES = miner.h compat.h \
minerd_SOURCES = elist.h miner.h compat.h \
cpu-miner.c util.c \
sha256_generic.c sha256_4way.c sha256_via.c \
sha256_cryptopp.c sha256_sse2_amd64.c

View file

@ -32,6 +32,25 @@
#define DEF_RPC_URL "http://127.0.0.1:8332/"
#define DEF_RPC_USERPASS "rpcuser:rpcpass"
struct thr_info {
int id;
pthread_t pth;
struct thread_q *q;
};
enum workio_commands {
WC_GET_WORK,
WC_SUBMIT_WORK,
};
struct workio_cmd {
enum workio_commands cmd;
struct thr_info *thr;
union {
struct work *work;
} u;
};
enum sha256_algos {
ALGO_C, /* plain C */
ALGO_4WAY, /* parallel SSE2 */
@ -70,6 +89,8 @@ static enum sha256_algos opt_algo = ALGO_C;
static int opt_n_threads = 1;
static char *rpc_url;
static char *userpass;
static struct thr_info *thr_info;
static int work_thr_id;
struct option_help {
@ -214,20 +235,21 @@ err_out:
return false;
}
static void submit_work(CURL *curl, struct work *work)
static bool submit_upstream_work(CURL *curl, const struct work *work)
{
char *hexstr = NULL;
json_t *val, *res;
char s[345], timestr[64];
time_t now;
struct tm *tm;
bool rc = false;
now = time(NULL);
/* build hex string */
hexstr = bin2hex(work->data, sizeof(work->data));
if (!hexstr) {
fprintf(stderr, "submit_work OOM\n");
fprintf(stderr, "submit_upstream_work OOM\n");
goto out;
}
@ -242,7 +264,7 @@ static void submit_work(CURL *curl, struct work *work)
/* issue JSON-RPC request */
val = json_rpc_call(curl, rpc_url, userpass, s);
if (!val) {
fprintf(stderr, "submit_work json_rpc_call failed\n");
fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
goto out;
}
@ -256,11 +278,14 @@ static void submit_work(CURL *curl, struct work *work)
json_decref(val);
rc = true;
out:
free(hexstr);
return rc;
}
static bool get_work(CURL *curl, struct work *work)
static bool get_upstream_work(CURL *curl, struct work *work)
{
static const char *rpc_req =
"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
@ -278,6 +303,120 @@ static bool get_work(CURL *curl, struct work *work)
return rc;
}
static void workio_cmd_free(struct workio_cmd *wc)
{
if (!wc)
return;
switch (wc->cmd) {
case WC_SUBMIT_WORK:
free(wc->u.work);
break;
default: /* do nothing */
break;
}
memset(wc, 0, sizeof(*wc)); /* poison */
free(wc);
}
static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
{
struct work *ret_work;
int failures = 0;
ret_work = calloc(1, sizeof(*ret_work));
if (!ret_work)
return false;
/* obtain new work from bitcoin via JSON-RPC */
while (!get_upstream_work(curl, ret_work)) {
fprintf(stderr, "json_rpc_call failed, ");
if ((opt_retries >= 0) && (++failures > opt_retries)) {
fprintf(stderr, "terminating workio thread\n");
free(ret_work);
return false;
}
/* pause, then restart work-request loop */
fprintf(stderr, "retry after %d seconds\n",
opt_fail_pause);
sleep(opt_fail_pause);
}
/* send work to requesting thread */
if (!tq_push(wc->thr->q, ret_work))
free(ret_work);
return true;
}
static bool workio_submit_work(struct workio_cmd *wc, CURL *curl)
{
int failures = 0;
/* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(curl, wc->u.work)) {
if ((opt_retries >= 0) && (++failures > opt_retries)) {
fprintf(stderr, "...terminating workio thread\n");
return false;
}
/* pause, then restart work-request loop */
fprintf(stderr, "...retry after %d seconds\n",
opt_fail_pause);
sleep(opt_fail_pause);
}
return true;
}
static void *workio_thread(void *userdata)
{
struct thr_info *mythr = userdata;
CURL *curl;
bool ok = true;
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "CURL initialization failed\n");
return NULL;
}
while (ok) {
struct workio_cmd *wc;
/* wait for workio_cmd sent to us, on our queue */
wc = tq_pop(mythr->q, NULL);
if (!wc) {
ok = false;
break;
}
/* process workio_cmd */
switch (wc->cmd) {
case WC_GET_WORK:
ok = workio_get_work(wc, curl);
break;
case WC_SUBMIT_WORK:
ok = workio_submit_work(wc, curl);
break;
default: /* should never happen */
ok = false;
break;
}
workio_cmd_free(wc);
}
tq_freeze(mythr->q);
curl_easy_cleanup(curl);
return NULL;
}
static void hashmeter(int thr_id, const struct timeval *diff,
unsigned long hashes_done)
{
@ -292,39 +431,82 @@ static void hashmeter(int thr_id, const struct timeval *diff,
khashes / secs);
}
static void *miner_thread(void *thr_id_int)
static bool get_work(struct thr_info *thr, struct work *work)
{
int thr_id = (unsigned long) thr_id_int;
int failures = 0;
uint32_t max_nonce = 0xffffff;
CURL *curl;
struct workio_cmd *wc;
struct work *work_heap;
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "CURL initialization failed\n");
return NULL;
/* fill out work request message */
wc = calloc(1, sizeof(*wc));
if (!wc)
return false;
wc->cmd = WC_GET_WORK;
wc->thr = thr;
/* send work request to workio thread */
if (!tq_push(thr_info[work_thr_id].q, wc)) {
workio_cmd_free(wc);
return false;
}
/* wait for response, a unit of work */
work_heap = tq_pop(thr->q, NULL);
if (!work_heap)
return false;
/* copy returned work into storage provided by caller */
memcpy(work, work_heap, sizeof(*work));
free(work_heap);
return true;
}
static bool submit_work(struct thr_info *thr, const struct work *work_in)
{
struct workio_cmd *wc;
/* fill out work request message */
wc = calloc(1, sizeof(*wc));
if (!wc)
return false;
wc->u.work = malloc(sizeof(*work_in));
if (!wc->u.work)
goto err_out;
wc->cmd = WC_SUBMIT_WORK;
wc->thr = thr;
memcpy(wc->u.work, work_in, sizeof(*work_in));
/* send solution to workio thread */
if (!tq_push(thr_info[work_thr_id].q, wc))
goto err_out;
return true;
err_out:
workio_cmd_free(wc);
return false;
}
static void *miner_thread(void *userdata)
{
struct thr_info *mythr = userdata;
int thr_id = mythr->id;
uint32_t max_nonce = 0xffffff;
while (1) {
struct work work __attribute__((aligned(128)));
unsigned long hashes_done;
struct timeval tv_start, tv_end, diff;
bool rc;
/* obtain new work from bitcoin */
if (!get_work(curl, &work)) {
fprintf(stderr, "json_rpc_call failed, ");
if ((opt_retries >= 0) && (++failures > opt_retries)) {
fprintf(stderr, "terminating thread\n");
return NULL; /* exit thread */
}
/* pause, then restart work loop */
fprintf(stderr, "retry after %d seconds\n",
opt_fail_pause);
sleep(opt_fail_pause);
continue;
/* obtain new work from internal workio thread */
if (!get_work(mythr, &work)) {
fprintf(stderr, "work retrieval failed, exiting "
"mining thread %d\n", mythr->id);
goto out;
}
hashes_done = 0;
@ -384,7 +566,7 @@ static void *miner_thread(void *thr_id_int)
default:
/* should never happen */
return NULL;
goto out;
}
/* record scanhash elapsed time */
@ -404,13 +586,12 @@ static void *miner_thread(void *thr_id_int)
max_nonce += 100000; /* small increase */
/* if nonce found, submit work */
if (rc)
submit_work(curl, &work);
failures = 0;
if (rc && !submit_work(mythr, &work))
break;
}
curl_easy_cleanup(curl);
out:
tq_freeze(mythr->q);
return NULL;
}
@ -564,8 +745,8 @@ static void parse_cmdline(int argc, char *argv[])
int main (int argc, char *argv[])
{
struct thr_info *thr;
int i;
pthread_t *t_all;
rpc_url = strdup(DEF_RPC_URL);
userpass = strdup(DEF_RPC_USERPASS);
@ -577,14 +758,33 @@ int main (int argc, char *argv[])
if (setpriority(PRIO_PROCESS, 0, 19))
perror("setpriority");
t_all = calloc(opt_n_threads, sizeof(pthread_t));
if (!t_all)
thr_info = calloc(opt_n_threads + 1, sizeof(*thr));
if (!thr_info)
return 1;
work_thr_id = opt_n_threads;
thr = &thr_info[work_thr_id];
thr->id = opt_n_threads;
thr->q = tq_new();
if (!thr->q)
return 1;
/* start work I/O thread */
if (pthread_create(&thr->pth, NULL, workio_thread, thr)) {
fprintf(stderr, "workio thread create failed\n");
return 1;
}
/* start mining threads */
for (i = 0; i < opt_n_threads; i++) {
if (pthread_create(&t_all[i], NULL, miner_thread,
(void *)(unsigned long) i)) {
thr = &thr_info[i];
thr->id = i;
thr->q = tq_new();
if (!thr->q)
return 1;
if (pthread_create(&thr->pth, NULL, miner_thread, thr)) {
fprintf(stderr, "thread %d create failed\n", i);
return 1;
}
@ -597,11 +797,10 @@ int main (int argc, char *argv[])
opt_n_threads,
algo_names[opt_algo]);
/* main loop - simply wait for all threads to exit */
for (i = 0; i < opt_n_threads; i++)
pthread_join(t_all[i], NULL);
/* main loop - simply wait for workio thread to exit */
pthread_join(thr_info[work_thr_id].pth, NULL);
fprintf(stderr, "all threads dead, fred. exiting.\n");
fprintf(stderr, "workio thread dead, exiting.\n");
return 0;
}

251
elist.h Normal file
View file

@ -0,0 +1,251 @@
#ifndef _LINUX_LIST_H
#define _LINUX_LIST_H
/*
* Simple doubly linked list implementation.
*
* Some of the internal functions ("__xxx") are useful when
* manipulating whole lists rather than single entries, as
* sometimes we already know the next/prev entries and we can
* generate better code by using them directly rather than
* using the generic single-entry routines.
*/
struct list_head {
struct list_head *next, *prev;
};
#define LIST_HEAD_INIT(name) { &(name), &(name) }
#define LIST_HEAD(name) \
struct list_head name = LIST_HEAD_INIT(name)
#define INIT_LIST_HEAD(ptr) do { \
(ptr)->next = (ptr); (ptr)->prev = (ptr); \
} while (0)
/*
* Insert a new entry between two known consecutive entries.
*
* This is only for internal list manipulation where we know
* the prev/next entries already!
*/
static inline void __list_add(struct list_head *new,
struct list_head *prev,
struct list_head *next)
{
next->prev = new;
new->next = next;
new->prev = prev;
prev->next = new;
}
/**
* list_add - add a new entry
* @new: new entry to be added
* @head: list head to add it after
*
* Insert a new entry after the specified head.
* This is good for implementing stacks.
*/
static inline void list_add(struct list_head *new, struct list_head *head)
{
__list_add(new, head, head->next);
}
/**
* list_add_tail - add a new entry
* @new: new entry to be added
* @head: list head to add it before
*
* Insert a new entry before the specified head.
* This is useful for implementing queues.
*/
static inline void list_add_tail(struct list_head *new, struct list_head *head)
{
__list_add(new, head->prev, head);
}
/*
* Delete a list entry by making the prev/next entries
* point to each other.
*
* This is only for internal list manipulation where we know
* the prev/next entries already!
*/
static inline void __list_del(struct list_head *prev, struct list_head *next)
{
next->prev = prev;
prev->next = next;
}
/**
* list_del - deletes entry from list.
* @entry: the element to delete from the list.
* Note: list_empty on entry does not return true after this, the entry is in an undefined state.
*/
static inline void list_del(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
entry->next = (void *) 0;
entry->prev = (void *) 0;
}
/**
* list_del_init - deletes entry from list and reinitialize it.
* @entry: the element to delete from the list.
*/
static inline void list_del_init(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
INIT_LIST_HEAD(entry);
}
/**
* list_move - delete from one list and add as another's head
* @list: the entry to move
* @head: the head that will precede our entry
*/
static inline void list_move(struct list_head *list, struct list_head *head)
{
__list_del(list->prev, list->next);
list_add(list, head);
}
/**
* list_move_tail - delete from one list and add as another's tail
* @list: the entry to move
* @head: the head that will follow our entry
*/
static inline void list_move_tail(struct list_head *list,
struct list_head *head)
{
__list_del(list->prev, list->next);
list_add_tail(list, head);
}
/**
* list_empty - tests whether a list is empty
* @head: the list to test.
*/
static inline int list_empty(struct list_head *head)
{
return head->next == head;
}
static inline void __list_splice(struct list_head *list,
struct list_head *head)
{
struct list_head *first = list->next;
struct list_head *last = list->prev;
struct list_head *at = head->next;
first->prev = head;
head->next = first;
last->next = at;
at->prev = last;
}
/**
* list_splice - join two lists
* @list: the new list to add.
* @head: the place to add it in the first list.
*/
static inline void list_splice(struct list_head *list, struct list_head *head)
{
if (!list_empty(list))
__list_splice(list, head);
}
/**
* list_splice_init - join two lists and reinitialise the emptied list.
* @list: the new list to add.
* @head: the place to add it in the first list.
*
* The list at @list is reinitialised
*/
static inline void list_splice_init(struct list_head *list,
struct list_head *head)
{
if (!list_empty(list)) {
__list_splice(list, head);
INIT_LIST_HEAD(list);
}
}
/**
* list_entry - get the struct for this entry
* @ptr: the &struct list_head pointer.
* @type: the type of the struct this is embedded in.
* @member: the name of the list_struct within the struct.
*/
#define list_entry(ptr, type, member) \
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
/**
* list_for_each - iterate over a list
* @pos: the &struct list_head to use as a loop counter.
* @head: the head for your list.
*/
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); \
pos = pos->next)
/**
* list_for_each_prev - iterate over a list backwards
* @pos: the &struct list_head to use as a loop counter.
* @head: the head for your list.
*/
#define list_for_each_prev(pos, head) \
for (pos = (head)->prev; pos != (head); \
pos = pos->prev)
/**
* list_for_each_safe - iterate over a list safe against removal of list entry
* @pos: the &struct list_head to use as a loop counter.
* @n: another &struct list_head to use as temporary storage
* @head: the head for your list.
*/
#define list_for_each_safe(pos, n, head) \
for (pos = (head)->next, n = pos->next; pos != (head); \
pos = n, n = pos->next)
/**
* list_for_each_entry - iterate over list of given type
* @pos: the type * to use as a loop counter.
* @head: the head for your list.
* @member: the name of the list_struct within the struct.
*/
#define list_for_each_entry(pos, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = list_entry(pos->member.next, typeof(*pos), member))
/**
* list_for_each_entry_safe - iterate over list of given type safe against removal of list entry
* @pos: the type * to use as a loop counter.
* @n: another type * to use as temporary storage
* @head: the head for your list.
* @member: the name of the list_struct within the struct.
*/
#define list_for_each_entry_safe(pos, n, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member), \
n = list_entry(pos->member.next, typeof(*pos), member); \
&pos->member != (head); \
pos = n, n = list_entry(n->member.next, typeof(*n), member))
/**
* list_for_each_entry_continue - iterate over list of given type
* continuing after existing point
* @pos: the type * to use as a loop counter.
* @head: the head for your list.
* @member: the name of the list_struct within the struct.
*/
#define list_for_each_entry_continue(pos, head, member) \
for (pos = list_entry(pos->member.next, typeof(*pos), member), \
prefetch(pos->member.next); \
&pos->member != (head); \
pos = list_entry(pos->member.next, typeof(*pos), member), \
prefetch(pos->member.next))
#endif

11
miner.h
View file

@ -70,7 +70,7 @@ extern bool opt_protocol;
extern const uint32_t sha256_init_state[];
extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
const char *rpc_req);
extern char *bin2hex(unsigned char *p, size_t len);
extern char *bin2hex(const unsigned char *p, size_t len);
extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);
extern unsigned int ScanHash_4WaySSE2(const unsigned char *pmidstate,
@ -109,4 +109,13 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
extern bool fulltest(const unsigned char *hash, const unsigned char *target);
struct thread_q;
extern struct thread_q *tq_new(void);
extern void tq_free(struct thread_q *tq);
extern bool tq_push(struct thread_q *tq, void *data);
extern void *tq_pop(struct thread_q *tq, const struct timespec *abstime);
extern void tq_freeze(struct thread_q *tq);
extern void tq_thaw(struct thread_q *tq);
#endif /* __MINER_H__ */

132
util.c
View file

@ -14,9 +14,11 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <jansson.h>
#include <curl/curl.h>
#include "miner.h"
#include "elist.h"
struct data_buffer {
void *buf;
@ -28,6 +30,20 @@ struct upload_buffer {
size_t len;
};
struct tq_ent {
void *data;
struct list_head q_node;
};
struct thread_q {
struct list_head q;
bool frozen;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
static void databuf_free(struct data_buffer *db)
{
if (!db)
@ -179,7 +195,7 @@ err_out:
return NULL;
}
char *bin2hex(unsigned char *p, size_t len)
char *bin2hex(const unsigned char *p, size_t len)
{
int i;
char *s = malloc((len * 2) + 1);
@ -296,3 +312,117 @@ bool fulltest(const unsigned char *hash, const unsigned char *target)
return true; /* FIXME: return rc; */
}
struct thread_q *tq_new(void)
{
struct thread_q *tq;
tq = calloc(1, sizeof(*tq));
if (!tq)
return NULL;
INIT_LIST_HEAD(&tq->q);
pthread_mutex_init(&tq->mutex, NULL);
pthread_cond_init(&tq->cond, NULL);
return tq;
}
void tq_free(struct thread_q *tq)
{
struct tq_ent *ent, *iter;
if (!tq)
return;
list_for_each_entry_safe(ent, iter, &tq->q, q_node) {
list_del(&ent->q_node);
free(ent);
}
pthread_cond_destroy(&tq->cond);
pthread_mutex_destroy(&tq->mutex);
memset(tq, 0, sizeof(*tq)); /* poison */
free(tq);
}
static void tq_freezethaw(struct thread_q *tq, bool frozen)
{
pthread_mutex_lock(&tq->mutex);
tq->frozen = frozen;
pthread_cond_signal(&tq->cond);
pthread_mutex_unlock(&tq->mutex);
}
void tq_freeze(struct thread_q *tq)
{
tq_freezethaw(tq, true);
}
void tq_thaw(struct thread_q *tq)
{
tq_freezethaw(tq, false);
}
bool tq_push(struct thread_q *tq, void *data)
{
struct tq_ent *ent;
bool rc = true;
ent = calloc(1, sizeof(*ent));
if (!ent)
return false;
ent->data = data;
INIT_LIST_HEAD(&ent->q_node);
pthread_mutex_lock(&tq->mutex);
if (!tq->frozen) {
list_add_tail(&ent->q_node, &tq->q);
} else {
free(ent);
rc = false;
}
pthread_cond_signal(&tq->cond);
pthread_mutex_unlock(&tq->mutex);
return rc;
}
void *tq_pop(struct thread_q *tq, const struct timespec *abstime)
{
struct tq_ent *ent;
void *rval = NULL;
int rc;
pthread_mutex_lock(&tq->mutex);
if (!list_empty(&tq->q))
goto pop;
if (abstime)
rc = pthread_cond_timedwait(&tq->cond, &tq->mutex, abstime);
else
rc = pthread_cond_wait(&tq->cond, &tq->mutex);
if (rc)
goto out;
if (list_empty(&tq->q))
goto out;
pop:
ent = list_entry(tq->q.next, struct tq_ent, q_node);
rval = ent->data;
list_del(&ent->q_node);
free(ent);
out:
pthread_mutex_unlock(&tq->mutex);
return rval;
}