redis python 连接redis

发布于:2024-07-24 ⋅ 阅读:(46) ⋅ 点赞:(0)

redis

介绍

  • redis 数据库是非关系型数据库
  • 将数据以键值对的形式保存到内存
  • 值有很多数据类型
    • string 字符串
    • list 链表
    • set 集合
    • zset 有序集合
    • hash 哈希
  • 与 memcached 比较
    • memcached 仅支持字符串类型的shuju
    • memcached 不支持持久化
  • 使用 redis 的好处
    • 速度快,因为数据存在内存中,类似于HashMap,
      • HashMap的优势就是查找和操作的时间复杂度都是O(1)
    • 支持丰富数据类型,支持string,list,set,sorted set,hash
    • 支持事务,操作都是原子性,
      • 对数据的更改要么全部执行,要么全部不执行
    • 丰富的特性:可用于缓存,消息,按key设置过期时间,过期后将会自动删除
    • 单线程不存在并发访问的问题(新版已经支持多线程了)
      • io 多路复用 没有线程之间的切换
  • redis 适合的场景
    • 最常见就是作为数据库的缓存
    • 排行榜 记录网站访问量、文章访问量 缓存数据库 发布订阅 去重 分布式等

安装

  1. 去官网下载压缩包,并解压到 /usr/local/src/

  2. 进行编译安装

    cd /usr/local/src/redis-stable
    make
    make PREFIX=/usr/local/redis/ install
    # 关于 PREFIX
    1. 添加此项,安装到指定目录
    2. 默认会将执行文件放到 /usr/local/bin/
    3. 将配置文件放到 /usr/local/etc/
    4. 其他的资源放到 /usr/local/share/
    
  3. 修改配置文件

    # 主要配置
    # bind 127.0.0.1 ::1 默认配置,如果想要远程连接需要
    bind 0.0.0.0
    # protected-mode yes 保护模式开启,远程连接
    protected-mod no
    # requirepass # 默认不需要密码,可以开启设置密码 123
    requirepass 123
    # daemonize no # 是否开启后台运行
    daemonize yes
    
  4. 启动服务

    cd /usr/local/redis/bin/
    ./redis-server redis.conf  # 安装配置文件进行启动
    ./redis-cli  # 进入客户端
    ./redis-cli -a 'password' -h 'ipaddr' -p 'port'  #远程连接
    ./redis-cli shutdown # 关闭服务
    

python 使用 redis

简单使用

from redis import Redis
conn = Redis(host='192.168.222.5',port=6379,db=0,decode_responses=True)

conn.set(name='age',value=18)
result = conn.get(name='age')
print(result)

创建连接池连接

  • 连接池必须是单例的!
# redis_pool.py
import redis
POOL = redis.ConnectionPool(host='192.168.222.5',port=6379,db=0,decode_responses=True,max_connections=100)
# redis_test.py
from t_redis import POOL
import redis

r = redis.Redis(connection_pool=POOL)
result = r.get(name='name')
print(result)

操作各种类型的数据

字符串类型

set 的用法

####1 set的用法
conn.set('height',180) #基本使用

conn.set('height','190',nx=True)

'''
ex,过期时间(秒)
     px,过期时间(毫秒)
nx,如果设置为True,则只有name不存在时,当前set操作才执行,值存在,就修改不了,执行没效果
xx,如果设置为True,则只有name存在时,当前set操作才执行,值存在才能修改,值不存在,不会设置新值
'''

批量设置键值对 mset

r.mset({'key1':'value1', 'key2':'value2'})
# 对应还有 mget

获取旧值放入新值 getset

before_vaule = r.getset('key','new_vaule')

统计网站访问量 incr

r.set('count',0)
r.incr('count')  # 只要一执行,数字加1
r.incr('count',2) # 执行一次数字加二
# 相应的有 decr

字符串拼接 append

r.append('name','Fun')

hash 操作

hset 和 hget

r.hset('dict_name','key','value') # key 不能重复,重复会覆盖
result = r.get('dict_name','key')

hmset hmget

r.hmset('dict_name',{'key1':'value1', 'key2':'value2'})
result = r.hmget('dict_name',['key1', 'key2'])

字典长度 hlen

result = r.hlen('dict_name')

是否有键

result = r.hexists('dict_name','key')

删除键

result r.hdel('dict_name','key')

自增 hincrby

r.hincrby('dict_name','key')

取出部分和全部 hscan_iter hgetall

result = conn.hscan_iter('dict_name')
print(result)
for i in ret:
	print(i)
result = conn.hgetall('dict_name')

链表操作

从左放多个 lpush 从右放 rpush

result = conn.lpush('list_name',1,2,3,4,5)

有指定列表才放

result = r.rpushx('list',10)

从左边去除 lpop 从右边去除 rpop

result = r.lpop('list_name')

长度 len

result = r.llen('list_name')

指定位置插入

# 从零开始数,在三个之前插入
r.linsert('list_name','before','3','77777777')
r.linsert('list_name','after','3','66666666')

指定范围取

r.lindex('list_name',0)  # 去一个
r.lrange('list_name',0,2)  # 前闭后闭

blpop

  • 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
result = r.blpop('list_name',timeout=10)

操作"表"

删除

r.delete('list_name')

存在

r.exists('list_name')

