diff options
Diffstat (limited to 'ot_livesync.c')
| -rw-r--r-- | ot_livesync.c | 292 |
1 files changed, 36 insertions, 256 deletions
diff --git a/ot_livesync.c b/ot_livesync.c index 9e1c723..87fe5cf 100644 --- a/ot_livesync.c +++ b/ot_livesync.c | |||
| @@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 }; | |||
| 33 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 33 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
| 34 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 34 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) |
| 35 | 35 | ||
| 36 | #ifdef WANT_SYNC_SCRAPE | ||
| 37 | #define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504 | ||
| 38 | #define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t)) | ||
| 39 | #define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100 | ||
| 40 | |||
| 41 | #define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */ | ||
| 42 | #define LIVESYNC_BEACON_INTERVAL 60 /* seconds */ | ||
| 43 | #define LIVESYNC_INQUIRE_THRESH 0.75 | ||
| 44 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 45 | |||
| 46 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 36 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
| 47 | 37 | ||
| 48 | enum { OT_SYNC_PEER | 38 | enum { OT_SYNC_PEER }; |
| 49 | #ifdef WANT_SYNC_SCRAPE | ||
| 50 | , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL | ||
| 51 | #endif | ||
| 52 | }; | ||
| 53 | 39 | ||
| 54 | /* Forward declaration */ | 40 | /* Forward declaration */ |
| 55 | static void * livesync_worker( void * args ); | 41 | static void * livesync_worker( void * args ); |
| @@ -59,52 +45,24 @@ static int64 g_socket_in = -1; | |||
| 59 | 45 | ||
| 60 | /* For incoming packets */ | 46 | /* For incoming packets */ |
| 61 | static int64 g_socket_out = -1; | 47 | static int64 g_socket_out = -1; |
| 62 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | ||
| 63 | |||
| 64 | static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | ||
| 65 | static uint8_t *g_peerbuffer_pos; | ||
| 66 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | ||
| 67 | 48 | ||
| 49 | char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | ||
| 50 | static size_t g_outbuf_data; | ||
| 68 | static ot_time g_next_packet_time; | 51 | static ot_time g_next_packet_time; |
| 69 | 52 | ||
| 70 | #ifdef WANT_SYNC_SCRAPE | ||
| 71 | /* Live sync scrape buffers, states and timers */ | ||
| 72 | static ot_time g_next_beacon_time; | ||
| 73 | static ot_time g_next_inquire_time; | ||
| 74 | |||
| 75 | static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE]; | ||
| 76 | static uint8_t *g_scrapebuffer_pos; | ||
| 77 | static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE; | ||
| 78 | |||
| 79 | static size_t g_inquire_remote_count; | ||
| 80 | static uint32_t g_inquire_remote_host; | ||
| 81 | static int g_inquire_inprogress; | ||
| 82 | static int g_inquire_bucket; | ||
| 83 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 84 | |||
| 85 | static pthread_t thread_id; | 53 | static pthread_t thread_id; |
| 86 | void livesync_init( ) { | 54 | void livesync_init( ) { |
| 55 | |||
| 87 | if( g_socket_in == -1 ) | 56 | if( g_socket_in == -1 ) |
| 88 | exerr( "No socket address for live sync specified." ); | 57 | exerr( "No socket address for live sync specified." ); |
| 89 | 58 | ||
| 90 | /* Prepare outgoing peers buffer */ | 59 | /* Prepare outgoing peers buffer */ |
| 91 | g_peerbuffer_pos = g_peerbuffer_start; | 60 | memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); |
| 92 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 61 | uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); |
| 93 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | 62 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
| 94 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 95 | |||
| 96 | #ifdef WANT_SYNC_SCRAPE | ||
| 97 | /* Prepare outgoing scrape buffer */ | ||
| 98 | g_scrapebuffer_pos = g_scrapebuffer_start; | ||
| 99 | memcpy( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 100 | uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL); | ||
| 101 | g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 102 | |||
| 103 | /* Wind up timers for inquires */ | ||
| 104 | g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY; | ||
| 105 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 106 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | ||
| 107 | 63 | ||
| 64 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | ||
| 65 | |||
| 108 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 66 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); |
| 109 | } | 67 | } |
| 110 | 68 | ||
| @@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | |||
| 148 | } | 106 | } |
| 149 | 107 | ||
| 150 | static void livesync_issue_peersync( ) { | 108 | static void livesync_issue_peersync( ) { |
| 151 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | 109 | socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); |
| 152 | groupip_1, LIVESYNC_PORT); | 110 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
| 153 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 154 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 111 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
| 155 | } | 112 | } |
| 156 | 113 | ||
| 157 | static void livesync_handle_peersync( ssize_t datalen ) { | 114 | static void livesync_handle_peersync( struct ot_workstruct *ws ) { |
| 158 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 115 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
| 159 | 116 | ||
| 160 | /* Now basic sanity checks have been done on the live sync packet | 117 | /* Now basic sanity checks have been done on the live sync packet |
| 161 | We might add more testing and logging. */ | 118 | We might add more testing and logging. */ |
| 162 | while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { | 119 | while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) { |
| 163 | ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); | 120 | memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) ); |
| 164 | ot_hash *hash = (ot_hash*)(g_inbuffer + off); | 121 | ws->hash = (ot_hash*)(ws->request + off); |
| 165 | 122 | ||
| 166 | if( !g_opentracker_running ) return; | 123 | if( !g_opentracker_running ) return; |
| 167 | 124 | ||
| 168 | if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) | 125 | if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED ) |
| 169 | remove_peer_from_torrent( *hash, peer, NULL, FLAG_MCA ); | 126 | remove_peer_from_torrent( FLAG_MCA, ws ); |
| 170 | else | 127 | else |
| 171 | add_peer_to_torrent( *hash, peer, FLAG_MCA ); | 128 | add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); |
| 172 | 129 | ||
| 173 | off += sizeof( ot_hash ) + sizeof( ot_peer ); | 130 | off += sizeof( ot_hash ) + sizeof( ot_peer ); |
| 174 | } | 131 | } |
| 175 | 132 | ||
| 176 | stats_issue_event(EVENT_SYNC, 0, | 133 | stats_issue_event(EVENT_SYNC, 0, |
| 177 | (datalen - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / | 134 | (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / |
| 178 | ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); | 135 | ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); |
| 179 | } | 136 | } |
| 180 | 137 | ||
| 181 | #ifdef WANT_SYNC_SCRAPE | ||
| 182 | void livesync_issue_beacon( ) { | ||
| 183 | size_t torrent_count = mutex_get_torrent_count(); | ||
| 184 | uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ]; | ||
| 185 | |||
| 186 | memcpy( beacon, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 187 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON); | ||
| 188 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) ); | ||
| 189 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count ); | ||
| 190 | |||
| 191 | socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT); | ||
| 192 | } | ||
| 193 | |||
| 194 | void livesync_handle_beacon( ssize_t datalen ) { | ||
| 195 | size_t torrent_count_local, torrent_count_remote; | ||
| 196 | if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ) | ||
| 197 | return; | ||
| 198 | torrent_count_local = mutex_get_torrent_count(); | ||
| 199 | torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32); | ||
| 200 | torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t)); | ||
| 201 | |||
| 202 | /* Empty tracker is useless */ | ||
| 203 | if( !torrent_count_remote ) return; | ||
| 204 | |||
| 205 | if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) { | ||
| 206 | if( !g_next_inquire_time ) { | ||
| 207 | g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL; | ||
| 208 | g_inquire_remote_count = 0; | ||
| 209 | } | ||
| 210 | |||
| 211 | if( torrent_count_remote > g_inquire_remote_count ) { | ||
| 212 | g_inquire_remote_count = torrent_count_remote; | ||
| 213 | memcpy( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) ); | ||
| 214 | } | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | void livesync_issue_inquire( ) { | ||
| 219 | uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)]; | ||
| 220 | |||
| 221 | memcpy( inquire, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 222 | uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE); | ||
| 223 | memcpy( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) ); | ||
| 224 | |||
| 225 | socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT); | ||
| 226 | } | ||
| 227 | |||
| 228 | void livesync_handle_inquire( ssize_t datalen ) { | ||
| 229 | if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) ) | ||
| 230 | return; | ||
| 231 | |||
| 232 | /* If it isn't us, they're inquiring, ignore inquiry */ | ||
| 233 | if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) ) | ||
| 234 | return; | ||
| 235 | |||
| 236 | /* Start scrape tell on next ticker */ | ||
| 237 | if( !g_inquire_inprogress ) { | ||
| 238 | g_inquire_inprogress = 1; | ||
| 239 | g_inquire_bucket = 0; | ||
| 240 | } | ||
| 241 | } | ||
| 242 | |||
| 243 | void livesync_issue_tell( ) { | ||
| 244 | int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE; | ||
| 245 | while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) { | ||
| 246 | ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket ); | ||
| 247 | unsigned int j; | ||
| 248 | for( j=0; j<torrents_list->size; ++j ) { | ||
| 249 | ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j; | ||
| 250 | memcpy(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash)); | ||
| 251 | g_scrapebuffer_pos += sizeof(ot_hash); | ||
| 252 | uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base )); | ||
| 253 | uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) ); | ||
| 254 | uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count ); | ||
| 255 | g_scrapebuffer_pos += 12; | ||
| 256 | |||
| 257 | if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) { | ||
| 258 | socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); | ||
| 259 | g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 260 | --packets_to_send; | ||
| 261 | } | ||
| 262 | } | ||
| 263 | mutex_bucket_unlock( g_inquire_bucket++, 0 ); | ||
| 264 | if( !g_opentracker_running ) | ||
| 265 | return; | ||
| 266 | } | ||
| 267 | if( g_inquire_bucket == OT_BUCKET_COUNT ) { | ||
| 268 | socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); | ||
| 269 | g_inquire_inprogress = 0; | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | void livesync_handle_tell( ssize_t datalen ) { | ||
| 274 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 275 | |||
| 276 | /* Some instance is in progress of telling. Our inquiry was successful. | ||
| 277 | Don't ask again until we see next beacon. */ | ||
| 278 | g_next_inquire_time = 0; | ||
| 279 | |||
| 280 | /* Don't cause any new inquiries during another tracker's tell */ | ||
| 281 | if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL ) | ||
| 282 | g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 283 | |||
| 284 | while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) { | ||
| 285 | ot_hash *hash = (ot_hash*)(g_inbuffer+off); | ||
| 286 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(*hash); | ||
| 287 | size_t down_count_remote; | ||
| 288 | int exactmatch; | ||
| 289 | ot_torrent *torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch); | ||
| 290 | |||
| 291 | if( !torrent ) { | ||
| 292 | mutex_bucket_unlock_by_hash( *hash, 0 ); | ||
| 293 | continue; | ||
| 294 | } | ||
| 295 | |||
| 296 | if( !exactmatch ) { | ||
| 297 | /* Create a new torrent entry, then */ | ||
| 298 | memcpy( &torrent->hash, hash, sizeof(ot_hash)); | ||
| 299 | |||
| 300 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { | ||
| 301 | vector_remove_torrent( torrents_list, torrent ); | ||
| 302 | mutex_bucket_unlock_by_hash( *hash, 0 ); | ||
| 303 | continue; | ||
| 304 | } | ||
| 305 | |||
| 306 | byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); | ||
| 307 | torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash)); | ||
| 308 | } | ||
| 309 | |||
| 310 | down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + sizeof(uint32_t))) << 32); | ||
| 311 | down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + 2 * sizeof(uint32_t)); | ||
| 312 | |||
| 313 | if( down_count_remote > torrent->peer_list->down_count ) | ||
| 314 | torrent->peer_list->down_count = down_count_remote; | ||
| 315 | /* else | ||
| 316 | We might think of sending a tell packet, if we have a much larger downloaded count | ||
| 317 | */ | ||
| 318 | |||
| 319 | mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 ); | ||
| 320 | if( !g_opentracker_running ) | ||
| 321 | return; | ||
| 322 | off += sizeof(ot_hash) + 12; | ||
| 323 | } | ||
| 324 | } | ||
| 325 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 326 | |||
| 327 | /* Tickle the live sync module from time to time, so no events get | 138 | /* Tickle the live sync module from time to time, so no events get |
| 328 | stuck when there's not enough traffic to fill udp packets fast | 139 | stuck when there's not enough traffic to fill udp packets fast |
| 329 | enough */ | 140 | enough */ |
| 330 | void livesync_ticker( ) { | 141 | void livesync_ticker( ) { |
| 331 | |||
| 332 | /* livesync_issue_peersync sets g_next_packet_time */ | 142 | /* livesync_issue_peersync sets g_next_packet_time */ |
| 333 | if( g_now_seconds > g_next_packet_time && | 143 | if( g_now_seconds > g_next_packet_time && |
| 334 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | 144 | g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) |
| 335 | livesync_issue_peersync(); | 145 | livesync_issue_peersync(); |
| 336 | |||
| 337 | #ifdef WANT_SYNC_SCRAPE | ||
| 338 | /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY | ||
| 339 | seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */ | ||
| 340 | if( g_now_seconds > g_next_beacon_time ) { | ||
| 341 | livesync_issue_beacon( ); | ||
| 342 | g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 343 | } | ||
| 344 | |||
| 345 | /* If we're interested in an inquiry and waited long enough to see all | ||
| 346 | tracker's beacons, go ahead and inquire */ | ||
| 347 | if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) { | ||
| 348 | livesync_issue_inquire(); | ||
| 349 | |||
| 350 | /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */ | ||
| 351 | g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 352 | } | ||
| 353 | |||
| 354 | /* If we're in process of telling, let's tell. */ | ||
| 355 | if( g_inquire_inprogress ) | ||
| 356 | livesync_issue_tell( ); | ||
| 357 | |||
| 358 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 359 | } | 146 | } |
| 360 | 147 | ||
| 361 | /* Inform live sync about whats going on. */ | 148 | /* Inform live sync about whats going on. */ |
| 362 | void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ) { | 149 | void livesync_tell( struct ot_workstruct *ws ) { |
| 363 | 150 | ||
| 364 | memcpy( g_peerbuffer_pos, info_hash, sizeof(ot_hash) ); | 151 | memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); |
| 365 | memcpy( g_peerbuffer_pos+sizeof(ot_hash), peer, sizeof(ot_peer) ); | 152 | memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); |
| 366 | 153 | ||
| 367 | g_peerbuffer_pos += sizeof(ot_hash)+sizeof(ot_peer); | 154 | g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer); |
| 368 | 155 | ||
| 369 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | 156 | if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) |
| 370 | livesync_issue_peersync(); | 157 | livesync_issue_peersync(); |
| 371 | } | 158 | } |
| 372 | 159 | ||
| 373 | static void * livesync_worker( void * args ) { | 160 | static void * livesync_worker( void * args ) { |
| 161 | struct ot_workstruct ws; | ||
| 374 | ot_ip6 in_ip; uint16_t in_port; | 162 | ot_ip6 in_ip; uint16_t in_port; |
| 375 | ssize_t datalen; | ||
| 376 | 163 | ||
| 377 | (void)args; | 164 | (void)args; |
| 378 | 165 | ||
| 166 | /* Initialize our "thread local storage" */ | ||
| 167 | ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); | ||
| 168 | ws.outbuf = ws.reply = 0; | ||
| 169 | |||
| 379 | memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); | 170 | memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); |
| 380 | 171 | ||
| 381 | while( 1 ) { | 172 | while( 1 ) { |
| 382 | datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 173 | ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); |
| 383 | 174 | ||
| 384 | /* Expect at least tracker id and packet type */ | 175 | /* Expect at least tracker id and packet type */ |
| 385 | if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | 176 | if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) |
| 386 | continue; | 177 | continue; |
| 387 | if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) | 178 | if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) |
| 388 | continue; | 179 | continue; |
| 389 | if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 180 | if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { |
| 390 | /* TODO: log packet coming from ourselves */ | 181 | /* TODO: log packet coming from ourselves */ |
| 391 | continue; | 182 | continue; |
| 392 | } | 183 | } |
| 393 | 184 | ||
| 394 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { | 185 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { |
| 395 | case OT_SYNC_PEER: | 186 | case OT_SYNC_PEER: |
| 396 | livesync_handle_peersync( datalen ); | 187 | livesync_handle_peersync( &ws ); |
| 397 | break; | ||
| 398 | #ifdef WANT_SYNC_SCRAPE | ||
| 399 | case OT_SYNC_SCRAPE_BEACON: | ||
| 400 | livesync_handle_beacon( datalen ); | ||
| 401 | break; | ||
| 402 | case OT_SYNC_SCRAPE_INQUIRE: | ||
| 403 | livesync_handle_inquire( datalen ); | ||
| 404 | break; | ||
| 405 | case OT_SYNC_SCRAPE_TELL: | ||
| 406 | livesync_handle_tell( datalen ); | ||
| 407 | break; | 188 | break; |
| 408 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 409 | default: | 189 | default: |
| 410 | break; | 190 | break; |
| 411 | } | 191 | } |
