# models.py
from django.db import models
class UserProfile(models.Model):
    """User information: a person's name and contact details."""

    # Display labels below are kept verbatim: name / e-mail / landline / mobile.
    name = models.CharField(verbose_name=u'姓名', max_length=32)
    email = models.EmailField(verbose_name=u'邮箱')
    phone = models.CharField(verbose_name=u'座机', max_length=32)
    mobile = models.CharField(verbose_name=u'手机', max_length=32)

    class Meta:
        verbose_name_plural = "用户表"

    def __str__(self):
        """Use the person's name as the display label."""
        return self.name
class AdminInfo(models.Model):
    """Login-related information attached to exactly one UserProfile."""

    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    user_info = models.OneToOneField("UserProfile", on_delete=models.CASCADE)
    username = models.CharField(u'用户名', max_length=64)
    # NOTE(review): nothing in this model hashes the value - confirm callers
    # store a password hash here rather than plaintext.
    password = models.CharField(u'密码', max_length=64)

    class Meta:
        verbose_name_plural = "管理员表"

    def __str__(self):
        """Use the linked profile's name as the display label."""
        return self.user_info.name
class UserGroup(models.Model):
    """User group: a uniquely named set of UserProfile rows."""

    name = models.CharField(unique=True, max_length=32)
    users = models.ManyToManyField('UserProfile')

    class Meta:
        verbose_name_plural = "用户组表"

    def __str__(self):
        """Use the group name as the display label."""
        return self.name
class BusinessUnit(models.Model):
    """Business line, with the user groups acting as its contacts and admins."""

    name = models.CharField('业务线', max_length=64, unique=True)
    # Both roles point at a UserGroup so that several people can hold them.
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    contact = models.ForeignKey(
        'UserGroup',
        verbose_name='业务联系人',
        related_name='c',
        on_delete=models.CASCADE,
    )
    manager = models.ForeignKey(
        'UserGroup',
        verbose_name='系统管理员',
        related_name='m',
        on_delete=models.CASCADE,
    )

    class Meta:
        verbose_name_plural = "业务线表"

    def __str__(self):
        """Use the business line name as the display label."""
        return self.name
class IDC(models.Model):
    """Machine room (data centre) information: room name and floor."""

    name = models.CharField(verbose_name='机房', max_length=32)
    floor = models.IntegerField(verbose_name='楼层', default=1)

    class Meta:
        verbose_name_plural = "机房表"

    def __str__(self):
        """Use the room name as the display label."""
        return self.name
class Tag(models.Model):
    """Asset tag: a unique label that can be attached to assets."""

    name = models.CharField(verbose_name='标签', unique=True, max_length=32)

    class Meta:
        verbose_name_plural = "标签表"

    def __str__(self):
        """Use the tag text as the display label."""
        return self.name
class Asset(models.Model):
    """Information common to every asset (switches, servers, firewalls, ...)."""

    # 1 = server, 2 = switch, 3 = firewall
    device_type_choices = (
        (1, '服务器'),
        (2, '交换机'),
        (3, '防火墙'),
    )
    # 1 = racked, 2 = online, 3 = offline, 4 = removed from rack
    device_status_choices = (
        (1, '上架'),
        (2, '在线'),
        (3, '离线'),
        (4, '下架'),
    )

    device_type_id = models.IntegerField(choices=device_type_choices, default=1)
    device_status_id = models.IntegerField(choices=device_status_choices, default=1)
    cabinet_num = models.CharField('机柜号', max_length=30, null=True, blank=True)
    cabinet_order = models.CharField('机柜中序号', max_length=30, null=True, blank=True)
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    idc = models.ForeignKey(
        'IDC',
        verbose_name='IDC机房',
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    business_unit = models.ForeignKey(
        'BusinessUnit',
        verbose_name='属于的业务线',
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    tag = models.ManyToManyField('Tag')
    # Written by the asset-report handler in this file; NULL until then.
    latest_date = models.DateField(null=True)
    create_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        verbose_name_plural = "资产表"

    def __str__(self):
        """Return "<idc name>-<cabinet number>-<position in cabinet>".

        ``idc`` is nullable, so a missing room is rendered as ``None`` (the
        same way the nullable cabinet fields already are) instead of raising
        AttributeError.
        """
        idc_name = self.idc.name if self.idc is not None else None
        return "%s-%s-%s" % (idc_name, self.cabinet_num, self.cabinet_order)
class Server(models.Model):
    """Server information: identity, main board, OS and CPU summary."""

    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    asset = models.OneToOneField('Asset', on_delete=models.CASCADE)
    hostname = models.CharField(max_length=128, unique=True)
    sn = models.CharField('SN号', max_length=64, db_index=True)
    manufacturer = models.CharField(verbose_name='制造商', max_length=64, null=True, blank=True)
    model = models.CharField('型号', max_length=64, null=True, blank=True)
    manage_ip = models.GenericIPAddressField('管理IP', null=True, blank=True)
    os_platform = models.CharField('系统', max_length=16, null=True, blank=True)
    os_version = models.CharField('系统版本', max_length=16, null=True, blank=True)
    cpu_count = models.IntegerField('CPU个数', null=True, blank=True)
    cpu_physical_count = models.IntegerField('CPU物理个数', null=True, blank=True)
    cpu_model = models.CharField('CPU型号', max_length=128, null=True, blank=True)
    create_at = models.DateTimeField(auto_now_add=True, blank=True)

    class Meta:
        verbose_name_plural = "服务器表"

    def __str__(self):
        """Use the hostname as the display label."""
        return self.hostname
class NetworkDevice(models.Model):
    """Network device details linked one-to-one with an Asset row."""

    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    asset = models.OneToOneField('Asset', on_delete=models.CASCADE)
    # The IP columns are free-form text fields, not validated IP fields.
    management_ip = models.CharField('管理IP', max_length=64, blank=True, null=True)
    vlan_ip = models.CharField('VlanIP', max_length=64, blank=True, null=True)
    intranet_ip = models.CharField('内网IP', max_length=128, blank=True, null=True)
    sn = models.CharField('SN号', max_length=64, unique=True)
    manufacture = models.CharField(verbose_name=u'制造商', max_length=128, null=True, blank=True)
    model = models.CharField('型号', max_length=128, null=True, blank=True)
    port_num = models.SmallIntegerField('端口个数', null=True, blank=True)
    device_detail = models.CharField('设置详细配置', max_length=255, null=True, blank=True)

    class Meta:
        verbose_name_plural = "网络设备"
class Disk(models.Model):
    """Hard disk information for one slot of a Server."""

    slot = models.CharField('插槽位', max_length=8)
    model = models.CharField('磁盘型号', max_length=32)
    capacity = models.FloatField('磁盘容量GB')
    pd_type = models.CharField('磁盘类型', max_length=32)
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    server_obj = models.ForeignKey('Server', related_name='disk', on_delete=models.CASCADE)

    class Meta:
        verbose_name_plural = "硬盘表"

    def __str__(self):
        """Use the slot identifier as the display label."""
        return self.slot
class NIC(models.Model):
    """Network interface card information for a Server."""

    name = models.CharField('网卡名称', max_length=128)
    hwaddr = models.CharField('网卡mac地址', max_length=64)
    netmask = models.CharField(max_length=64)
    ipaddrs = models.CharField('ip地址', max_length=256)
    up = models.BooleanField(default=False)
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    server_obj = models.ForeignKey('Server', related_name='nic', on_delete=models.CASCADE)

    class Meta:
        verbose_name_plural = "网卡表"

    def __str__(self):
        """Use the interface name as the display label."""
        return self.name
class Memory(models.Model):
    """Memory module information for one slot of a Server."""

    slot = models.CharField('插槽位', max_length=32)
    manufacturer = models.CharField('制造商', max_length=32, null=True, blank=True)
    model = models.CharField('型号', max_length=64)
    capacity = models.FloatField('容量', null=True, blank=True)
    sn = models.CharField('内存SN号', max_length=64, null=True, blank=True)
    speed = models.CharField('速度', max_length=16, null=True, blank=True)
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    server_obj = models.ForeignKey('Server', related_name='memory', on_delete=models.CASCADE)

    class Meta:
        verbose_name_plural = "内存表"

    def __str__(self):
        """Use the slot identifier as the display label."""
        return self.slot
class AssetRecord(models.Model):
    """Asset change record.

    An empty ``creator`` means the entry came from an automated asset report
    rather than from a user.
    """

    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    asset_obj = models.ForeignKey('Asset', related_name='ar', on_delete=models.CASCADE)
    content = models.TextField(null=True)
    creator = models.ForeignKey('UserProfile', null=True, blank=True, on_delete=models.CASCADE)
    create_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        verbose_name_plural = "资产记录表"

    def __str__(self):
        """Return "<idc name>-<cabinet number>-<position>" of the related asset.

        The asset's ``idc`` is nullable, so a missing room is rendered as
        ``None`` instead of raising AttributeError.
        """
        asset = self.asset_obj
        idc_name = asset.idc.name if asset.idc is not None else None
        return "%s-%s-%s" % (idc_name, asset.cabinet_num, asset.cabinet_order)
class ErrorLog(models.Model):
    """Error log, e.g. agent data-collection errors or runtime errors."""

    # Nullable: some errors are logged without a related asset.
    # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
    # Django versions applied implicitly, so existing behaviour is unchanged.
    asset_obj = models.ForeignKey('Asset', null=True, blank=True, on_delete=models.CASCADE)
    title = models.CharField(max_length=16)
    content = models.TextField()
    create_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        verbose_name_plural = "错误日志表"

    def __str__(self):
        """Use the log title as the display label."""
        return self.title
# asset.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import traceback
import datetime
from utils.response import BaseResponse
from utils import agorithm
from repository import models
from django.db.models import Q
import datetime
def get_untreated_servers():
    """List the hostnames of online servers that have not reported today.

    A server qualifies when its asset's ``latest_date`` is before today or
    NULL, and the asset's ``device_status_id`` is 2.

    Returns:
        A BaseResponse. On success ``data`` is a list of ``{'hostname': ...}``
        dicts and ``status`` is True. On failure ``status`` is False,
        ``message`` carries the exception text and the traceback is written to
        ErrorLog.
    """
    response = BaseResponse()
    try:
        today = datetime.date.today()
        # Assets not collected today: last report is older than today or missing.
        not_reported_today = (
            Q(asset__latest_date__lt=today) | Q(asset__latest_date__isnull=True)
        )
        # 2 is the "online" entry of Asset.device_status_choices.
        is_online = Q(asset__device_status_id=2)
        hostnames = models.Server.objects.filter(not_reported_today & is_online).values('hostname')
        response.data = list(hostnames)
        response.status = True
    except Exception as e:
        # Mark the failure explicitly, as the other handlers in this module do,
        # rather than relying on whatever default BaseResponse starts with.
        response.status = False
        response.message = str(e)
        models.ErrorLog.objects.create(
            asset_obj=None,
            title='get_untreated_servers',
            content=traceback.format_exc(),
        )
    return response
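# ############# Basic information handling (CPU and main board) #############
# Applies basic-information changes and records a change log
# Updates CPU and main board information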
class HandleBasic(object):
    """Synchronise a server's basic data (OS, main board, CPU) with a report."""

    @staticmethod
    def process(server_obj, server_info, user_obj):
        """Copy changed OS, main board and CPU values onto ``server_obj``.

        Args:
            server_obj: Server row to update; it is saved even if nothing changed.
            server_info: reported data; this method reads ``os_platform``,
                ``os_version``, ``main_board['data']`` and ``cpu['data']``.
            user_obj: stored as the creator of the change record.

        Returns:
            A BaseResponse whose ``status`` is set to False if anything raised.
        """
        response = BaseResponse()
        try:
            board = server_info['main_board']['data']
            cpu_data = server_info['cpu']['data']
            # (attribute and payload key, payload section, change-log template)
            tracked = (
                ('os_platform', server_info, '系统由%s变更为%s'),
                ('os_version', server_info, u'系统版本由%s变更为%s'),
                ('sn', board, u'主板SN号由%s变更为%s'),
                ('manufacturer', board, u'主板厂商由%s变更为%s'),
                ('model', board, u'主板型号由%s变更为%s'),
                ('cpu_count', cpu_data, u'CPU逻辑核数由%s变更为%s'),
                ('cpu_physical_count', cpu_data, u'CPU物理核数由%s变更为%s'),
                ('cpu_model', cpu_data, u'CPU型号由%s变更为%s'),
            )
            changes = []
            for attr, section, template in tracked:
                current = getattr(server_obj, attr)
                reported = section[attr]
                if current != reported:
                    changes.append(template % (current, reported,))
                    setattr(server_obj, attr, reported)
            server_obj.save()
            if changes:
                # One change record per report, entries joined with ';'.
                models.AssetRecord.objects.create(
                    asset_obj=server_obj.asset,
                    creator=user_obj,
                    content=';'.join(changes),
                )
        except Exception as e:
            response.status = False
            models.ErrorLog.objects.create(
                asset_obj=server_obj.asset,
                title='basic-run',
                content=traceback.format_exc(),
            )
        return response

    @staticmethod
    def update_last_time(server_obj, user_obj):
        """Stamp the server's asset with today's date and log the report.

        Returns:
            A BaseResponse whose ``status`` is set to False if anything raised.
        """
        response = BaseResponse()
        try:
            today = datetime.date.today()
            server_obj.asset.latest_date = today
            server_obj.asset.save()
            models.AssetRecord.objects.create(
                asset_obj=server_obj.asset,
                creator=user_obj,
                content='资产汇报',
            )
        except Exception as e:
            response.status = False
            models.ErrorLog.objects.create(
                asset_obj=server_obj.asset,
                title='basic-run',
                content=traceback.format_exc(),
            )
        return response
# ############# Network card (NIC) handling #############
# Applies NIC changes and records a change log
# Adds NICs
# Removes NICs
# Updates NIC information
class HandleNic(object):
    """Synchronise a server's NIC rows with the data reported by its agent."""

    @staticmethod
    def process(server_obj, server_info, user_obj):
        """Add, update and delete NIC rows so they match ``server_info['nic']``.

        Args:
            server_obj: Server whose NIC rows are synchronised.
            server_info: reported data; ``server_info['nic']`` must hold
                ``status`` and either ``data`` (dict keyed by NIC name) or
                ``error``.
            user_obj: stored as the creator of every change record.

        Returns:
            A BaseResponse whose ``status`` is set to False when the agent
            reported an error or when anything raised.
        """
        response = BaseResponse()
        try:
            nic_info = server_info['nic']
            if not nic_info['status']:
                response.status = False
                models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='nic-agent',
                                               content=nic_info['error'])
                return response

            client_nic_dict = nic_info['data']
            nic_obj_list = models.NIC.objects.filter(server_obj=server_obj)
            # Must be a real list: it is consumed twice below. The previous
            # map(...) is a one-shot iterator on Python 3, so after set(...)
            # exhausted it the delete list was always empty.
            nic_name_list = [nic.name for nic in nic_obj_list]

            # NOTE(review): agorithm helpers are defined outside this file;
            # presumably set intersection / ordered difference - confirm.
            update_list = agorithm.get_intersection(set(client_nic_dict.keys()), set(nic_name_list))
            add_list = agorithm.get_exclude(client_nic_dict.keys(), update_list)
            del_list = agorithm.get_exclude(nic_name_list, update_list)

            HandleNic._add_nic(add_list, client_nic_dict, server_obj, user_obj)
            HandleNic._update_nic(update_list, nic_obj_list, client_nic_dict, server_obj, user_obj)
            HandleNic._del_nic(del_list, nic_obj_list, server_obj, user_obj)
        except Exception as e:
            response.status = False
            models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='nic-run',
                                           content=traceback.format_exc())
        return response

    @staticmethod
    def _add_nic(add_list, client_nic_dict, server_obj, user_obj):
        """Create a NIC row and a change record for every name in ``add_list``."""
        for nic_name in add_list:
            # Work on a copy so the caller's payload is not polluted with the
            # extra 'name' / 'server_obj' keys.
            nic_fields = dict(client_nic_dict[nic_name])
            nic_fields['name'] = nic_name
            log_str = '[新增网卡]{name}:mac地址为{hwaddr};状态为{up};掩码为{netmask};IP地址为{ipaddrs}'.format(**nic_fields)
            nic_fields['server_obj'] = server_obj
            models.NIC.objects.create(**nic_fields)
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _del_nic(del_list, nic_objs, server_obj, user_obj):
        """Delete every NIC in ``nic_objs`` whose name is in ``del_list`` and log it."""
        for nic in nic_objs:
            if nic.name not in del_list:
                continue
            log_str = '[移除网卡]{name}:mac地址为{hwaddr};状态为{up};掩码为{netmask};IP地址为{ipaddrs}'.format(
                name=nic.name, hwaddr=nic.hwaddr, up=nic.up, netmask=nic.netmask, ipaddrs=nic.ipaddrs)
            nic.delete()
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _update_nic(update_list, nic_objs, client_nic_dict, server_obj, user_obj):
        """Copy changed values onto existing NIC rows and log each change.

        Every NIC named in ``update_list`` is saved, changed or not; a change
        record is written only when at least one field differed.
        """
        # (attribute and payload key, change-log template)
        tracked = (
            ('hwaddr', u"[更新网卡]%s:mac地址由%s变更为%s"),
            ('up', u"[更新网卡]%s:状态由%s变更为%s"),
            ('netmask', u"[更新网卡]%s:掩码由%s变更为%s"),
            ('ipaddrs', u"[更新网卡]%s:IP地址由%s变更为%s"),
        )
        for nic in nic_objs:
            if nic.name not in update_list:
                continue
            log_list = []
            for attr, template in tracked:
                reported = client_nic_dict[nic.name][attr]
                current = getattr(nic, attr)
                if current != reported:
                    log_list.append(template % (nic.name, current, reported))
                    setattr(nic, attr, reported)
            nic.save()
            if log_list:
                models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj,
                                                  content=';'.join(log_list))
# ############# Memory handling #############
# Applies memory changes and records a change log
# Adds memory modules
# Removes memory modules
# Updates memory information
class HandleMemory(object):
    """Synchronise a server's Memory rows with the data reported by its agent."""

    @staticmethod
    def process(server_obj, server_info, user_obj):
        """Add, update and delete Memory rows so they match ``server_info['memory']``.

        Args:
            server_obj: Server whose Memory rows are synchronised.
            server_info: reported data; ``server_info['memory']`` must hold
                ``status`` and either ``data`` (dict keyed by slot) or ``error``.
            user_obj: stored as the creator of every change record.

        Returns:
            A BaseResponse whose ``status`` is set to False when the agent
            reported an error or when anything raised.
        """
        response = BaseResponse()
        try:
            mem_info = server_info['memory']
            if not mem_info['status']:
                models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='memory-agent',
                                               content=mem_info['error'])
                response.status = False
                return response

            client_mem_dict = mem_info['data']
            mem_obj_list = models.Memory.objects.filter(server_obj=server_obj)
            # Must be a real list: it is consumed twice below. The previous
            # map(...) is a one-shot iterator on Python 3, so after set(...)
            # exhausted it the delete list was always empty.
            mem_slots = [mem.slot for mem in mem_obj_list]

            # NOTE(review): agorithm helpers are defined outside this file;
            # presumably set intersection / ordered difference - confirm.
            update_list = agorithm.get_intersection(set(client_mem_dict.keys()), set(mem_slots))
            add_list = agorithm.get_exclude(client_mem_dict.keys(), update_list)
            del_list = agorithm.get_exclude(mem_slots, update_list)

            HandleMemory._add_memory(add_list, client_mem_dict, server_obj, user_obj)
            HandleMemory._update_memory(update_list, mem_obj_list, client_mem_dict, server_obj, user_obj)
            HandleMemory._del_memory(del_list, mem_obj_list, server_obj, user_obj)
        except Exception as e:
            response.status = False
            models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='memory-run',
                                           content=traceback.format_exc())
        return response

    @staticmethod
    def _add_memory(add_list, client_mem_dict, server_obj, user_obj):
        """Create a Memory row and a change record for every slot in ``add_list``."""
        for slot in add_list:
            # Work on a copy so the caller's payload is not polluted with the
            # extra 'server_obj' key.
            mem_fields = dict(client_mem_dict[slot])
            log_str = '[新增内存]插槽为{slot};容量为{capacity};类型为{model};速度为{speed};厂商为{manufacturer};SN号为{sn}'.format(
                **mem_fields)
            mem_fields['server_obj'] = server_obj
            models.Memory.objects.create(**mem_fields)
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _del_memory(del_list, mem_objs, server_obj, user_obj):
        """Delete every Memory row in ``mem_objs`` whose slot is in ``del_list`` and log it."""
        for mem in mem_objs:
            if mem.slot not in del_list:
                continue
            log_str = '[移除内存]插槽为{slot};容量为{capacity};类型为{model};速度为{speed};厂商为{manufacturer};SN号为{sn}'.format(
                slot=mem.slot, capacity=mem.capacity, model=mem.model, speed=mem.speed,
                manufacturer=mem.manufacturer, sn=mem.sn)
            mem.delete()
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _update_memory(update_list, mem_objs, client_mem_dict, server_obj, user_obj):
        """Copy changed values onto existing Memory rows and log each change.

        Every module whose slot is in ``update_list`` is saved, changed or not;
        a change record is written only when at least one field differed.
        """
        # (attribute and payload key, change-log template)
        tracked = (
            ('manufacturer', u"[更新内存]%s:厂商由%s变更为%s"),
            ('model', u"[更新内存]%s:型号由%s变更为%s"),
            ('capacity', u"[更新内存]%s:容量由%s变更为%s"),
            ('sn', u"[更新内存]%s:SN号由%s变更为%s"),
            ('speed', u"[更新内存]%s:速度由%s变更为%s"),
        )
        for mem in mem_objs:
            if mem.slot not in update_list:
                continue
            log_list = []
            for attr, template in tracked:
                # NOTE(review): capacity is compared without conversion, unlike
                # the disk handler which applies float() - confirm the agent
                # reports a number here.
                reported = client_mem_dict[mem.slot][attr]
                current = getattr(mem, attr)
                if current != reported:
                    log_list.append(template % (mem.slot, current, reported))
                    setattr(mem, attr, reported)
            mem.save()
            if log_list:
                models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj,
                                                  content=';'.join(log_list))
# ############# Hard disk handling #############
# Applies disk changes and records a change log
# Adds disks
# Removes disks
# Updates disk information
class HandleDisk(object):
    """Synchronise a server's Disk rows with the data reported by its agent."""

    @staticmethod
    def process(server_obj, server_info, user_obj):
        """Add, update and delete Disk rows so they match ``server_info['disk']``.

        Args:
            server_obj: Server whose Disk rows are synchronised.
            server_info: reported data; ``server_info['disk']`` must hold
                ``status`` and either ``data`` (dict keyed by slot) or ``error``.
            user_obj: stored as the creator of every change record.

        Returns:
            A BaseResponse whose ``status`` is set to False when the agent
            reported an error or when anything raised.
        """
        response = BaseResponse()
        try:
            disk_info = server_info['disk']
            if not disk_info['status']:
                response.status = False
                models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='disk-agent',
                                               content=disk_info['error'])
                return response

            client_disk_dict = disk_info['data']
            disk_obj_list = models.Disk.objects.filter(server_obj=server_obj)
            # Must be a real list: it is consumed twice below. The previous
            # map(...) is a one-shot iterator on Python 3, so after set(...)
            # exhausted it the delete list was always empty.
            disk_slots = [disk.slot for disk in disk_obj_list]

            # NOTE(review): agorithm helpers are defined outside this file;
            # presumably set intersection / ordered difference - confirm.
            update_list = agorithm.get_intersection(set(client_disk_dict.keys()), set(disk_slots))
            add_list = agorithm.get_exclude(client_disk_dict.keys(), update_list)
            del_list = agorithm.get_exclude(disk_slots, update_list)

            HandleDisk._add_disk(add_list, client_disk_dict, server_obj, user_obj)
            HandleDisk._update_disk(update_list, disk_obj_list, client_disk_dict, server_obj, user_obj)
            HandleDisk._del_disk(del_list, disk_obj_list, server_obj, user_obj)
        except Exception as e:
            response.status = False
            models.ErrorLog.objects.create(asset_obj=server_obj.asset, title='disk-run',
                                           content=traceback.format_exc())
        return response

    @staticmethod
    def _add_disk(add_list, client_disk_dict, server_obj, user_obj):
        """Create a Disk row and a change record for every slot in ``add_list``."""
        for slot in add_list:
            # Work on a copy so the caller's payload is not polluted with the
            # extra 'server_obj' key.
            disk_fields = dict(client_disk_dict[slot])
            log_str = '[新增硬盘]插槽为{slot};容量为{capacity};硬盘类型为{pd_type};型号为{model}'.format(**disk_fields)
            disk_fields['server_obj'] = server_obj
            models.Disk.objects.create(**disk_fields)
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _del_disk(del_list, disk_objs, server_obj, user_obj):
        """Delete every Disk row in ``disk_objs`` whose slot is in ``del_list`` and log it."""
        for disk in disk_objs:
            if disk.slot not in del_list:
                continue
            log_str = '[移除硬盘]插槽为{slot};容量为{capacity};硬盘类型为{pd_type};型号为{model}'.format(
                slot=disk.slot, capacity=disk.capacity, pd_type=disk.pd_type, model=disk.model)
            disk.delete()
            models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj, content=log_str)

    @staticmethod
    def _update_disk(update_list, disk_objs, client_disk_dict, server_obj, user_obj):
        """Copy changed values onto existing Disk rows and log each change.

        Every disk whose slot is in ``update_list`` is saved, changed or not;
        a change record is written only when at least one field differed.
        """
        # (attribute and payload key, converter applied to the reported value
        # or None, change-log template)
        tracked = (
            ('model', None, u"[更新硬盘]插槽为%s:型号由%s变更为%s"),
            # The reported capacity is converted to float before comparing with
            # the FloatField value.
            ('capacity', float, u"[更新硬盘]插槽为%s:容量由%s变更为%s"),
            ('pd_type', None, u"[更新硬盘]插槽为%s:硬盘类型由%s变更为%s"),
        )
        for disk in disk_objs:
            if disk.slot not in update_list:
                continue
            log_list = []
            for attr, convert, template in tracked:
                reported = client_disk_dict[disk.slot][attr]
                if convert is not None:
                    reported = convert(reported)
                current = getattr(disk, attr)
                if current != reported:
                    log_list.append(template % (disk.slot, current, reported))
                    setattr(disk, attr, reported)
            disk.save()
            if log_list:
                models.AssetRecord.objects.create(asset_obj=server_obj.asset, creator=user_obj,
                                                  content=';'.join(log_list))
# Reference: AutoCmdb
import time
from functools import lru_cache
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.conf import settings
from rest_framework import serializers
from utils import fields as c_fields
class Permission(models.Model):
    """A named permission with a display alias."""

    name = models.CharField(verbose_name="权限名", unique=True, max_length=100)
    alias = models.CharField(verbose_name="权限别名", max_length=100)

    class Meta:
        verbose_name = "权限"
        verbose_name_plural = "权限"

    def __str__(self):
        """Use the alias as the display label."""
        return self.alias
class Department(models.Model):
    """A department in a self-referencing tree, carrying its own permissions."""

    name = models.CharField(verbose_name="部门名", max_length=20)
    level = models.SmallIntegerField(verbose_name="级别")
    # Parent department; NULL is allowed, and deleting a parent cascades.
    parent = models.ForeignKey(
        "self",
        verbose_name="上级部门",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    permissions = models.ManyToManyField(
        Permission,
        verbose_name="所有权限",
        related_name="departments",
        blank=True,
    )
    create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
    update_time = models.DateTimeField(verbose_name="更新时间", auto_now=True)

    class Meta:
        verbose_name = "部门"
        verbose_name_plural = verbose_name
        # The same name may repeat only under different parents.
        unique_together = ("name", "parent")

    def __str__(self):
        """Use the department name as the display label."""
        return self.name
class User(AbstractUser):
    """Account with direct permissions plus those inherited from departments."""

    name = models.CharField(max_length=10, verbose_name="姓名")
    position = models.CharField(blank=True, max_length=20, verbose_name="职位")
    departments = models.ManyToManyField(Department, related_name="users", blank=True, verbose_name="部门集")
    permissions = models.ManyToManyField(Permission, related_name="users", blank=True, verbose_name="权限集")

    def __str__(self):
        """Use the person's name as the display label."""
        return self.name

    class Meta:
        verbose_name = "用户"
        verbose_name_plural = verbose_name

    def get_all_permissions_no_cache(self):
        """Return the names of all permissions held directly or via departments.

        Always queries the database. The result has no duplicates and no
        guaranteed order.
        """
        names = {perm.name for perm in self.permissions.all()}
        for department in self.departments.all():
            names.update(perm.name for perm in department.permissions.all())
        return list(names)

    # NOTE(review): lru_cache on a method keys on ``self`` and keeps up to 64
    # instances alive. Django hashes model instances by primary key, so the
    # cache is presumably meant to be shared across instances of the same
    # user - confirm before replacing it with a per-instance cache.
    @lru_cache(maxsize=64)
    def _get_all_permissions_by_cache(self, refresh_mark_number):
        """Cached permission lookup.

        ``refresh_mark_number`` is only part of the cache key: a new value
        forces a fresh database lookup.
        """
        import logging

        logging.getLogger(__name__).debug("%s get_all_permissions_by_cache", self.name)
        return self.get_all_permissions_no_cache()

    def get_all_permission_names(self):
        """Return permission names, cached per PERMISSION_CACHE_TIME window.

        With ``settings.PERMISSION_CACHE_TIME == 0`` caching is bypassed.
        Otherwise the cache key changes once per window of that many seconds.
        A fresh list is returned on every call so callers cannot mutate the
        cached value.
        """
        if settings.PERMISSION_CACHE_TIME == 0:
            return self.get_all_permissions_no_cache()
        window = time.time() // settings.PERMISSION_CACHE_TIME
        return list(self._get_all_permissions_by_cache(window))
class Table(models.Model):
    """A CMDB table definition; the table name is the primary key."""

    name = models.CharField(verbose_name="表名", max_length=20, primary_key=True)
    alias = models.CharField(verbose_name="别名", max_length=20, null=True, unique=True)
    readme = models.TextField(verbose_name="自述", default="", blank=True)
    # PROTECT: a user who created tables cannot be deleted.
    creator = models.ForeignKey(User, verbose_name="创建者", on_delete=models.PROTECT)
    creation_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    class Meta:
        verbose_name = "表"
        verbose_name_plural = verbose_name

    def __str__(self):
        """Use the table name as the display label."""
        return self.name
class Field(models.Model):
    """A CMDB field definition belonging to a Table."""

    # Type codes stored in ``type`` with their labels.
    FIELD_TYPE_CHOICES = (
        (0, "string"),
        (1, "integer"),
        (2, "floating"),
        (3, "datetime"),
        (4, "date"),
        (5, "boolean"),
        (6, "Ip"),
    )
    # Same type codes mapped to the serializer field class for each.
    FIELD_TYPE_MAP = {
        0: serializers.CharField,
        1: serializers.IntegerField,
        2: serializers.FloatField,
        3: serializers.DateTimeField,
        4: serializers.DateField,
        5: c_fields.BooleanField,
        6: serializers.IPAddressField,
    }

    table = models.ForeignKey(
        Table,
        verbose_name="字段",
        related_name="fields",
        on_delete=models.CASCADE,
    )
    name = models.CharField(verbose_name="字段名", max_length=20)
    alias = models.CharField(verbose_name="别名", max_length=20, default="", null=True, blank=True)
    readme = models.TextField(verbose_name="自述", default="", null=True, blank=True)
    type = models.SmallIntegerField(verbose_name="字段类型", choices=FIELD_TYPE_CHOICES)
    is_multi = models.BooleanField(verbose_name="是否为多值字段", default=False)
    required = models.BooleanField(verbose_name="是否必填", default=False)

    class Meta:
        verbose_name = "字段"
        verbose_name_plural = verbose_name

    def __str__(self):
        """Use the field name as the display label."""
        return self.name
class RestPWVerifyCode(models.Model):
    """A verification code tied to a single user (at most one code per user)."""

    # on_delete is required by the Django version the rest of this module is
    # written for (every other relation here passes it). The explicit
    # unique=True was dropped because a one-to-one field is always unique.
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    code = models.CharField(max_length=10)
    add_time = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Use the code itself as the display label."""
        return self.code

    class Meta:
        verbose_name = "验证码"
        verbose_name_plural = verbose_name
# Reference: https://github.com/open-cmdb/cmdb/blob/master/apps/mgmt/models.py#L132