超时

r.expire('list_name',2)

类型

r.type('list_name')

管道

  • 实现事务
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379)

conn = redis.Redis(connection_pool=pool)
# pipe = r.pipeline(transaction=False)
pipe = conn.pipeline(transaction=True)
pipe.multi()
pipe.set('name', 'alex')
pipe.set('role', 'sb')
pipe.execute()  # 这句话,才真正的去执行

django 中使用 redis

  • pip install django-redis
# 在 settings 中配置 redis
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://192.168.222.5:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}
# 使用
# 1 使用cache
from django.core.cache import cache
cache.set('name',user) 

# 直接使用conn对象
from django_redis import get_redis_connection
conn = get_redis_connection('default')
print(conn.hgetall('xxx'))

结合 celery 异步框架

celery 简介

  • celery 是一个异步框架,可以执行异步任务,延迟任务、定时任务等
  • celery 框架分为三部分:
    • broker 任务对列,用于存放任务,可用 redis
    • worker 调度 broker 中的任务,让 python 执行
    • backen 保存任务的状态信息和结果 可用 redis

安装使用 celery

pip install celery
pip install eventlet  # 在 windows 上运行需要该模块

基本结构

#1  只写一个py文件,内容如下celery_task.py:
from celery import Celery
broker='redis://127.0.0.1:6379/1'  #broker任务队列
backend='redis://127.0.0.1:6379/2'   # 结构存储,执行完的结果存在这
app=Celery(__name__,broker=broker,backend=backend)
#添加任务(使用这个装饰器装饰,@app.task)
@app.task
def add(x,y):
    print(x,y)
    return x+y
# 2启动worker
        # 用命令来执行
        # 非windows
        # 命令:celery worker -A celery_task -l info
        # windows:
        # pip3 install eventlet
        # celery worker -A celery_task -l info -P eventlet
        
# 3 添加任务
	from celery_task import add
    # add(3,4)  # 直接执行,不会被添加到broker中
    ret=add.delay(5,4)  #想broker中添加一个任务
    print(ret)
# 4 查看任务执行结果
	from celery_task import app
    from celery.result import AsyncResult
    id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')

包结构

#1 新建一个包,叫celery_task
    -celery_task
        -__init__.py
        -celery.py
        -task1.py
        -task2.py
# 2 celery.py
    from celery import Celery
    broker='redis://127.0.0.1:6379/1'  #broker任务队列
    backend='redis://127.0.0.1:6379/2'# 结构存储,执行完的结果存在这
    app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1','celery_task.task2'])
    
# 3 task1.py
    from .celery import app
    @app.task
    def add(x,y):
        print(x,y)
        return x+y
# 4 task2.py
	from .celery import app
    @app.task
    def mutile(x,y):
        print(x,y)
        return x*y
# 5 添加任务(异步任务,延迟任务)
    from celery_task.task1 import add
    from celery_task.task2 import mutile
    #  提交异步
    ret=add.delay(6,7)
    print(ret)  # 2d4ad592-9548-4c7c-8df4-7f8583e8a1b1
    
    # 提交延迟任务
    from datetime import datetime, timedelta
    # 需要utc时间
    eta=datetime.utcnow() + timedelta(seconds=10)
    ret=add.apply_async(args=(240, 50), eta=eta)
    print(ret)

# 6获取结果同上
# 执行定时任务
#1 celery.py

    from celery import Celery
    broker='redis://127.0.0.1:6379/1'  #broker任务队列
    backend='redis://127.0.0.1:6379/2'   # 结构存储,执行完的结果存在这
    app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1','celery_task.task2'])
    # 执行定时任务
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'add-task': {
            'task': 'celery_task.task1.add',
            # 'schedule': timedelta(seconds=3),
            'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': (300, 150),
        }
    }
    
# 2 启动worker,启动beat
	-celery worker -A celery_task -l info -P eventlet
	-celery beat -A celery_task -l info

启动时注意两个命令

celery worker -A celery_task -l info -P eventlet
celery beat -A celery_task -l info

实例:轮播图的延时上传

# E:\development\pythonProject\luffyapi\celery_task
# celery.py
from celery import Celery

# 加载django环境
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()

broker='redis://192.168.222.5:6379/1'  #broker任务队列
backend='redis://192.168.222.5:6379/2'   # 结构存储,执行完的结果存在这

app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.home_task',])


# 执行定时任务
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_task.home_task.banner_update',
        'schedule': timedelta(seconds=30),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 'args': (300, 150),
    }
}

# 一定要启动beat
# celery beat -A celery_task -l info
# home_task.py
from .celery import app

# cache
# model,serilizer

@app.task
def banner_update():
    # 放在里面防止循环导入
    from home import ser
    from home import models
    from django.conf import settings
    from django.core.cache import cache
    queryset_banner = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('display_order')[
               :settings.BANNER_COUNTER]
    serializer_banner=ser.BannerModelSerializer(instance=queryset_banner,many=True)
    print(serializer_banner.data)
    for banner in serializer_banner.data:
        banner['img']='http://127.0.0.1:8000'+banner['img']
    cache.set('banner_list',serializer_banner.data)
    # # import time
    # # time.sleep(1)
    # banner_list=cache.get('banner_list')
    # print(banner_list)
    return True