使用 Redis 作为任务队列

更新日期: 2016-04-15 阅读次数: 29706 分类: Redis

需求

最近在写爬虫,用来爬取美区亚马逊的商品信息。为了方便地做水平扩展,需要用到任务队列。

需求其实很简单,就是将 MySQL 中未处理的商品 ID, 推送到任务队列中。再由 N 个 worker 拉取任务,进行处理。

对任务队列的要求是

  • 定长。防止任务队列内存占用过多,毕竟使用的是最便宜的 AWS 1G 内存的主机。
  • 操作原子性。即同一个任务不能被两个以上的 worker 拉取到。
  • 简单易用

RQ (Redis Queue)

由于之前使用 Redis 作为队列的代码没有做封装,剥离出来比较费事。于是 Google 了一下是否有现成的实现,还真有! RQ

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily.

扫了一遍文档,挺全,可以放心地测试了。

安装 Redis

Redis Download 下载最新版的 Redis。

不知不觉已经到了 3.0.7 版本。

解压,按照 README 中的说明

make

即可。创建两个软链

sudo ln -s ~/redis-3.0.7/src/redis-server /usr/bin/redis-server
sudo ln -s ~/redis-3.0.7/src/redis-cli /usr/bin/redis-cli

将配置文件 redis.conf 拷贝到项目目录下,进行修改。

daemonize yes
requirepass password

启动 redis

redis-server redis.conf

安装 RQ

sudo pip install rq

填充任务

新建一个 rq 配置文件 rq_settings.py

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = 'password'

# Queues to listen on
QUEUES = ['high', 'normal', 'low']

新建一个入列的程序 enqueue_task.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import rq_settings
from rq import Queue
from redis import Redis
from utils import count_words_at_url

conn = Redis(host=rq_settings.REDIS_HOST, port=rq_settings.REDIS_PORT,
             password=rq_settings.REDIS_PASSWORD)
q = Queue("normal", connection=conn)

job = q.enqueue(count_words_at_url, 'http://sunzhongwei.com')

print job.result   # => None
print "length of task queue: %s" % len(q)

# Now, wait a while, until the worker is finished
time.sleep(5)
print job.result
print "length of task queue: %s" % len(q)

启动 worker

rq worker -c rq_settings

解决 Amazon Product Advertising API 请求频率限制

概况一下是,单 IP 每秒一次请求,单账号每秒一次请求。

官方描述

The Product Advertising API is pleased to announce an improvement with how request limits are calculated. To summarize, we are replacing the hourly-limit with a per-second-limit. We believe this is a better experience for associates and their end users. This change will take place on Sept 3rd, 2012.

If your application is trying to submit requests that exceed the maximum request limit for your account, you may receive error messages from Product Advertising API. The request limit for each account is calculated based on revenue performance. Each account used to access the Product Advertising API is allowed an initial usage limit of 1 request per second. Each account will receive an additional 1 request per second (up to a maximum of 10 requests per second) for every $4,600 of shipped item revenue driven per hour in a trailing 30-day period.

Formula is: Calls per second limit = 1 + round ([last 30 days shipped Sales])/$4,600).

超出限制会报 HTTP Error 503: Service Unavailable.

解决思路其实很简单

  1. 注册大量的 Amazon 账号
  2. 每个 worker 使用一个 Amazon 账号。先用两台 500M 内存的最廉价 AWS 同时跑,即先开两个 worker。
  3. 单机性能提升。维护一个共享的代理 IP 池,每个 worker 使用一个代理。但是,保证每台机器有一个 worker 不用代理。

问题来了,如何保证每个 worker 使用独立的 Amazon 账号呢?

我想到的一个思路是:

  1. 建立一个数据表用来存储 Amazon 账号对应的 Access Key 等信息。
  2. 用服务器编号及进程 ID 来标识一个 Amazon 账号正在被哪个 worker 使用。
  3. 每个 worker 启动时,获取一下本机正在运行的所有 worker 的进程 id, 去数据表里更新状态。不存在的进程 id reset 状态,然后获取一个 Amazon 账号,并更新状态。

RQ 的各种坑

获取 woker 标识的最简单办法就是调用 rq 的 get_current_job, 然后第一个坑出现了。。。

NoRedisConnectionException

第二个坑紧接着出现,每个 task 处理都要重新初始化一遍处理函数所在的 model, 日,也就是说每个 task 都要访问一遍账号表。

这个简单的队列,还是手写吧,去你妹的 RQ

关于作者 🌱

我是来自山东烟台的一名开发者,有敢兴趣的话题,或者软件开发需求,欢迎加微信 zhongwei 聊聊, 查看更多联系方式